Flow creation and management

Programmatically building a Flow

The flow, including datasets, recipes, … can be fully managed and created programmatically.

Datasets can be created and managed using the methods detailed in Datasets (other operations).

Recipes can be created using the new_recipe() method. This follows a builder pattern: new_recipe returns you a DSSRecipeCreator, on which you add settings, and then call the create() method to actually create the recipe object.

The builder objects reproduce the functionality available in the recipe creation modals in the UI, so for more control on the recipe’s setup, it is often necessary to get its settings after creation, modify it, and save it again.

Creating a Python recipe

builder = project.new_recipe("python")

# Set the input
builder.with_input("myinputdataset")
# Create a new managed dataset for the output in the filesystem_managed connection
builder.with_new_output_dataset("grouped_dataset", "filesystem_managed")

# Set the code - builder is a PythonRecipeCreator, and has a ``with_script`` method
builder.with_script("""
import dataiku
from dataiku import recipe
input_dataset = recipe.get_inputs_as_datasets()[0]
output_dataset = recipe.get_outputs_as_datasets()[0]

df = input_dataset.get_dataframe()
df = df.groupby("something").count()
output_dataset.write_with_schema(df)
""")

recipe = builder.create()

# recipe is now a ``DSSRecipe`` representing the new recipe, and we can now run it
job = recipe.run()

Creating a Sync recipe

builder = project.new_recipe("sync")
builder = builder.with_input("input_dataset_name")
builder = builder.with_new_output("output_dataset_name", "hdfs_managed", format_option_id="PARQUET_HIVE")

recipe = builder.create()
job = recipe.run()

Creating and modifying a grouping recipe

The recipe creation mostly handles setting up the inputs and outputs of the recipes, so most of the setup of the recipe has to be done by retrieving its settings, altering and saving them, then applying schema changes to the output

builder = project.new_recipe("grouping")
builder.with_input("dataset_to_group_on")
# Create a new managed dataset for the output in the "filesystem_managed" connection
builder.with_new_output("grouped_dataset", "filesystem_managed")
builder.with_group_key("column")
recipe = builder.build()

# After the recipe is created, you can edit its settings
recipe_settings = recipe.get_settings()
recipe_settings.set_column_aggregations("myvaluecolumn", sum=True)
recipe_settings.save()

# And you may need to apply new schemas to the outputs
# This will add the myvaluecolumn_sum to the "grouped_dataset" dataset
recipe.compute_schema_updates().apply()

# It should be noted that running a recipe is equivalent to building its output(s)
job = recipe.run()

A complete example

This examples shows a complete chain:

  • Creating an external dataset

  • Automatically detecting the settings of the dataset (see Datasets (other operations) for details)

  • Creating a prepare recipe to cleanup the dataset

  • Then chaining a grouping recipe, setting an aggregation on it

  • Running the entire chain

dataset = project.create_sql_table_dataset("mydataset", "PostgreSQL", "my_sql_connection", "mytable", "myschema")

dataset_settings = dataset.autodetect_settings()
dataset_settings.save()

# As a shortcut, we can call new_recipe on the DSSDataset object. This way, we don't need to call "with_input"

prepare_builder = dataset.new_recipe("prepare")
prepare_builder.with_new_output("mydataset_cleaned", "filesystem_managed")

prepare_recipe = prepare_builder.create()

# Add a step to clean values in "doublecol" that are not valid doubles
prepare_settings = prepare_recipe.get_settings()
prepare_settings.add_filter_on_bad_meaning("DoubleMeaning", "doublecol")
prepare_settings.save()

prepare_recipe.compute_schema_updates().apply()
prepare_recipe().run()

# Grouping recipe

grouping_builder = project.new_recipe("grouping")
grouping_builder.with_input("mydataset_cleaned")
grouping_builder.with_new_output("mydataset_cleaned_grouped", "filesystem_managed")
grouping_builder.with_group_key("column")
grouping_recipe = grouping_builder.build()

grouping_recipe_settings = grouping_recipe.get_settings()
grouping_recipe_settings.set_column_aggregations("myvaluecolumn", sum=True)
grouping_recipe_settings.save()
grouping_recipe_settings.compute_schema_updates().apply()

grouping_recipe_settings.run()

Working with flow zones

Creating a zone and adding items in it

flow = project.get_flow()
zone = flow.create_zone("zone1")

# First way of adding an item to a zone
dataset = project.get_dataset("mydataset")
zone.add_item(dataset)

# Second way of adding an item to a zone
dataset = project.get_dataset("mydataset")
dataset.move_to_zone(zone.id)

# Third way of adding an item to a zone
dataset = project.get_dataset("mydataset")
dataset.move_to_zone(zone)

Listing and getting zones

# List zones

for zone in flow.list_zones()
    print("Zone id=%s name=%s" % (zone.id, zone.name))

    print("Zone has the following items:")
    for item in zone.items:
        print("Zone item: %s" % item)

# Get a zone by id - beware, id not name
zone = flow.get_zone("21344ZsQZ")

# Get the "Default" zone
zone = flow.get_default_zone()

Changing the settings of a zone

flow = project.get_flow()
zone = flow.get_zone("21344ZsQZ")

settings = zone.get_settings()
settings.name = "New name"

settings.save()

Getting the zone of a dataset

dataset = project.get_dataset("mydataset")

zone = dataset.get_zone()
print("Dataset is in zone %s" % zone.id)

Schema propagation

When the schema of an input dataset is modified, or when the settings of a recipe are modified, you need to propagate this schema change across the flow.

This can be done from the UI, but can also be automated through the API

flow = project.get_flow()

# A propagation always starts from a source dataset and will move from left to right till the end of the Flow

propagation = flow.new_schema_propagation("sourcedataset")

future = propagation.start()
future.wait_for_result()

There are many options for propagation, see dataikuapi.dss.flow.DSSSchemaPropagationRunBuilder

Exporting a flow documentation

This sample shows how to generate and download a flow documentation from a template.

See Flow Document Generator for more information.

# project is a DSSProject object

flow = project.get_flow()

# Launch the flow document generation by either
# using the default template by calling without arguments
# or specifying a managed folder id and the path to the template to use in that folder
future = flow.generate_documentation(FOLDER_ID, "path/my_template.docx")

# Alternatively, use a custom uploaded template file
with open("my_template.docx", "rb") as f:
    future = flow.generate_documentation_from_custom_template(f)

# Wait for the generation to finish, retrieve the result and download the generated
# flow documentation to the specified file
result = future.wait_for_result()
export_id = result["exportId"]

flow.download_documentation_to_file(export_id, "path/my_flow_documentation.docx")

Reference documentation

class dataikuapi.dss.flow.DSSProjectFlow(client, project)
get_graph()
create_zone(name, color='#2ab1ac')

Creates a new flow zone

:returns the newly created zone :rtype: DSSFlowZone

get_zone(id)

Gets a single Flow zone by id :rtype: DSSFlowZone

get_default_zone()

Returns the default zone of the Flow :rtype: DSSFlowZone

list_zones()

Lists all zones in the Flow :rtype: list of DSSFlowZone

get_zone_of_object(obj)

Finds the zone to which this object belongs.

If the object is not found in any specific zone, it belongs to the default zone, and the default zone is returned

Parameters

obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to search

Return type

DSSFlowZone

replace_input_computable(current_ref, new_ref, type='DATASET')

This method replaces all references to a “computable” (Dataset, Managed Folder or Saved Model) as input of recipes in the whole Flow by a reference to another computable.

No specific checks are performed. It is your responsibility to ensure that the schema of the new dataset will be compatible with the previous one (in the case of datasets).

If new_ref references an object in a foreign project, this method will automatically ensure that new_ref is exposed to the current project

Parameters
  • str (type) – Either a “simple” object name (dataset name, model id, managed folder id) or a foreign object reference in the form “FOREIGN_PROJECT_KEY.local_id”)

  • str – Either a “simple” object name (dataset name, model id, managed folder id) or a foreign object reference in the form “FOREIGN_PROJECT_KEY.local_id”)

  • str – The type of object being replaced (DATASET, SAVED_MODEL or MANAGED_FOLDER)

generate_documentation(folder_id=None, path=None)

Start the flow document generation from a template docx file in a managed folder, or from the default template if no folder id and path are specified.

Parameters
  • folder_id – (optional) the id of the managed folder

  • path – (optional) the path to the file from the root of the folder

Returns

A DSSFuture representing the flow document generation process

generate_documentation_from_custom_template(fp)

Start the flow document generation from a docx template (as a file object).

Parameters

fp (object) – A file-like object pointing to a template docx file

Returns

A DSSFuture representing the flow document generation process

download_documentation_stream(export_id)

Download a flow documentation, as a binary stream.

Warning: this stream will monopolize the DSSClient until closed.

Parameters

export_id – the id of the generated flow documentation returned as the result of the future

Returns

A DSSFuture representing the flow document generation process

download_documentation_to_file(export_id, path)

Download a flow documentation into the given output file.

Parameters
  • export_id – the id of the generated flow documentation returned as the result of the future

  • path – the path where to download the flow documentation

Returns

None

start_tool(type, data={})

Start a tool or open a view in the flow

Parameters
  • str (type) – one of {COPY, CHECK_CONSISTENCY, PROPAGATE_SCHEMA} (tools) or {TAGS, CUSTOM_FIELDS, CONNECTIONS, COUNT_OF_RECORDS, FILESIZE, FILEFORMATS, RECIPES_ENGINES, RECIPES_CODE_ENVS, IMPALA_WRITE_MODE, HIVE_MODE, SPARK_ENGINE, SPARK_CONFIG, SPARK_PIPELINES, SQL_PIPELINES, PARTITIONING, PARTITIONS, SCENARIOS, CREATION, LAST_MODIFICATION, LAST_BUILD, RECENT_ACTIVITY, WATCH} (views)

  • dict (data) – initial data for the tool (optional)

Returns

a flow.DSSFlowTool handle to interact with the newly-created tool or view

new_schema_propagation(dataset_name)

Start an automatic schema propagation from a dataset

Parameters

str (dataset_name) – name of a dataset to start propagating from

:returns a DSSSchemaPropagationRunBuilder to set options and start the propagation

class dataikuapi.dss.flow.DSSProjectFlowGraph(flow, data)
get_source_computables(as_type='dict')

Returns the list of source computables. :param as_type: How to return the source computables. Possible values are “dict” and “object”

Returns

if as_type=dict, each computable is returned as a dict containing at least “ref” and “type”. if as_type=object, each computable is returned as a dataikuapi.dss.dataset.DSSDataset,

get_source_recipes(as_type='dict')

Returns the list of source recipes. :param as_type: How to return the source recipes. Possible values are “dict” and “object”

Returns

if as_type=dict, each recipes is returned as a dict containing at least “ref” and “type”. if as_type=object, each computable is returned as a dataikuapi.dss.recipe.DSSRecipe,

get_source_datasets()

Returns the list of source datasets for this project. :rtype list of dataikuapi.dss.dataset.DSSDataset

get_successor_recipes(node, as_type='dict')

Returns a list of recipes that are a successor of a graph node

Parameters
  • node – Either a name or dataikuapi.dss.dataset.DSSDataset object

  • as_type – How to return the successor recipes. Possible values are “dict” and “object”

:return if as_type=dict, each recipes is returned as a dict containing at least “ref” and “type”.

if as_type=object, each computable is returned as a dataikuapi.dss.recipe.DSSRecipe,

get_successor_computables(node, as_type='dict')

Returns a list of computables that are a successor of a given graph node

Parameters

as_type – How to return the successor recipes. Possible values are “dict” and “object”

:return if as_type=dict, each recipes is returned as a dict containing at least “ref” and “type”.

if as_type=object, each computable is returned as a dataikuapi.dss.recipe.DSSRecipe,

get_items_in_traversal_order(as_type='dict')
class dataikuapi.dss.flow.DSSFlowZone(flow, data)

A zone in the Flow. Do not create this object manually, use DSSProjectFlow.get_zone() or DSSProjectFlow.list_zones()

property id
property name
property color
get_settings()

Gets the settings of this zone in order to modify them

Return type

DSSFlowZoneSettings

add_item(obj)

Adds an item to this zone.

The item will automatically be moved from its existing zone. Additional items may be moved to this zone as a result of the operation (notably the recipe generating obj).

Parameters

obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to add to the zone

add_items(items)

Adds items to this zone.

The items will automatically be moved from their existing zones. Additional items may be moved to this zone as a result of the operations (notably the recipe generating the items).

Parameters

items (list) – A list of objects, either dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to add to the zone

property items

The list of items explicitly belonging to this zone.

This list is read-only, to modify it, use add_item() and remove_item().

Note that the “default” zone never has any items, as it contains all items that are not explicitly in a zone. To get the full list of items in a zone, including in the “default” zone, use the get_graph() method.

@rtype list of zone items, either dataikuapi.dss.dataset.DSSDataset,

dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel or dataiuapi.dss.recipe.DSSRecipe

add_shared(obj)

Share an item to this zone.

The item will not be automatically unshared from its existing zone.

Parameters

obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to share to the zone

remove_shared(obj)

Remove a shared item from this zone.

Parameters

obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to share to the zone

property shared

The list of items that have been explicitly pre-shared to this zone.

This list is read-only, to modify it, use add_shared() and remove_shared()

@rtype list of shared zone items, either dataikuapi.dss.dataset.DSSDataset,

dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel or dataiuapi.dss.recipe.DSSRecipe

get_graph()
delete()

Delete the zone, all items will be moved to the default zone

class dataikuapi.dss.flow.DSSFlowZoneSettings(zone)

The settings of a flow zone. Do not create this directly, use DSSFlowZone.get_settings()

get_raw()

Gets the raw settings of the zone.

You cannot modify the items and shared elements through this class. Instead, use DSSFlowZone.add_item() and others

property name
property color
save()

Saves the settings of the zone

class dataikuapi.dss.flow.DSSSchemaPropagationRunBuilder(project, client, dataset_name)

Do not create this directly, use DSSProjectFlow.new_schema_propagation()

set_auto_rebuild(auto_rebuild)

Sets whether to automatically rebuild datasets if needed while propagating (default true)

set_default_partitioning_value(dimension, value)

In the case of partitioned flows, sets the default partition value to use when rebuilding, for a specific dimension name

Parameters
  • dimension (str) – a partitioning dimension name

  • value (str) – a partitioning dimension value

set_partition_for_computable(full_id, partition)

In the case of partitioned flows, sets the partition id to use when building a particular computable. Overrides the default partitioning value per dimension

Parameters
  • full_id (str) – Full name of the computable, in the form PROJECTKEY.id

  • partition (str) – a full partition id (all dimensions)

stop_at(recipe_name)

Marks a recipe as a recipe where propagation stops

mark_recipe_as_ok(name)

Marks a recipe as always considered as OK during propagation

set_grouping_update_options(recipe=None, remove_missing_aggregates=True, remove_missing_keys=True, new_aggregates={})

Sets update options for grouping recipes :param str recipe: if None, applies to all grouping recipes. Else, applies only to this name

set_window_update_options(recipe=None, remove_missing_aggregates=True, remove_missing_in_window=True, new_aggregates={})

Sets update options for window recipes :param str recipe: if None, applies to all window recipes. Else, applies only to this name

set_join_update_options(recipe=None, remove_missing_join_conditions=True, remove_missing_join_values=True, new_selected_columns={})

Sets update options for join recipes :param str recipe: if None, applies to all join recipes. Else, applies only to this name

start()

Starts the actual propagation. Returns a future to wait for completion

Return type

dataikuapi.dss.future.DSSFuture

class dataikuapi.dss.flow.DSSFlowTool(client, project_key, tool_id)

Handle to interact with a flow tool

stop()

Stops the tool and releases the resources held by it

get_state(options={})

Get the current state of the tool or view

Returns

the state, as a dict

do(action)

Perform a manual user action on the tool

Returns

the current state, as a dict

update(options={})

(for tools only) Start updating the tool state

Params options dict

options for the update (optional)

Returns

a future.DSSFuture handle to interact with task of performing the update