Flow creation and management¶
Programmatically building a Flow¶
The flow, including datasets, recipes, … can be fully managed and created programmatically.
Datasets can be created and managed using the methods detailed in Datasets (other operations).
Recipes can be created using the new_recipe() method. This follows a builder pattern: new_recipe() returns a DSSRecipeCreator, on which you add settings, and then you call the create() method to actually create the recipe object.
The builder objects reproduce the functionality available in the recipe creation modals in the UI, so for more control over the recipe’s setup, it is often necessary to get its settings after creation, modify them, and save them again.
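To make the builder pattern concrete, here is a minimal pure-Python sketch. RecipeCreatorSketch and its with_output method are hypothetical stand-ins written for illustration only; they are not part of dataikuapi and merely mimic how settings are accumulated before create() is called.

```python
# Illustrative stand-in only: this is NOT dataikuapi's DSSRecipeCreator.
class RecipeCreatorSketch:
    """Collects settings, then produces the final definition in create()."""

    def __init__(self, recipe_type):
        self.recipe_type = recipe_type
        self.inputs = []
        self.outputs = []

    def with_input(self, dataset_name):
        self.inputs.append(dataset_name)
        return self  # returning self allows chained calls

    def with_output(self, dataset_name):
        self.outputs.append(dataset_name)
        return self

    def create(self):
        # Nothing is produced until this call; here we simply return a dict
        if not self.outputs:
            raise ValueError("at least one output is required")
        return {
            "type": self.recipe_type,
            "inputs": list(self.inputs),
            "outputs": list(self.outputs),
        }


definition = (
    RecipeCreatorSketch("sync")
    .with_input("input_dataset_name")
    .with_output("output_dataset_name")
    .create()
)
print(definition)
```

The point of the pattern is that validation and creation happen in one place (create()), while the with_* calls only record intent.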
Creating a Python recipe¶
builder = project.new_recipe("python")
# Set the input
builder.with_input("myinputdataset")
# Create a new managed dataset for the output in the filesystem_managed connection
builder.with_new_output_dataset("grouped_dataset", "filesystem_managed")
# Set the code - builder is a PythonRecipeCreator, and has a ``with_script`` method
builder.with_script("""
import dataiku
from dataiku import recipe
input_dataset = recipe.get_inputs_as_datasets()[0]
output_dataset = recipe.get_outputs_as_datasets()[0]
df = input_dataset.get_dataframe()
df = df.groupby("something").count()
output_dataset.write_with_schema(df)
""")
recipe = builder.create()
# recipe is now a ``DSSRecipe`` representing the new recipe, and we can now run it
job = recipe.run()
Creating a Sync recipe¶
builder = project.new_recipe("sync")
builder = builder.with_input("input_dataset_name")
builder = builder.with_new_output("output_dataset_name", "hdfs_managed", format_option_id="PARQUET_HIVE")
recipe = builder.create()
job = recipe.run()
Creating and modifying a grouping recipe¶
Recipe creation mostly handles setting up the recipe’s inputs and outputs, so most of the remaining setup is done by retrieving the recipe’s settings, altering and saving them, then applying schema changes to the output:
builder = project.new_recipe("grouping")
builder.with_input("dataset_to_group_on")
# Create a new managed dataset for the output in the "filesystem_managed" connection
builder.with_new_output("grouped_dataset", "filesystem_managed")
builder.with_group_key("column")
recipe = builder.create()
# After the recipe is created, you can edit its settings
recipe_settings = recipe.get_settings()
recipe_settings.set_column_aggregations("myvaluecolumn", sum=True)
recipe_settings.save()
# And you may need to apply new schemas to the outputs
# This will add the myvaluecolumn_sum to the "grouped_dataset" dataset
recipe.compute_schema_updates().apply()
# It should be noted that running a recipe is equivalent to building its output(s)
job = recipe.run()
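The create, configure, save and apply sequence can be wrapped in a small reusable helper. The sketch below is one possible arrangement, not an official utility: create_grouping_recipe and its parameter names are hypothetical, and it assumes that project is a DSSProject handle and that only sum aggregations are wanted.

```python
def create_grouping_recipe(project, input_name, output_name, connection,
                           group_key, sum_columns):
    """Create a grouping recipe and sum the given columns (illustrative sketch)."""
    builder = project.new_recipe("grouping")
    builder.with_input(input_name)
    builder.with_new_output(output_name, connection)
    builder.with_group_key(group_key)
    recipe = builder.create()

    # The builder only covers inputs, outputs and the key; aggregations are settings
    settings = recipe.get_settings()
    for column in sum_columns:
        settings.set_column_aggregations(column, sum=True)
    settings.save()

    # Push the new aggregated columns to the schema of the output dataset
    recipe.compute_schema_updates().apply()
    return recipe
```

The helper returns the recipe without running it, so the caller decides when to call run().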
A complete example¶
This example shows a complete chain:
Creating an external dataset
Automatically detecting the settings of the dataset (see Datasets (other operations) for details)
Creating a prepare recipe to clean up the dataset
Then chaining a grouping recipe, setting an aggregation on it
Running the entire chain
dataset = project.create_sql_table_dataset("mydataset", "PostgreSQL", "my_sql_connection", "mytable", "myschema")
dataset_settings = dataset.autodetect_settings()
dataset_settings.save()
# As a shortcut, we can call new_recipe on the DSSDataset object. This way, we don't need to call "with_input"
prepare_builder = dataset.new_recipe("prepare")
prepare_builder.with_new_output("mydataset_cleaned", "filesystem_managed")
prepare_recipe = prepare_builder.create()
# Add a step to clean values in "doublecol" that are not valid doubles
prepare_settings = prepare_recipe.get_settings()
prepare_settings.add_filter_on_bad_meaning("DoubleMeaning", "doublecol")
prepare_settings.save()
prepare_recipe.compute_schema_updates().apply()
prepare_recipe.run()
# Grouping recipe
grouping_builder = project.new_recipe("grouping")
grouping_builder.with_input("mydataset_cleaned")
grouping_builder.with_new_output("mydataset_cleaned_grouped", "filesystem_managed")
grouping_builder.with_group_key("column")
grouping_recipe = grouping_builder.create()
grouping_recipe_settings = grouping_recipe.get_settings()
grouping_recipe_settings.set_column_aggregations("myvaluecolumn", sum=True)
grouping_recipe_settings.save()
# Schema updates and runs are performed on the recipe, not on its settings
grouping_recipe.compute_schema_updates().apply()
grouping_recipe.run()
Working with flow zones¶
Creating a zone and adding items in it¶
flow = project.get_flow()
zone = flow.create_zone("zone1")
# First way of adding an item to a zone
dataset = project.get_dataset("mydataset")
zone.add_item(dataset)
# Second way of adding an item to a zone
dataset = project.get_dataset("mydataset")
dataset.move_to_zone(zone.id)
# Third way of adding an item to a zone
dataset = project.get_dataset("mydataset")
dataset.move_to_zone(zone)
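When several datasets must go into the same zone, the add_items() method listed in the reference below accepts a list. A minimal sketch, assuming project is a DSSProject handle; the helper name move_datasets_to_new_zone is hypothetical:

```python
def move_datasets_to_new_zone(project, zone_name, dataset_names):
    """Create a zone and move several datasets into it with a single call."""
    flow = project.get_flow()
    zone = flow.create_zone(zone_name)
    datasets = [project.get_dataset(name) for name in dataset_names]
    # Items are moved out of their previous zone automatically
    zone.add_items(datasets)
    return zone
```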
Listing and getting zones¶
# List zones
for zone in flow.list_zones():
    print("Zone id=%s name=%s" % (zone.id, zone.name))
    print("Zone has the following items:")
    for item in zone.items:
        print("Zone item: %s" % item)
# Get a zone by id - beware, id not name
zone = flow.get_zone("21344ZsQZ")
# Get the "Default" zone
zone = flow.get_default_zone()
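Since get_zone() expects an id rather than a name, looking a zone up by name means scanning list_zones(). The helper below is a hypothetical sketch (find_zone_by_name is not part of the API) that returns the first match, which assumes zone names are distinct enough for your purpose:

```python
def find_zone_by_name(flow, name):
    """Return the first zone whose name matches, or None if there is none."""
    for zone in flow.list_zones():
        if zone.name == name:
            return zone
    return None
```

Typical usage would be `zone = find_zone_by_name(project.get_flow(), "zone1")`, followed by a check for None.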
Changing the settings of a zone¶
flow = project.get_flow()
zone = flow.get_zone("21344ZsQZ")
settings = zone.get_settings()
settings.name = "New name"
settings.save()
Getting the zone of a dataset¶
dataset = project.get_dataset("mydataset")
zone = dataset.get_zone()
print("Dataset is in zone %s" % zone.id)
Schema propagation¶
When the schema of an input dataset is modified, or when the settings of a recipe are modified, you need to propagate this schema change across the flow.
This can be done from the UI, but can also be automated through the API:
flow = project.get_flow()
# A propagation always starts from a source dataset and will move from left to right till the end of the Flow
propagation = flow.new_schema_propagation("sourcedataset")
future = propagation.start()
future.wait_for_result()
There are many options for propagation; see dataikuapi.dss.flow.DSSSchemaPropagationRunBuilder.
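As an illustration of these options, the sketch below sets two of them before starting the propagation. It only uses builder methods listed in the reference (set_auto_rebuild, stop_at, start); the wrapper function propagate_schema and its parameters are hypothetical.

```python
def propagate_schema(flow, source_dataset, stop_recipes=(), auto_rebuild=True):
    """Run a schema propagation from source_dataset and wait for it to finish."""
    propagation = flow.new_schema_propagation(source_dataset)
    propagation.set_auto_rebuild(auto_rebuild)
    for recipe_name in stop_recipes:
        # Mark each of these recipes as a point where propagation stops
        propagation.stop_at(recipe_name)
    future = propagation.start()
    return future.wait_for_result()
```

Passing auto_rebuild=False may be preferable on large flows, where rebuilding datasets during propagation can be expensive.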
Exporting a flow documentation¶
This sample shows how to generate and download a flow documentation from a template.
See Flow Document Generator for more information.
# project is a DSSProject object
flow = project.get_flow()
# Launch the flow document generation by either
# using the default template by calling without arguments
# or specifying a managed folder id and the path to the template to use in that folder
future = flow.generate_documentation(FOLDER_ID, "path/my_template.docx")
# Alternatively, use a custom uploaded template file
with open("my_template.docx", "rb") as f:
    future = flow.generate_documentation_from_custom_template(f)
# Wait for the generation to finish, retrieve the result and download the generated
# flow documentation to the specified file
result = future.wait_for_result()
export_id = result["exportId"]
flow.download_documentation_to_file(export_id, "path/my_flow_documentation.docx")
Reference documentation¶
class dataikuapi.dss.flow.DSSProjectFlow(client, project)¶
get_graph()¶
create_zone(name, color='#2ab1ac')¶
Creates a new flow zone.
- Returns
the newly created zone
- Return type
DSSFlowZone
get_zone(id)¶
Gets a single Flow zone by id.
- Return type
DSSFlowZone
get_default_zone()¶
Returns the default zone of the Flow.
- Return type
DSSFlowZone
list_zones()¶
Lists all zones in the Flow.
- Return type
list of DSSFlowZone
get_zone_of_object(obj)¶
Finds the zone to which this object belongs.
If the object is not found in any specific zone, it belongs to the default zone, and the default zone is returned.
- Parameters
obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to search
- Return type
DSSFlowZone
replace_input_computable(current_ref, new_ref, type='DATASET')¶
This method replaces all references to a “computable” (Dataset, Managed Folder or Saved Model) as input of recipes in the whole Flow by a reference to another computable.
No specific checks are performed. It is your responsibility to ensure that the schema of the new dataset will be compatible with the previous one (in the case of datasets).
If new_ref references an object in a foreign project, this method will automatically ensure that new_ref is exposed to the current project.
- Parameters
current_ref (str) – Either a “simple” object name (dataset name, model id, managed folder id) or a foreign object reference in the form “FOREIGN_PROJECT_KEY.local_id”
new_ref (str) – Either a “simple” object name (dataset name, model id, managed folder id) or a foreign object reference in the form “FOREIGN_PROJECT_KEY.local_id”
type (str) – The type of object being replaced (DATASET, SAVED_MODEL or MANAGED_FOLDER)
generate_documentation(folder_id=None, path=None)¶
Start the flow document generation from a template docx file in a managed folder, or from the default template if no folder id and path are specified.
- Parameters
folder_id – (optional) the id of the managed folder
path – (optional) the path to the file from the root of the folder
- Returns
A DSSFuture representing the flow document generation process
generate_documentation_from_custom_template(fp)¶
Start the flow document generation from a docx template (as a file object).
- Parameters
fp (object) – A file-like object pointing to a template docx file
- Returns
A DSSFuture representing the flow document generation process
download_documentation_stream(export_id)¶
Download a flow documentation, as a binary stream.
Warning: this stream will monopolize the DSSClient until closed.
- Parameters
export_id – the id of the generated flow documentation returned as the result of the future
- Returns
The generated flow documentation, as a binary stream
download_documentation_to_file(export_id, path)¶
Download a flow documentation into the given output file.
- Parameters
export_id – the id of the generated flow documentation returned as the result of the future
path – the path where to download the flow documentation
- Returns
None
start_tool(type, data={})¶
Start a tool or open a view in the flow.
- Parameters
type (str) – one of {COPY, CHECK_CONSISTENCY, PROPAGATE_SCHEMA} (tools) or {TAGS, CUSTOM_FIELDS, CONNECTIONS, COUNT_OF_RECORDS, FILESIZE, FILEFORMATS, RECIPES_ENGINES, RECIPES_CODE_ENVS, IMPALA_WRITE_MODE, HIVE_MODE, SPARK_ENGINE, SPARK_CONFIG, SPARK_PIPELINES, SQL_PIPELINES, PARTITIONING, PARTITIONS, SCENARIOS, CREATION, LAST_MODIFICATION, LAST_BUILD, RECENT_ACTIVITY, WATCH} (views)
data (dict) – initial data for the tool (optional)
- Returns
a flow.DSSFlowTool handle to interact with the newly-created tool or view
new_schema_propagation(dataset_name)¶
Start an automatic schema propagation from a dataset.
- Parameters
dataset_name (str) – name of a dataset to start propagating from
- Returns
a DSSSchemaPropagationRunBuilder to set options and start the propagation
class dataikuapi.dss.flow.DSSProjectFlowGraph(flow, data)¶
get_source_computables(as_type='dict')¶
Returns the list of source computables.
- Parameters
as_type – How to return the source computables. Possible values are “dict” and “object”
- Returns
if as_type=dict, each computable is returned as a dict containing at least “ref” and “type”. If as_type=object, each computable is returned as a dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, dataikuapi.dss.savedmodel.DSSSavedModel, or streaming endpoint
get_source_recipes(as_type='dict')¶
Returns the list of source recipes.
- Parameters
as_type – How to return the source recipes. Possible values are “dict” and “object”
- Returns
if as_type=dict, each recipe is returned as a dict containing at least “ref” and “type”. If as_type=object, each recipe is returned as a dataikuapi.dss.recipe.DSSRecipe
get_source_datasets()¶
Returns the list of source datasets for this project.
- Return type
list of dataikuapi.dss.dataset.DSSDataset
get_successor_recipes(node, as_type='dict')¶
Returns a list of recipes that are a successor of a graph node.
- Parameters
node – Either a name or a dataikuapi.dss.dataset.DSSDataset object
as_type – How to return the successor recipes. Possible values are “dict” and “object”
- Returns
if as_type=dict, each recipe is returned as a dict containing at least “ref” and “type”. If as_type=object, each recipe is returned as a dataikuapi.dss.recipe.DSSRecipe
get_successor_computables(node, as_type='dict')¶
Returns a list of computables that are a successor of a given graph node.
- Parameters
as_type – How to return the successor computables. Possible values are “dict” and “object”
- Returns
if as_type=dict, each computable is returned as a dict containing at least “ref” and “type”. If as_type=object, each computable is returned as the corresponding handle object, as for get_source_computables()
get_items_in_traversal_order(as_type='dict')¶
class dataikuapi.dss.flow.DSSFlowZone(flow, data)¶
A zone in the Flow. Do not create this object manually, use DSSProjectFlow.get_zone() or DSSProjectFlow.list_zones().
property id¶
property name¶
property color¶
get_settings()¶
Gets the settings of this zone in order to modify them.
- Return type
DSSFlowZoneSettings
add_item(obj)¶
Adds an item to this zone.
The item will automatically be moved from its existing zone. Additional items may be moved to this zone as a result of the operation (notably the recipe generating obj).
- Parameters
obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to add to the zone
add_items(items)¶
Adds items to this zone.
The items will automatically be moved from their existing zones. Additional items may be moved to this zone as a result of the operations (notably the recipe generating the items).
- Parameters
items (list) – A list of objects, either dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to add to the zone
property items¶
The list of items explicitly belonging to this zone.
This list is read-only; to modify it, use add_item() and remove_item().
Note that the “default” zone never has any items, as it contains all items that are not explicitly in a zone. To get the full list of items in a zone, including in the “default” zone, use the get_graph() method.
- Return type
list of zone items, either dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, dataikuapi.dss.savedmodel.DSSSavedModel or dataikuapi.dss.recipe.DSSRecipe
add_shared(obj)¶
Share an item to this zone.
The item will not be automatically unshared from its existing zone.
- Parameters
obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to share to the zone
remove_shared(obj)¶
Remove a shared item from this zone.
- Parameters
obj (object) – A dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, or dataikuapi.dss.savedmodel.DSSSavedModel to stop sharing to the zone
property shared¶
The list of items that have been explicitly pre-shared to this zone.
This list is read-only; to modify it, use add_shared() and remove_shared().
- Return type
list of shared zone items, either dataikuapi.dss.dataset.DSSDataset, dataikuapi.dss.managedfolder.DSSManagedFolder, dataikuapi.dss.savedmodel.DSSSavedModel or dataikuapi.dss.recipe.DSSRecipe
get_graph()¶
delete()¶
Delete the zone. All items will be moved to the default zone.
class dataikuapi.dss.flow.DSSFlowZoneSettings(zone)¶
The settings of a flow zone. Do not create this directly, use DSSFlowZone.get_settings().
get_raw()¶
Gets the raw settings of the zone.
You cannot modify the items and shared elements through this class. Instead, use DSSFlowZone.add_item() and others.
property name¶
property color¶
save()¶
Saves the settings of the zone.
class dataikuapi.dss.flow.DSSSchemaPropagationRunBuilder(project, client, dataset_name)¶
Do not create this directly, use DSSProjectFlow.new_schema_propagation().
set_auto_rebuild(auto_rebuild)¶
Sets whether to automatically rebuild datasets if needed while propagating (default true).
set_default_partitioning_value(dimension, value)¶
In the case of partitioned flows, sets the default partition value to use when rebuilding, for a specific dimension name.
- Parameters
dimension (str) – a partitioning dimension name
value (str) – a partitioning dimension value
set_partition_for_computable(full_id, partition)¶
In the case of partitioned flows, sets the partition id to use when building a particular computable. Overrides the default partitioning value per dimension.
- Parameters
full_id (str) – Full name of the computable, in the form PROJECTKEY.id
partition (str) – a full partition id (all dimensions)
stop_at(recipe_name)¶
Marks a recipe as a recipe where propagation stops.
mark_recipe_as_ok(name)¶
Marks a recipe as always considered as OK during propagation.
set_grouping_update_options(recipe=None, remove_missing_aggregates=True, remove_missing_keys=True, new_aggregates={})¶
Sets update options for grouping recipes.
- Parameters
recipe (str) – if None, applies to all grouping recipes. Else, applies only to this name
set_window_update_options(recipe=None, remove_missing_aggregates=True, remove_missing_in_window=True, new_aggregates={})¶
Sets update options for window recipes.
- Parameters
recipe (str) – if None, applies to all window recipes. Else, applies only to this name
set_join_update_options(recipe=None, remove_missing_join_conditions=True, remove_missing_join_values=True, new_selected_columns={})¶
Sets update options for join recipes.
- Parameters
recipe (str) – if None, applies to all join recipes. Else, applies only to this name
start()¶
Starts the actual propagation. Returns a future to wait for completion.
- Return type
DSSFuture
class dataikuapi.dss.flow.DSSFlowTool(client, project_key, tool_id)¶
Handle to interact with a flow tool.
stop()¶
Stops the tool and releases the resources held by it.
get_state(options={})¶
Get the current state of the tool or view.
- Returns
the state, as a dict
do(action)¶
Perform a manual user action on the tool.
- Returns
the current state, as a dict
update(options={})¶
(for tools only) Start updating the tool state.
- Parameters
options (dict) – options for the update (optional)
- Returns
a future.DSSFuture handle to interact with the task of performing the update