Streaming Endpoints

Note

There are two main classes related to streaming endpoint handling in Dataiku’s Python APIs:

  • dataiku.StreamingEndpoint, in the dataiku package

  • dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint, in the dataikuapi package

Both classes have fairly similar capabilities, but we recommend using dataiku.StreamingEndpoint within DSS.

For more details on the two packages, please see Python APIs.

dataiku version

This class lets you interact with streaming endpoints in Python recipes and notebooks.

class dataiku.StreamingEndpoint(id, project_key=None)

This is a handle to obtain readers and writers on a dataiku streaming endpoint.

get_location_info(sensitive_info=False)
get_schema(raise_if_empty=True)

Gets the schema of this streaming endpoint, as an array of column definitions like { 'type': 'string', 'name': 'foo', 'maxLength': 1000 }. Columns of the map, array and object types carry additional type information.

set_schema(columns)

Sets the schema of this streaming endpoint
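As an illustrative sketch (the column names are hypothetical), the schema is a list of dicts with at least a name and a type per column:

```python
def set_sensor_schema(endpoint):
    """Set a simple two-column schema on a streaming endpoint handle.

    `endpoint` is expected to be a dataiku.StreamingEndpoint; the column
    names used here are purely illustrative.
    """
    columns = [
        {"name": "event_time", "type": "string"},
        {"name": "temperature", "type": "double"},
    ]
    endpoint.set_schema(columns)
    return columns
```

Setting the schema this way is a prerequisite for writing to the endpoint (see get_writer() below).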

get_writer()

Gets a stream writer to append records to this streaming endpoint as a sink. The writer must be closed as soon as you no longer need it.

The schema of the streaming endpoint MUST be set before using this. If the schema is not set, the output writers will generally not store your data.
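A minimal sketch of the write-then-close pattern, assuming the returned writer exposes write_row_dict() and close() as in the continuous write API documented further down:

```python
def append_rows(endpoint, rows):
    """Append an iterable of dict rows to a streaming endpoint used as a sink.

    Assumes the endpoint's schema has already been set, and that the writer
    returned by get_writer() exposes write_row_dict() and close().
    """
    writer = endpoint.get_writer()
    try:
        for row in rows:
            writer.write_row_dict(row)
    finally:
        writer.close()  # release the writer as soon as it is no longer needed
```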

get_message_iterator(previous_state=None, columns=[])

Returns a python iterator which:

  • yields rows as dicts

  • has a get_state() method to retrieve the consumer state
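A sketch of a bounded consume loop; passing the returned state back as previous_state on a later call lets a recipe resume where it left off:

```python
def consume_batch(endpoint, max_rows, previous_state=None):
    """Read up to max_rows messages and return them with the consumer state.

    `endpoint` is expected to be a dataiku.StreamingEndpoint; the iterator
    yields rows as dicts and exposes get_state().
    """
    messages = endpoint.get_message_iterator(previous_state=previous_state)
    rows = []
    for row in messages:
        rows.append(row)
        if len(rows) >= max_rows:
            break
    return rows, messages.get_state()
```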

get_native_kafka_topic(broker_version='1.0.0')

Get a pykafka topic for the Kafka topic of this streaming endpoint

get_native_kafka_consumer(broker_version='1.0.0', **kwargs)

Get a pykafka consumer for the Kafka topic of this streaming endpoint

get_native_kafka_producer(broker_version='1.0.0', **kwargs)

Get a pykafka producer for the Kafka topic of this streaming endpoint
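For lower-level access, the native pykafka handles can be used directly. A sketch, assuming pykafka's produce() call taking a bytes payload; producer configuration (e.g. sync vs async delivery) is left to the caller's kwargs:

```python
def publish_raw(endpoint, payloads):
    """Publish raw byte payloads through the endpoint's native pykafka producer.

    `endpoint` is expected to be a dataiku.StreamingEndpoint backed by a
    Kafka connection; pykafka producers accept bytes messages.
    """
    producer = endpoint.get_native_kafka_producer()
    for payload in payloads:
        producer.produce(payload)
```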

get_native_httpsse_consumer()

Get a sseclient for the HTTP SSE url of this streaming endpoint

get_native_sqs_consumer()

Get a boto client for the SQS queue of this streaming endpoint

class dataiku.core.streaming_endpoint.StreamingEndpointStream(streaming_endpoint, previous_state, columns)
next()
get_state()
class dataiku.core.continuous_write.ContinuousWriterBase

Handle for writing to a dataset or streaming endpoint using the continuous write API. Use Dataset.get_continuous_writer() to obtain one.

write_tuple(row)

Write a single row from a tuple or list of column values. Columns must be given in the order of the dataset schema.

Note: The schema of the dataset MUST be set before using this.

Encoding note: strings MUST be given as Unicode objects. Passing str objects will fail.

write_row_array(row)
write_row_dict(row_dict)

Write a single row from a dict of column name -> column value.

Some columns can be omitted, empty values will be inserted instead.

Note: The schema of the dataset MUST be set before using this.

Encoding note: strings MUST be given as Unicode objects. Passing str objects will fail.

write_dataframe(df)

Appends a Pandas dataframe to the dataset being written.

This method can be called multiple times (especially when you have been using iter_dataframes to read from an input dataset)

Encoding note: strings MUST be in the dataframe as UTF-8-encoded str objects. Using unicode objects will fail.

flush()
checkpoint(state)
get_state()
close(failed)

Closes this writer
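Putting these methods together, a continuous writer is typically driven in a write/checkpoint loop, with close(failed) called in all cases. A sketch, assuming batches arrive as (state, rows) pairs:

```python
def run_continuous_write(writer, batches):
    """Write batches of dict rows, checkpointing after each batch.

    `writer` is a continuous writer handle (e.g. from
    Dataset.get_continuous_writer()); `batches` yields (state, rows)
    pairs, where `state` is the token to checkpoint after the batch.
    close(failed) is always called so the writer is properly released.
    """
    failed = False
    try:
        for state, rows in batches:
            for row in rows:
                writer.write_row_dict(row)
            writer.checkpoint(state)
    except Exception:
        failed = True
        raise
    finally:
        writer.close(failed)
```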

class dataiku.core.continuous_write.DatasetContinuousWriter(dataset, source_id, split_id=0)
send_init_request()
get_schema()
class dataiku.core.continuous_write.StreamingEndpointContinuousWriter(streaming_endpoint)
send_init_request()
get_schema()

dataikuapi version

Prefer this class when working outside of DSS.

class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpointListItem(client, data)

An item in a list of streaming endpoints. Do not instantiate this class

to_streaming_endpoint()

Gets the DSSStreamingEndpoint corresponding to this streaming endpoint

property name
property id
property type
property schema
property connection

Returns the connection to which this streaming endpoint is attached, or None if there is no connection for this streaming endpoint

get_column(column)

Returns the schema column with the given name.

Parameters

column (str) – Name of the column to find

Returns

a dict of the column settings, or None if the column does not exist

class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint(client, project_key, streaming_endpoint_name)

A streaming endpoint on the DSS instance

delete()

Delete the streaming endpoint

get_settings()

Returns the settings of this streaming endpoint as a DSSStreamingEndpointSettings, or one of its subclasses.

Known subclasses of DSSStreamingEndpointSettings include KafkaStreamingEndpointSettings and HTTPSSEStreamingEndpointSettings

You must use save() on the returned object to make your changes effective on the streaming endpoint.

# Example: changing the topic on a kafka streaming endpoint
streaming_endpoint = project.get_streaming_endpoint("my_endpoint")
settings = streaming_endpoint.get_settings()
settings.set_topic("country")
settings.save()
Return type

DSSStreamingEndpointSettings

exists()

Returns whether this streaming endpoint exists

get_schema()

Get the schema of the streaming endpoint

Returns:

a JSON object of the schema, with the list of columns

set_schema(schema)

Set the schema of the streaming endpoint

Args:

schema: the desired schema for the streaming endpoint, as a JSON object. All columns have to provide their name and type
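Since get_schema() and set_schema() use the same JSON shape, a schema can for instance be copied from one endpoint to another. A sketch, with both handles assumed to come from project.get_streaming_endpoint():

```python
def copy_schema(src_endpoint, dst_endpoint):
    """Copy the schema of one DSSStreamingEndpoint onto another.

    Both arguments are expected to be DSSStreamingEndpoint handles.
    """
    schema = src_endpoint.get_schema()
    dst_endpoint.set_schema(schema)
    return schema
```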

get_zone()

Gets the flow zone of this streaming endpoint

Return type

dataikuapi.dss.flow.DSSFlowZone

move_to_zone(zone)

Moves this object to a flow zone

Parameters

zone (object) – a dataikuapi.dss.flow.DSSFlowZone where to move the object

share_to_zone(zone)

Share this object to a flow zone

Parameters

zone (object) – a dataikuapi.dss.flow.DSSFlowZone where to share the object

unshare_from_zone(zone)

Unshare this object from a flow zone

Parameters

zone (object) – a dataikuapi.dss.flow.DSSFlowZone from where to unshare the object

get_usages()

Get the recipes or analyses referencing this streaming endpoint

Returns:

a list of usages

get_object_discussions()

Get a handle to manage discussions on the streaming endpoint

Returns

the handle to manage discussions

Return type

dataikuapi.discussion.DSSObjectDiscussions

test_and_detect(infer_storage_types=False, limit=10, timeout=60)
autodetect_settings(infer_storage_types=False, limit=10, timeout=60)
get_as_core_streaming_endpoint()
new_code_recipe(type, code=None, recipe_name=None)

Starts creation of a new code recipe taking this streaming endpoint as input

Parameters

type (str) – Type of the recipe (‘cpython’, ‘streaming_spark_scala’, …)

code (str) – The code of the recipe

new_recipe(type, recipe_name=None)

Starts creation of a new recipe taking this streaming endpoint as input. For more details, please see dataikuapi.dss.project.DSSProject.new_recipe()

Parameters

type (str) – Type of the recipe

class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpointSettings(streaming_endpoint, settings)
get_raw()

Get the raw streaming endpoint settings as a dict

get_raw_params()

Get the type-specific params, as a raw dict

property type
add_raw_schema_column(column)
save()
class dataikuapi.dss.streaming_endpoint.KafkaStreamingEndpointSettings(streaming_endpoint, settings)
set_connection_and_topic(connection, topic)
class dataikuapi.dss.streaming_endpoint.HTTPSSEStreamingEndpointSettings(streaming_endpoint, settings)
set_url(url)
class dataikuapi.dss.streaming_endpoint.DSSManagedStreamingEndpointCreationHelper(project, streaming_endpoint_name, streaming_endpoint_type)
get_creation_settings()
with_store_into(connection, format_option_id=None)

Sets the connection into which to store the new streaming endpoint

Parameters

connection (str) – Name of the connection to store into

format_option_id (str) – Optional identifier of a serialization format option

Returns

self

create(overwrite=False)

Executes the creation of the streaming endpoint according to the selected options

Parameters

overwrite – If the streaming endpoint being created already exists, delete it first (removing data)

Returns

The DSSStreamingEndpoint corresponding to the newly created streaming endpoint

already_exists()

Returns whether this streaming endpoint already exists
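A sketch of driving the creation helper; the connection name is hypothetical, and with_store_into() is chained because it returns self:

```python
def create_if_absent(helper, connection="my_kafka_connection"):
    """Create the managed streaming endpoint unless it already exists.

    `helper` is expected to be a DSSManagedStreamingEndpointCreationHelper;
    returns the new DSSStreamingEndpoint, or None if it already existed.
    """
    if helper.already_exists():
        return None
    return helper.with_store_into(connection).create()
```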

class dataikuapi.dss.continuousactivity.DSSContinuousActivity(client, project_key, recipe_id)

A continuous activity on the DSS instance

start(loop_params={})

Start the activity

stop()

Stop the activity

get_status()

Get the current status of the continuous activity

Returns:

the state of the continuous activity, as a JSON object

get_recipe()

Return a handle on the associated recipe
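A sketch of restarting a continuous activity and checking its state afterwards; loop parameters are left at their defaults:

```python
def restart_activity(activity):
    """Stop a continuous activity, start it again, and return its status.

    `activity` is expected to be a DSSContinuousActivity handle.
    """
    activity.stop()
    activity.start()
    return activity.get_status()
```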