Flow creation and management

Programmatically building a Flow

The Flow, including datasets, recipes, …, can be fully created and managed programmatically.

Datasets can be created and managed using the methods detailed in Datasets (other operations).

Recipes can be created using the project.new_recipe method. This follows a builder pattern: new_recipe returns a recipe creator object, on which you add settings; you then call its create() method to actually create the recipe object.

The builder objects reproduce the functionality available in the recipe creation modals in the UI. For more control over the recipe’s setup, it is therefore often necessary to get the recipe’s settings after creation, modify them, and save them again.

Creating a Python recipe

builder = project.new_recipe("python")

# Set the input
builder.with_input("myinputdataset")
# Create a new managed dataset for the output in the filesystem_managed connection
builder.with_new_output_dataset("grouped_dataset", "filesystem_managed")

# Set the code - builder is a PythonRecipeCreator, and has a ``with_script`` method
builder.with_script("""
import dataiku
from dataiku import recipe
input_dataset = recipe.get_inputs_as_datasets()[0]
output_dataset = recipe.get_outputs_as_datasets()[0]

df = input_dataset.get_dataframe()
df = df.groupby("something").count()
output_dataset.write_with_schema(df)
""")

recipe = builder.create()

# recipe is now a ``DSSRecipe`` representing the new recipe, and we can now run it
job = recipe.run()

Creating a Sync recipe

builder = project.new_recipe("sync")
builder = builder.with_input("input_dataset_name")
builder = builder.with_new_output("output_dataset_name", "hdfs_managed", format_option_id="PARQUET_HIVE")

recipe = builder.create()
job = recipe.run()

Creating and modifying a grouping recipe

Recipe creation mostly handles setting up the inputs and outputs of the recipe. Most of the remaining setup has to be done by retrieving the recipe's settings, altering and saving them, and then applying the resulting schema changes to the output.

builder = project.new_recipe("grouping")
builder.with_input("dataset_to_group_on")
# Create a new managed dataset for the output in the "filesystem_managed" connection
builder.with_new_output("grouped_dataset", "filesystem_managed")
builder.with_group_key("column")
recipe = builder.create()

# After the recipe is created, you can edit its settings
recipe_settings = recipe.get_settings()
recipe_settings.set_column_aggregations("myvaluecolumn", sum=True)
recipe_settings.save()

# And you may need to apply new schemas to the outputs
# This will add the myvaluecolumn_sum to the "grouped_dataset" dataset
recipe.compute_schema_updates().apply()

# It should be noted that running a recipe is equivalent to building its output(s)
job = recipe.run()

A complete example

This example shows a complete chain:

  • Creating an external dataset
  • Automatically detecting the settings of the dataset (see Datasets (other operations) for details)
  • Creating a prepare recipe to clean up the dataset
  • Then chaining a grouping recipe and setting an aggregation on it
  • Running the entire chain

dataset = project.create_sql_table_dataset("mydataset", "PostgreSQL", "my_sql_connection", "mytable", "myschema")

dataset_settings = dataset.autodetect_settings()
dataset_settings.save()

# As a shortcut, we can call new_recipe on the DSSDataset object. This way, we don't need to call "with_input"

prepare_builder = dataset.new_recipe("prepare")
prepare_builder.with_new_output("mydataset_cleaned", "filesystem_managed")

prepare_recipe = prepare_builder.create()

# Add a step to clean values in "doublecol" that are not valid doubles
prepare_settings = prepare_recipe.get_settings()
prepare_settings.add_filter_on_bad_meaning("DoubleMeaning", "doublecol")
prepare_settings.save()

prepare_recipe.compute_schema_updates().apply()
prepare_recipe.run()

# Grouping recipe

grouping_builder = project.new_recipe("grouping")
grouping_builder.with_input("mydataset_cleaned")
grouping_builder.with_new_output("mydataset_cleaned_grouped", "filesystem_managed")
grouping_builder.with_group_key("column")
grouping_recipe = grouping_builder.create()

grouping_recipe_settings = grouping_recipe.get_settings()
grouping_recipe_settings.set_column_aggregations("myvaluecolumn", sum=True)
grouping_recipe_settings.save()
# Schema updates and runs are methods of the recipe, not of its settings
grouping_recipe.compute_schema_updates().apply()

grouping_recipe.run()

Working with flow zones

Creating a zone and adding items in it

flow = project.get_flow()
zone = flow.create_zone("zone1")

# First way of adding an item to a zone
dataset = project.get_dataset("mydataset")
zone.add_item(dataset)

# Second way of adding an item to a zone
dataset = project.get_dataset("mydataset")
dataset.move_to_zone("zone1")

# Third way of adding an item to a zone
dataset = project.get_dataset("mydataset")
dataset.move_to_zone(zone)
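
When several datasets need to go into the same zone, the first approach can be wrapped in a small helper. This is only a sketch: the function name move_datasets_to_zone and its arguments are hypothetical and not part of the API, and it relies solely on get_flow(), create_zone(), get_dataset() and add_item() as used above.

```python
def move_datasets_to_zone(project, zone_name, dataset_names):
    """Create a new zone and move the named datasets into it.

    Hypothetical helper: `project` is expected to be a project handle
    exposing get_flow() and get_dataset(), as in the snippets above.
    """
    flow = project.get_flow()
    zone = flow.create_zone(zone_name)
    for dataset_name in dataset_names:
        # add_item() moves the dataset out of the zone it was in before
        zone.add_item(project.get_dataset(dataset_name))
    return zone
```

It would be called as move_datasets_to_zone(project, "zone1", ["mydataset", "otherdataset"]), where the dataset names are placeholders.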

Listing and getting zones

# List zones
flow = project.get_flow()

for zone in flow.list_zones():
    print("Zone id=%s name=%s" % (zone.id, zone.name))

    print("Zone has the following items:")
    for item in zone.items:
        print("Zone item: %s" % item)

# Get a zone by id - beware, id not name
zone = flow.get_zone("21344ZsQZ")

# Get the "Default" zone
zone = flow.get_default_zone()
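
Since get_zone() takes an id and not a name, looking a zone up by its display name means scanning the result of list_zones(). The helper below is a minimal sketch of that lookup; find_zone_by_name is a hypothetical name, not an API method, and it assumes only the id and name attributes shown above.

```python
def find_zone_by_name(flow, name):
    """Return the first zone whose name matches, or None if there is none.

    Hypothetical helper: zone names are not guaranteed to be unique here,
    so only the first match is returned.
    """
    for zone in flow.list_zones():
        if zone.name == name:
            return zone
    return None
```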

Changing the settings of a zone

flow = project.get_flow()
zone = flow.get_zone("21344ZsQZ")

settings = zone.get_settings()
settings.name = "New name"

settings.save()

Getting the zone of a dataset

dataset = project.get_dataset("mydataset")

zone = dataset.get_zone()
print("Dataset is in zone %s" % zone.id)

Schema propagation

When the schema of an input dataset is modified, or when the settings of a recipe are modified, you need to propagate this schema change across the flow.

This can be done from the UI, but can also be automated through the API:

flow = project.get_flow()

# A propagation always starts from a source dataset and will move from left to right till the end of the Flow

propagation = flow.new_schema_propagation("sourcedataset")

future = propagation.start()
future.wait_for_result()

There are many options for propagation; see dataikuapi.dss.flow.DSSSchemaPropagationRunBuilder.
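
As an illustration of these options, the sketch below sets two of them before starting the propagation. The wrapper function propagate_schema and its parameters are hypothetical; the builder methods it calls (set_auto_rebuild(), stop_at(), start()) are the ones listed in the reference documentation, and the recipe names passed in are placeholders.

```python
def propagate_schema(flow, source_dataset, stop_recipes=(), auto_rebuild=True):
    """Run a schema propagation from `source_dataset` with a few options set.

    Hypothetical helper: `flow` is the object returned by project.get_flow().
    """
    propagation = flow.new_schema_propagation(source_dataset)
    # Whether to rebuild datasets when needed while propagating
    propagation.set_auto_rebuild(auto_rebuild)
    # Propagation will not continue past these recipes
    for recipe_name in stop_recipes:
        propagation.stop_at(recipe_name)
    future = propagation.start()
    # Block until the propagation has finished
    return future.wait_for_result()
```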

Reference documentation

class dataikuapi.dss.flow.DSSProjectFlow(client, project)
get_graph()
create_zone(name, color='#2ab1ac')

Creates a new flow zone

Returns:the newly created zone
Return type:DSSFlowZone

get_zone(id)

Gets a single Flow zone by id

Return type:DSSFlowZone

get_default_zone()

Returns the default zone of the Flow

Return type:DSSFlowZone

list_zones()

Lists all zones in the Flow

Return type:list of DSSFlowZone

get_zone_of_object(obj)

Finds the zone to which this object belongs.

If the object is not found in any specific zone, it belongs to the default zone, and the default zone is returned

Parameters:obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to search
Return type:DSSFlowZone
replace_input_computable(current_ref, new_ref, type='DATASET')

This method replaces all references to a “computable” (Dataset, Managed Folder or Saved Model) as input of recipes in the whole Flow by a reference to another computable.

No specific checks are performed. It is your responsibility to ensure that the schema of the new dataset will be compatible with the previous one (in the case of datasets).

If new_ref references an object in a foreign project, this method will automatically ensure that new_ref is exposed to the current project

Parameters:
  • current_ref (str) – Either a “simple” object name (dataset name, model id, managed folder id) or a foreign object reference in the form “FOREIGN_PROJECT_KEY.local_id”
  • new_ref (str) – Either a “simple” object name (dataset name, model id, managed folder id) or a foreign object reference in the form “FOREIGN_PROJECT_KEY.local_id”
  • type (str) – The type of object being replaced (DATASET, SAVED_MODEL or MANAGED_FOLDER)
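
A minimal sketch of how the two reference forms might be built before calling this method. The helper name repoint_recipes and its foreign_project_key argument are hypothetical; only replace_input_computable() itself and the “FOREIGN_PROJECT_KEY.local_id” form come from the description above.

```python
def repoint_recipes(flow, old_dataset, new_dataset, foreign_project_key=None):
    """Make every recipe that reads `old_dataset` read `new_dataset` instead.

    Hypothetical helper. No schema compatibility check is done here either.
    """
    if foreign_project_key is not None:
        # Foreign object reference: FOREIGN_PROJECT_KEY.local_id
        new_ref = "%s.%s" % (foreign_project_key, new_dataset)
    else:
        # "Simple" reference to a dataset of the current project
        new_ref = new_dataset
    flow.replace_input_computable(old_dataset, new_ref, type="DATASET")
    return new_ref
```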
start_tool(type, data={})

Start a tool or open a view in the flow

Parameters:
  • type (str) – one of {COPY, CHECK_CONSISTENCY, PROPAGATE_SCHEMA} (tools) or {TAGS, CUSTOM_FIELDS, CONNECTIONS, COUNT_OF_RECORDS, FILESIZE, FILEFORMATS, RECIPES_ENGINES, RECIPES_CODE_ENVS, IMPALA_WRITE_MODE, HIVE_MODE, SPARK_ENGINE, SPARK_CONFIG, SPARK_PIPELINES, SQL_PIPELINES, PARTITIONING, PARTITIONS, SCENARIOS, CREATION, LAST_MODIFICATION, LAST_BUILD, RECENT_ACTIVITY, WATCH} (views)
  • data (dict) – initial data for the tool (optional)
Returns:

a flow.DSSFlowTool handle to interact with the newly-created tool or view

new_schema_propagation(dataset_name)

Start an automatic schema propagation from a dataset

Parameters:dataset_name (str) – name of a dataset to start propagating from
Returns:a DSSSchemaPropagationRunBuilder to set options and start the propagation

class dataikuapi.dss.flow.DSSProjectFlowGraph(flow, data)
get_source_computables(as_type='dict')

Returns the list of source computables.

Parameters:as_type – How to return the source computables. Possible values are “dict” and “object”
Returns:if as_type=dict, each computable is returned as a dict containing at least “ref” and “type”. If as_type=object, each computable is returned as a dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, dataikuapi.dss.savedmodel.DSSSavedModel, or streaming endpoint
get_source_recipes(as_type='dict')

Returns the list of source recipes.

Parameters:as_type – How to return the source recipes. Possible values are “dict” and “object”
Returns:if as_type=dict, each recipe is returned as a dict containing at least “ref” and “type”. If as_type=object, each recipe is returned as a dataikuapi.dss.recipe.DSSRecipe
get_source_datasets()

Returns the list of source datasets for this project.

Return type:list of dataikuapi.dss.dataset.DSSDataset

get_successor_recipes(node, as_type='dict')

Returns the list of recipes that are successors of a graph node

Parameters:
  • node – Either a name or dataikuapi.dss.dataset.DSSDataset object
  • as_type – How to return the successor recipes. Possible values are “dict” and “object”
Returns:if as_type=dict, each recipe is returned as a dict containing at least “ref” and “type”. If as_type=object, each recipe is returned as a dataikuapi.dss.recipe.DSSRecipe
get_successor_computables(node, as_type='dict')

Returns the list of computables that are successors of a given graph node

Parameters:
  • node – Either a name or dataikuapi.dss.dataset.DSSDataset object
  • as_type – How to return the successor computables. Possible values are “dict” and “object”
Returns:if as_type=dict, each computable is returned as a dict containing at least “ref” and “type”. If as_type=object, each computable is returned as an object, as for get_source_computables()
get_items_in_traversal_order(as_type='dict')
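
The graph methods above can be combined to inspect the start of a Flow. The sketch below lists, for each source dataset, the references of the recipes that read it. The function name recipes_after_source_datasets is hypothetical; it uses only get_graph(), get_source_datasets() and get_successor_recipes() as described above, and relies on the “ref” key that dict-style items are documented to contain.

```python
def recipes_after_source_datasets(flow):
    """Pair each source dataset with the refs of its successor recipes.

    Hypothetical helper: `flow` is the object returned by project.get_flow().
    Returns a list of (dataset, [recipe_ref, ...]) tuples.
    """
    graph = flow.get_graph()
    pairs = []
    for dataset in graph.get_source_datasets():
        # The node can be given as a dataset object or as a name
        recipes = graph.get_successor_recipes(dataset, as_type="dict")
        pairs.append((dataset, [recipe["ref"] for recipe in recipes]))
    return pairs
```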
class dataikuapi.dss.flow.DSSFlowZone(flow, data)

A zone in the Flow. Do not create this object manually, use DSSProjectFlow.get_zone() or DSSProjectFlow.list_zones()

id
name
get_settings()

Gets the settings of this zone in order to modify them

Return type:DSSFlowZoneSettings
add_item(obj)

Adds an item to this zone.

The item will automatically be moved from its existing zone. Additional items may be moved to this zone as a result of the operation (notably the recipe generating obj).

Parameters:obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to add to the zone
items

The list of items explicitly belonging to this zone.

This list is read-only. To modify it, use add_item() and remove_item().

Note that the “default” zone never has any items, as it contains all items that are not explicitly in a zone. To get the full list of items in a zone, including in the “default” zone, use the get_graph() method.

Return type:list of zone items, each a dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, dataikuapi.dss.savedmodel.DSSSavedModel or dataikuapi.dss.recipe.DSSRecipe
shared

The list of items that have been explicitly pre-shared to this zone.

This list is read-only. To modify it, use add_shared() and remove_shared().

Return type:list of shared zone items, each a dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, dataikuapi.dss.savedmodel.DSSSavedModel or dataikuapi.dss.recipe.DSSRecipe
get_graph()
class dataikuapi.dss.flow.DSSFlowZoneSettings(zone)

The settings of a flow zone. Do not create this directly, use DSSFlowZone.get_settings()

get_raw()

Gets the raw settings of the zone.

You cannot modify the items and shared elements through this class. Instead, use DSSFlowZone.add_item() and others

name
save()

Saves the settings of the zone

class dataikuapi.dss.flow.DSSSchemaPropagationRunBuilder(project, client, dataset_name)

Do not create this directly, use DSSProjectFlow.new_schema_propagation()

set_auto_rebuild(auto_rebuild)

Sets whether to automatically rebuild datasets if needed while propagating (default true)

set_default_partitioning_value(dimension, value)

In the case of partitioned flows, sets the default partition value to use when rebuilding, for a specific dimension name

Parameters:
  • dimension (str) – a partitioning dimension name
  • value (str) – a partitioning dimension value
set_partition_for_computable(full_id, partition)

In the case of partitioned flows, sets the partition id to use when building a particular computable. Overrides the default partitioning value per dimension

Parameters:
  • full_id (str) – Full name of the computable, in the form PROJECTKEY.id
  • partition (str) – a full partition id (all dimensions)
stop_at(recipe_name)

Marks a recipe as a recipe where propagation stops

mark_recipe_as_ok(name)

Marks a recipe as always considered as OK during propagation

set_grouping_update_options(recipe=None, remove_missing_aggregates=True, remove_missing_keys=True, new_aggregates={})

Sets update options for grouping recipes

Parameters:recipe (str) – if None, applies to all grouping recipes. Otherwise, applies only to the recipe with this name

set_window_update_options(recipe=None, remove_missing_aggregates=True, remove_missing_in_window=True, new_aggregates={})

Sets update options for window recipes

Parameters:recipe (str) – if None, applies to all window recipes. Otherwise, applies only to the recipe with this name

set_join_update_options(recipe=None, remove_missing_join_conditions=True, remove_missing_join_values=True, new_selected_columns={})

Sets update options for join recipes

Parameters:recipe (str) – if None, applies to all join recipes. Otherwise, applies only to the recipe with this name

start()

Starts the actual propagation. Returns a future to wait for completion

Return type:dataikuapi.dss.future.DSSFuture