Jobs

The API offers methods to retrieve the list of jobs and their status, so that they can be monitored. Additionally, new jobs can be created to build datasets.

Reading the jobs’ status

The list of all jobs, finished or not, can be fetched with the list_jobs() method. For example, to retrieve the jobs that failed after a given date:

import datetime

date = '2015/09/24'
# convert the date to the millisecond timestamps used in the job information
date_as_timestamp = int(datetime.datetime.strptime(date, "%Y/%m/%d").timestamp()) * 1000
project = client.get_project('TEST_PROJECT')
jobs = project.list_jobs()
failed_jobs = [job for job in jobs
               if job['state'] == 'FAILED' and job['def']['initiationTimestamp'] >= date_as_timestamp]

The list_jobs() method returns the full information of each job as a JSON object. The important fields are:

{
    'def': {
        'id': 'build_cat_train_hdfs_NP_2015-09-28T09-17-37.455',   # the identifier for the job
        'initiationTimestamp': 1443431857455,                      # timestamp of when the job was submitted
        'initiator': 'API (aa)',
        'mailNotification': False,
        'name': 'build_cat_train_hdfs_NP',
        'outputs': [{
            'targetDataset': 'cat_train_hdfs',                     # the dataset(s) built by the job
            'targetDatasetProjectKey': 'IMPALA',
            'targetPartition': 'NP',
            'type': 'DATASET'}],
        'projectKey': 'IMPALA',
        'refreshHiveMetastore': False,
        'refreshIntermediateMirrors': True,
        'refreshTargetMirrors': True,
        'triggeredFrom': 'API',
        'type': 'NON_RECURSIVE_FORCED_BUILD'},
    'endTime': 0,
    'stableState': True,
    'startTime': 0,
    'state': 'ABORTED',                                            # the stable state of the job
    'warningsCount': 0}

The id field is needed to get a handle on the job, with get_job(), and to call abort() or get_log() on it.
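
For example, to fetch the log of one of the failed jobs found above (reusing the failed_jobs list from the previous snippet):

if failed_jobs:
    job_id = failed_jobs[0]['def']['id']
    job = project.get_job(job_id)   # get a handle on the job from its id
    print(job.get_log())            # the full job log, as a string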

Starting new jobs

Datasets can be built by creating a job of which they are the output. A job is created by building a job definition and starting it. For a simple non-partitioned dataset, this is done with:

import time

project = client.get_project('TEST_PROJECT')
definition = {
    "type": "NON_RECURSIVE_FORCED_BUILD",
    "outputs": [{
        "id": "dataset_to_build",
        "type": "DATASET",
        "partition": "NP"
    }]
}
job = project.start_job(definition)
# poll the job state every second until it reaches a stable state
state = ''
while state not in ('DONE', 'FAILED', 'ABORTED'):
    time.sleep(1)
    state = job.get_status()['baseStatus']['state']
# done!

The example above uses start_job() to start a job, and then checks the job state every second until it is complete. Alternatively, the method start_and_wait() can be used to start a job and return only after job completion.
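
For instance, a minimal sketch of the same build using start_and_wait() on the job builder returned by new_job() (described in the reference below); the dataset name is the one used above:

# start the build and block until the job reaches a stable state
job = project.new_job('NON_RECURSIVE_FORCED_BUILD').with_output('dataset_to_build', object_type='DATASET')
res = job.start_and_wait()
print(res.get_status()['baseStatus']['state'])   # 'DONE' if the build succeeded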

The start_job() method returns a job handle that can be used to later abort the job. Other jobs can be aborted once their id is known. For example, to abort all jobs currently being processed:

project = client.get_project('TEST_PROJECT')
for job in project.list_jobs():
    # a job that has not reached a stable state yet is still being processed
    if not job['stableState']:
        project.get_job(job['def']['id']).abort()

Here is another example, which uses DSSProject.new_job() and the with_output() method to build a managed folder, as an alternative to writing a job definition dictionary:

project = client.get_project('TEST_PROJECT')
# where O2ue6CX3 is the managed folder id
job = project.new_job('RECURSIVE_FORCED_BUILD').with_output('O2ue6CX3', object_type='MANAGED_FOLDER')
res = job.start_and_wait()
print(res.get_status())

Reference documentation

class dataikuapi.dss.project.JobDefinitionBuilder(project, job_type='NON_RECURSIVE_FORCED_BUILD')

Helper to run a job. Do not create this class directly; use DSSProject.new_job() instead.

with_type(job_type)

Sets the build type

Parameters

job_type – the build type for the job: RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD, RECURSIVE_FORCED_BUILD, or RECURSIVE_MISSING_ONLY_BUILD

with_refresh_metastore(refresh_metastore)

Sets whether the Hive tables built by the job should have their definitions refreshed after the corresponding dataset is built

Parameters

refresh_metastore (bool) – whether the Hive metastore definitions should be refreshed after the build

with_output(name, object_type=None, object_project_key=None, partition=None)

Adds an item to build in this job

Parameters
  • name – name of the output object

  • object_type – type of the object to build: DATASET, MANAGED_FOLDER, SAVED_MODEL, or STREAMING_ENDPOINT (defaults to None)

  • object_project_key – PROJECT_KEY for the project that contains the object to build (defaults to None)

  • partition – specify partition to build (defaults to None)

get_definition()

Gets the internal definition for this job

start()

Starts the job and returns a dataikuapi.dss.job.DSSJob handle to interact with it.

You need to wait for the returned job to complete

Returns

a job handle

Return type

dataikuapi.dss.job.DSSJob

start_and_wait(no_fail=False)

Starts the job, waits for it to complete and returns a dataikuapi.dss.job.DSSJob handle to interact with it

Raises an exception if the job failed, unless no_fail is True.

Parameters

no_fail – if True, does not raise if the job failed (defaults to False).

Returns

A job handle

Return type

dataikuapi.dss.job.DSSJob
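
For instance, a minimal sketch combining the builder methods above; the dataset name and partition are illustrative:

project = client.get_project('TEST_PROJECT')
builder = project.new_job('RECURSIVE_MISSING_ONLY_BUILD')
builder.with_refresh_metastore(True)
builder.with_output('dataset_to_build', partition='NP')
print(builder.get_definition())   # inspect the job definition before submitting it
job = builder.start()             # returns a DSSJob handle; the build runs in the background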

class dataikuapi.dss.job.DSSJob(client, project_key, id)

A job on the DSS instance

abort()

Aborts the job

get_status()

Gets the current status of the job

Returns

the state of the job, as a JSON object

get_log(activity=None)

Gets the logs of the job

Parameters

activity – (optional) the name of the activity in the job whose log is requested

Returns

the log, as a string

class dataikuapi.dss.job.DSSJobWaiter(job)

Helper to wait for a job’s completion

wait(no_fail=False)

Waits for the job's completion

Parameters

no_fail – if True, does not raise if the job failed (defaults to False)
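
A short sketch of how the waiter might be used, assuming a job started with start_job() as in the earlier example:

from dataikuapi.dss.job import DSSJobWaiter

job = project.start_job(definition)   # definition is the job definition dictionary from above
DSSJobWaiter(job).wait()              # blocks until the job reaches a stable state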