Datasets (other operations)

Please see Datasets (introduction) for an introduction to interacting with datasets through the Dataiku Python API.

This page lists usage examples for performing various operations (listed below) with datasets through the Dataiku Python API. It is designed to give an overview of the main capabilities and is not exhaustive documentation.

For exhaustive reference documentation, please see Datasets (reference).

In all examples, project is a dataikuapi.dss.project.DSSProject handle, obtained using client.get_project() or client.get_default_project().
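
For example, a minimal sketch of obtaining these handles (the URL and API key below are placeholders):

import dataikuapi

client = dataikuapi.DSSClient("https://dss.example.com:11200", "YOUR_API_KEY")
project = client.get_project("TEST_PROJECT")

# From inside DSS (e.g. in a notebook), you can instead use:
# import dataiku
# client = dataiku.api_client()
# project = client.get_default_project()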

Basic operations

Listing datasets

datasets = project.list_datasets()
# Returns a list of DSSDatasetListItem

for dataset in datasets:
        # Quick access to main information in the dataset list item
        print("Name: %s" % dataset.name)
        print("Type: %s" % dataset.type)
        print("Connection: %s" % dataset.connection)
        print("Tags: %s" % dataset.tags) # Returns a list of strings

        # You can also use the list item as a dict of all available dataset information
        print("Raw: %s" % dataset)

outputs

Name: train_set
Type: Filesystem
Connection: filesystem_managed
Tags: ["creator_admin"]
Raw: {  'checklists': {   'checklists': []},
        'customMeta': {   'kv': {   }},
        'flowOptions': {   'crossProjectBuildBehavior': 'DEFAULT',
                            'rebuildBehavior': 'NORMAL'},
        'formatParams': {  /* Parameters specific to each format type */ },
        'formatType': 'csv',
        'managed': False,
        'name': 'train_set',
        'tags' : ["mytag1"]
        'params': { /* Parameters specific to each dataset type */ "connection" : "filesystem_managed" },
        'partitioning': {   'dimensions': [], 'ignoreNonMatchingFile': False},
        'projectKey': 'TEST_PROJECT',
        'schema': {   'columns': [   {     'name': 'col0',
                                           'type': 'string'},
                                       {   'name': 'col1',
                                           'type': 'string'},
                                       /* Other columns ... */
                                       ],
                       'userModified': False},
        'tags': ['creator_admin'],
        'type': 'Filesystem'}
...
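
Each DSSDatasetListItem only exposes summary information. To act on a dataset from the list (for example to read or modify its settings), get a full handle with to_dataset():

# Get a full DSSDataset handle from a list item
full_dataset = datasets[0].to_dataset()
settings = full_dataset.get_settings()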

Deleting a dataset

dataset = project.get_dataset('TEST_DATASET')
dataset.delete(drop_data=True)

Modifying tags for a dataset

dataset = project.get_dataset("mydataset")
settings = dataset.get_settings()

print("Current tags are %s" % settings.tags)

# Change the tags
settings.tags = ["newtag1", "newtag2"]

# If we changed the settings, we must save
settings.save()

Reading and modifying the schema of a dataset

Warning

Modifying the schema or settings of a dataset from within a DSS job should be done using dataiku.Dataset, NOT using DSSDataset

Schema changes made with DSSDataset from within a DSS job would not be taken into account for subsequent activities in the job.

dataset = project.get_dataset("mydataset")
settings = dataset.get_settings()

for column in settings.schema_columns:
        print("Have column name=%s type=%s" % (column["name"], column["type"]))

# Now, let's add a new column in the schema
settings.add_raw_schema_column({"name" : "test", "type": "string"})

# If we changed the settings, we must save
settings.save()
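
From within a DSS job (for example a Python recipe), as per the warning above, the schema should be modified through the dataiku package instead. A minimal sketch, assuming a dataset named "mydataset":

import dataiku

# Inside a DSS job, schema changes must go through dataiku.Dataset
ds = dataiku.Dataset("mydataset")
ds.write_schema([
    {"name": "col0", "type": "string"},
    {"name": "test", "type": "string"}
])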

Building a dataset

You can start a job in order to build the dataset

dataset = project.get_dataset("mydataset")

# Builds the dataset non-recursively and waits for the build to complete.
# Returns a :class:`dataikuapi.dss.job.DSSJob`
job = dataset.build()

# Builds the dataset recursively
dataset.build(job_type="RECURSIVE_BUILD")

# Build a partition (for partitioned datasets)
dataset.build(partitions="partition1")
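
By default, build() waits for the job to complete. A minimal sketch of a non-blocking build, using the wait parameter of build() and the returned DSSJob handle:

# Start the build without waiting for completion
job = dataset.build(wait=False)

# job is a dataikuapi.dss.job.DSSJob: you can poll its status or abort it
print(job.get_status())
# job.abort()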

Programmatic creation and setup (external datasets)

The API allows you to leverage Dataiku’s automatic detection and configuration capabilities in order to programmatically create datasets or programmatically “autocomplete” the settings of a dataset.

SQL dataset: Programmatic creation

dataset = project.create_sql_table_dataset("mydataset", "PostgreSQL", "my_sql_connection", "mytable", "myschema")

# At this point, the dataset object has been initialized, but the schema of the underlying table
# has not yet been fetched, so the schema of the table and the schema of the dataset are not yet consistent

# We run autodetection
settings = dataset.autodetect_settings()
# settings is now an object containing the "suggested" new dataset settings, including the completed schema
# We can just save the new settings in order to "accept the suggestion"
settings.save()

SQL dataset: Modifying settings

The object returned by dataikuapi.dss.dataset.DSSDataset.get_settings() depends on the kind of dataset.

For a SQL dataset, it will be a dataikuapi.dss.dataset.SQLDatasetSettings.

dataset = project.get_dataset("mydataset")
settings = dataset.get_settings()

# Set the table targeted by this SQL dataset
settings.set_table(connection="myconnection", schema="myschema", table="mytable")
settings.save()

# If we have changed the table, there is a good chance that the schema is not good anymore, so we must
# have DSS redetect it. `autodetect_settings` will however only detect if the schema is empty, so let's clear it.
del settings.schema_columns[:]
settings.save()

# Redetect and save the suggestion
settings = dataset.autodetect_settings()
settings.save()

Files-based dataset: Programmatic creation

Generic method for most connections

This applies to all files-based datasets, but may require additional setup

dataset = project.create_fslike_dataset("mydataset", "HDFS", "name_of_connection", "path_in_connection")

# At this point, the dataset object has been initialized, but the format is still unknown, and the
# schema is empty, so the dataset is not yet usable

# We run autodetection
settings = dataset.autodetect_settings()
# settings is now an object containing the "suggested" new dataset settings, including the detected format
# and completed schema
# We can just save the new settings in order to "accept the suggestion"
settings.save()

Quick helpers for some connections

# For S3: allows you to specify the bucket (if the connection does not already force a bucket)
dataset = project.create_s3_dataset(dataset_name, connection, path_in_connection, bucket=None)
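
As with the generic method above, a dataset created by such a helper typically still has no format or schema configured, so you would normally run autodetection afterwards:

settings = dataset.autodetect_settings()
settings.save()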

Uploaded datasets: programmatic creation and upload

dataset = project.create_upload_dataset("mydataset") # you can add connection= for the target connection

with open("localfiletoupload.csv", "rb") as f:
        dataset.uploaded_add_file(f, "localfiletoupload.csv")

# At this point, the dataset object has been initialized, but the format is still unknown, and the
# schema is empty, so the dataset is not yet usable

# We run autodetection
settings = dataset.autodetect_settings()
# settings is now an object containing the "suggested" new dataset settings, including the detected format
# and completed schema
# We can just save the new settings in order to "accept the suggestion"
settings.save()

Manual creation

You can create a dataset and set up all of its parameters yourself. We do not recommend using this method.

For example, loading the CSV files of a folder:

import pandas
from os import listdir

project = client.get_project('TEST_PROJECT')
folder_path = 'path/to/folder/'
for file in listdir(folder_path):
    if not file.endswith('.csv'):
        continue
    # Dots are not allowed in dataset names, so strip the '.csv' extension
    dataset = project.create_dataset(
        file[:-4],
        'Filesystem',
        params={
            'connection': 'filesystem_root',
            'path': folder_path + file
        },
        formatType='csv',
        formatParams={
            'separator': ',',
            'style': 'excel',  # excel-style quoting
            'parseHeaderRow': True
        })
    df = pandas.read_csv(folder_path + file)
    dataset.set_schema({'columns': [{'name': column, 'type': 'string'} for column in df.columns]})

Programmatic creation and setup (managed datasets)

Managed datasets are much easier to create because they are managed by DSS

Creating a new SQL managed dataset

builder = project.new_managed_dataset("mydatasetname")
builder.with_store_into("mysqlconnection")
dataset = builder.create()

Creating a new Files-based managed dataset with a specific format

builder = project.new_managed_dataset("mydatasetname")
builder.with_store_into("myhdfsconnection", format_option_id="PARQUET_HIVE")
dataset = builder.create()

Creating a new partitioned managed dataset

This dataset copies partitioning from an existing dataset

builder = project.new_managed_dataset("mydatasetname")
builder.with_store_into("myhdfsconnection")
builder.with_copy_partitioning_from("source_dataset")
dataset = builder.create()

Flow handling

For more details on programmatic flow building, please see Flow creation and management.

Creating recipes from a dataset

This example creates a sync recipe to sync a dataset to another

recipe_builder = dataset.new_recipe("sync")
recipe_builder.with_new_output("target_dataset", "target_connection_name")
recipe = recipe_builder.create()

# recipe is now a :class:`dataikuapi.dss.recipe.DSSRecipe`, and you can run it
recipe.run()

This example creates a code recipe from this dataset

recipe_builder = dataset.new_recipe("python")
recipe_builder.with_code("""
import dataiku
from dataiku import recipe

input_dataset = recipe.get_inputs_as_datasets()[0]
output_dataset = recipe.get_outputs_as_datasets()[0]

df = input_dataset.get_dataframe()
df = df.groupby("mycol").count()

output_dataset.write_with_schema(df)
""")
recipe_builder.with_new_output_dataset("target_dataset", "target_connection_name")
recipe = recipe_builder.create()

# recipe is now a :class:`dataikuapi.dss.recipe.DSSRecipe`, and you can run it
recipe.run()

ML & Statistics

Creating ML models

You can create an ML task in order to train models based on a dataset. See Machine learning for more details.

dataset = project.get_dataset('mydataset')
mltask = dataset.create_prediction_ml_task("variable_to_predict")
mltask.train()
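
train() waits for the training session to complete. You can then list and inspect the trained models; a short sketch (see Machine learning for details on the returned objects):

for model_id in mltask.get_trained_models_ids():
    details = mltask.get_trained_model_details(model_id)
    print("Model %s: %s" % (model_id, details.get_performance_metrics()))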

Creating statistics worksheets

For more details, please see Statistics worksheets

dataset = project.get_dataset('mydataset')
ws = dataset.create_statistics_worksheet(name="New worksheet")

Misc operations

Listing partitions

For partitioned datasets, the list of partitions is retrieved with list_partitions():

partitions = dataset.list_partitions()
# partitions is a list of string

Clearing data

The rows of the dataset can be cleared, entirely or on a per-partition basis, with the clear() method.

dataset = project.get_dataset('SOME_DATASET')
dataset.clear(['partition_spec_1', 'partition_spec_2'])  # clears specified partitions
dataset.clear()                                          # clears all partitions

Hive operations

For datasets associated with a table in the Hive metastore, the table definition in the metastore must be synchronized with the dataset’s schema in DSS before the dataset becomes visible to Hive and usable in Impala queries.

dataset = project.get_dataset('SOME_HDFS_DATASET')
dataset.synchronize_hive_metastore()

Or, in the other direction, to synchronize the dataset’s information from Hive:

dataset = project.get_dataset('SOME_HDFS_DATASET')
dataset.update_from_hive()

# This will have the updated settings
settings = dataset.get_settings()