Managing datasets

Datasets belong to a given project, so all access to datasets requires getting a handle on the project first.
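
For instance, a project handle can be obtained from the API client as follows. This is a minimal sketch; the host URL and API key are placeholders, not values taken from this documentation.

import dataikuapi

host = "http://localhost:11200"   # placeholder DSS URL
api_key = "YOUR_API_KEY"          # placeholder API key

client = dataikuapi.DSSClient(host, api_key)
project = client.get_project('TEST_PROJECT')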

Basic operations

The list of all datasets of a project is accessible via list_datasets()

import pprint
prettyprinter = pprint.PrettyPrinter(indent=4)

project = client.get_project('TEST_PROJECT')
datasets = project.list_datasets()
prettyprinter.pprint(datasets)

outputs

[   {   'checklists': {   'checklists': []},
        'customMeta': {   'kv': {   }},
        'flowOptions': {   'crossProjectBuildBehavior': 'DEFAULT',
                            'rebuildBehavior': 'NORMAL'},
        'formatParams': {   'arrayMapFormat': 'json',
                             'charset': 'utf8',
                             'compress': '',
                             'dateSerializationFormat': 'ISO',
                             'escapeChar': '\\',
                             'hiveSeparators': [   '\x02',
                                                    '\x03',
                                                    '\x04',
                                                    '\x05',
                                                    '\x06',
                                                    '\x07',
                                                    '\x08'],
                             'parseHeaderRow': True,
                             'probableNumberOfRecords': 8,
                             'quoteChar': '"',
                             'separator': ',',
                             'skipRowsAfterHeader': 0,
                             'skipRowsBeforeHeader': 0,
                             'style': 'excel'},
        'formatType': 'csv',
        'managed': False,
        'name': 'train_set',
        'params': {   },
        'partitioning': {   'dimensions': [], 'ignoreNonMatchingFile': False},
        'projectKey': 'TEST_PROJECT',
        'schema': {   'columns': [   {   'maxLength': -1,
                                           'name': 'col0',
                                           'type': 'string'},
                                       {   'maxLength': -1,
                                           'name': 'col1',
                                           'type': 'string'},
                                       ...
                                       ],
                       'userModified': False},
        'tags': ['creator_admin'],
        'type': 'UploadedFiles'},
...
]

Datasets can be created. For example, to load all the CSV files of a folder:

from os import listdir
from os.path import join
import pandas

project = client.get_project('TEST_PROJECT')
folder_path = 'path/to/folder/'
for file in listdir(folder_path):
    if not file.endswith('.csv'):
        continue
    dataset = project.create_dataset(
        file[:-4],                   # dot is not allowed in dataset names
        'Filesystem',
        params={
            'connection': 'filesystem_root',
            'path': join(folder_path, file)
        },
        formatType='csv',
        formatParams={
            'separator': ',',
            'style': 'excel',        # excel-style quoting
            'parseHeaderRow': True
        })
    df = pandas.read_csv(join(folder_path, file))
    dataset.set_schema({'columns': [{'name': column, 'type': 'string'} for column in df.columns]})
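
The loop above forces every column to string. If the schema should instead reflect the types pandas inferred, a helper like the following sketch could be used in place of the final set_schema() call; the dtype mapping is an assumption for illustration and only uses the 'string' and 'bigint' type names that appear elsewhere in this section.

import pandas

def dss_type(dtype):
    # Assumed mapping: pandas integer dtypes become 'bigint', everything else 'string'.
    return 'bigint' if pandas.api.types.is_integer_dtype(dtype) else 'string'

df = pandas.read_csv(join(folder_path, file))
dataset.set_schema({
    'columns': [{'name': column, 'type': dss_type(df[column].dtype)}
                for column in df.columns]
})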

Datasets can be deleted.

dataset = project.get_dataset('TEST_DATASET')
dataset.delete()

The metadata of a dataset can be modified. It is advised to first retrieve the current metadata with the get_metadata() call, modify the returned object, and then set it back on the DSS instance.

dataset_metadata = dataset.get_metadata()
dataset_metadata['tags'] = ['tag1', 'tag2']
dataset.set_metadata(dataset_metadata)
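
The same read-modify-write pattern can be used to add a tag without discarding the existing ones; a small sketch, assuming the tags are exposed under the 'tags' key as in the example above:

dataset_metadata = dataset.get_metadata()
if 'tag3' not in dataset_metadata['tags']:
    dataset_metadata['tags'].append('tag3')
dataset.set_metadata(dataset_metadata)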

Accessing the dataset data

The data of a dataset can be streamed over HTTP to the API client with the iter_rows() method. This call returns the raw data, so in most cases it is necessary to first get the dataset's schema with a call to get_schema(). For example, printing the first 10 rows can be done with:

columns = [column['name'] for column in dataset.get_schema()['columns']]
print(columns)
row_count = 0
for row in dataset.iter_rows():
    print(row)
    row_count += 1
    if row_count >= 10:
        break

outputs

['tube_assembly_id', 'supplier', 'quote_date', 'annual_usage', 'min_order_quantity', 'bracket_pricing', 'quantity', 'cost']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '1', '21.9059330191461']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '2', '12.3412139792904']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '5', '6.60182614356538']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '10', '4.6877695119712']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '25', '3.54156118026073']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '50', '3.22440644770007']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '100', '3.08252143576504']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '250', '2.99905966403855']
['TA-00004', 'S-0066', '2013-07-07', '0', '0', 'Yes', '1', '21.9727024365273']
['TA-00004', 'S-0066', '2013-07-07', '0', '0', 'Yes', '2', '12.4079833966715']
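
Since iter_rows() yields raw values, a common pattern is to combine it with get_schema() to build a pandas DataFrame. A minimal sketch, assuming each row is the plain list of values shown above; the 10000-row cap is arbitrary and only meant to avoid streaming a very large dataset into memory:

import itertools
import pandas

columns = [column['name'] for column in dataset.get_schema()['columns']]
rows = list(itertools.islice(dataset.iter_rows(), 10000))   # read at most 10000 rows
df = pandas.DataFrame(rows, columns=columns)
print(df.head())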

The schema of a dataset can be modified with the set_schema() method:

schema = dataset.get_schema()
schema['columns'].append({'name' : 'new_column', 'type' : 'bigint'})
dataset.set_schema(schema)

For partitioned datasets, the list of partitions is retrieved with list_partitions():

partitions = dataset.list_partitions()

And the data of a given partition can be retrieved by passing the appropriate partition spec as parameter to iter_rows():

row_count = 0
for row in dataset.iter_rows(partitions='partition_spec1,partition_spec2'):
    print(row)
    row_count += 1
    if row_count >= 10:
        break
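
The two calls can be combined, for example to get a quick row count per partition. A small sketch, assuming list_partitions() returns partition identifiers usable as partition specs; it streams every partition, so it is only reasonable on small datasets:

for partition in dataset.list_partitions():
    row_count = sum(1 for row in dataset.iter_rows(partitions=partition))
    print('%s: %d rows' % (partition, row_count))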

Dataset operations

The rows of the dataset can be cleared, entirely or on a per-partition basis, with the clear() method.

dataset = project.get_dataset('SOME_DATASET')
dataset.clear(['partition_spec_1', 'partition_spec_2'])   # clears specified partitions
dataset.clear()                                           # clears all partitions

For datasets associated with a table in the Hive metastore, the table definition in the metastore needs to be synchronized with the dataset's schema in DSS before the data is visible to Hive and usable in Impala queries.

dataset = project.get_dataset('SOME_HDFS_DATASET')
dataset.synchronize_hive_metastore()