Streaming Endpoints¶
Note
There are two main classes related to streaming endpoint handling in Dataiku’s Python APIs:
- dataiku.StreamingEndpoint, in the dataiku package. It was initially designed for usage within DSS, in recipes and Jupyter notebooks.
- dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint, in the dataikuapi package. It was initially designed for usage outside of DSS.

Both classes have fairly similar capabilities, but we recommend using dataiku.StreamingEndpoint within DSS.
For more details on the two packages, please see Python APIs.
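As a minimal sketch of the two entry points (the endpoint name, instance URL, API key, and project key below are placeholders, and either path requires access to a live DSS instance):

```python
# Within DSS (recipe or notebook), use the dataiku package directly:
import dataiku
endpoint = dataiku.StreamingEndpoint("my_endpoint")

# Outside DSS, go through the public API client in the dataikuapi package:
import dataikuapi
client = dataikuapi.DSSClient("https://dss.example.com:11200", "my-api-key")
endpoint = client.get_project("MYPROJECT").get_streaming_endpoint("my_endpoint")
```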
dataiku version¶
This class lets you interact with streaming endpoints in Python recipes and notebooks.
class dataiku.StreamingEndpoint(id, project_key=None)¶
This is a handle to obtain readers and writers on a dataiku streaming endpoint.
get_location_info(sensitive_info=False)¶

get_schema(raise_if_empty=True)¶
Gets the schema of this streaming endpoint, as an array of objects like this one: {'type': 'string', 'name': 'foo', 'maxLength': 1000}. There is more information for the map, array and object types.
set_schema(columns)¶
Sets the schema of this streaming endpoint.
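As an illustration of the expected column format (the endpoint name and columns are hypothetical), each column is a dict with at least "name" and "type", matching the objects returned by get_schema(); "maxLength" applies to string columns:

```python
# Hypothetical schema for an endpoint carrying click events.
columns = [
    {"name": "event_time", "type": "date"},
    {"name": "user_id", "type": "string", "maxLength": 64},
    {"name": "value", "type": "double"},
]

# Inside DSS, the schema would then be applied with:
# endpoint = dataiku.StreamingEndpoint("events")
# endpoint.set_schema(columns)
```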
get_writer()¶
Get a stream writer to append to this streaming endpoint as a sink. The writer must be closed as soon as you don't need it.
The schema of the streaming endpoint MUST be set before using this. If you don't set the schema of the streaming endpoint, your data will generally not be stored by the output writers.
get_message_iterator(previous_state=None, columns=[])¶
Returns a Python iterator which:
- yields rows as dicts
- has a get_state() method to retrieve the consumer state
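A sketch of the consumption pattern, assuming a hypothetical endpoint named "events" (this requires a running DSS instance; how you persist the state between runs is up to you):

```python
import dataiku

endpoint = dataiku.StreamingEndpoint("events")
iterator = endpoint.get_message_iterator(previous_state=None,
                                         columns=["user_id", "value"])
for row in iterator:
    print(row["user_id"], row["value"])  # each row is a dict of column -> value
    state = iterator.get_state()         # persist this to resume after a restart
```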
get_native_kafka_topic(broker_version='1.0.0')¶
Get a pykafka topic for the Kafka topic of this streaming endpoint.

get_native_kafka_consumer(broker_version='1.0.0', **kwargs)¶
Get a pykafka consumer for the Kafka topic of this streaming endpoint.

get_native_kafka_producer(broker_version='1.0.0', **kwargs)¶
Get a pykafka producer for the Kafka topic of this streaming endpoint.

get_native_httpsse_consumer()¶
Get a sseclient for the HTTP SSE URL of this streaming endpoint.

get_native_sqs_consumer()¶
Get a boto client for the SQS queue of this streaming endpoint.
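The native getters are useful when you need Kafka-level control (offsets, partitions) rather than decoded rows. A sketch, assuming a hypothetical Kafka-backed endpoint named "events", a running DSS instance, and the pykafka package:

```python
import dataiku

endpoint = dataiku.StreamingEndpoint("events")
consumer = endpoint.get_native_kafka_consumer(broker_version="1.0.0")
for message in consumer:
    if message is not None:
        # pykafka messages expose the raw payload, not decoded rows
        print(message.offset, message.value)
```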
class dataiku.core.streaming_endpoint.StreamingEndpointStream(streaming_endpoint, previous_state, columns)¶

next()¶

get_state()¶
class dataiku.core.continuous_write.ContinuousWriterBase¶
Handle to write using the continuous write API to a dataset or streaming endpoint. Use Dataset.get_continuous_writer() to obtain a DatasetWriter.
write_tuple(row)¶
Write a single row from a tuple or list of column values. Columns must be given in the order of the dataset schema.
Note: the schema of the dataset MUST be set before using this.
Encoding note: strings MUST be given as Unicode objects. Giving str objects will fail.

write_row_array(row)¶

write_row_dict(row_dict)¶
Write a single row from a dict of column name -> column value.
Some columns can be omitted; empty values will be inserted instead.
Note: the schema of the dataset MUST be set before using this.
Encoding note: strings MUST be given as Unicode objects. Giving str objects will fail.
write_dataframe(df)¶
Appends a Pandas dataframe to the dataset being written.
This method can be called multiple times (especially when you have been using iter_dataframes to read from an input dataset).
Encoding note: strings MUST be in the dataframe as UTF-8 encoded str objects. Using unicode objects will fail.
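A sketch of the continuous write pattern, assuming a hypothetical output dataset whose schema is already set and a running DSS instance (close(failed) takes a flag indicating whether the writing session failed):

```python
import dataiku
import pandas as pd

dataset = dataiku.Dataset("clicks_aggregated")
writer = dataset.get_continuous_writer()
try:
    df = pd.DataFrame({"user_id": ["u1", "u2"], "value": [3.2, 1.5]})
    writer.write_dataframe(df)  # strings must be UTF-8 encoded str objects
    writer.flush()
finally:
    writer.close(False)  # False: the session did not fail
```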
flush()¶

checkpoint(state)¶

get_state()¶

close(failed)¶
Closes this dataset writer.
dataikuapi version¶
Use this class preferably outside of DSS.
class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpointListItem(client, data)¶
An item in a list of streaming endpoints. Do not instantiate this class.

to_streaming_endpoint()¶
Gets the DSSStreamingEndpoint corresponding to this streaming endpoint.
property name¶

property id¶

property type¶

property schema¶

property connection¶
Returns the connection on which this streaming endpoint is attached, or None if there is no connection for this streaming endpoint.
get_column(column)¶
Returns the schema column given a name.
- Parameters
column (str) – Column to find
- Returns: a dict of the column settings, or None if the column does not exist
class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpoint(client, project_key, streaming_endpoint_name)¶
A streaming endpoint on the DSS instance.

delete()¶
Delete the streaming endpoint.
get_settings()¶
Returns the settings of this streaming endpoint as a DSSStreamingEndpointSettings, or one of its subclasses.
Known subclasses of DSSStreamingEndpointSettings include KafkaStreamingEndpointSettings and HTTPSSEStreamingEndpointSettings.
You must use save() on the returned object to make your changes effective on the streaming endpoint.

# Example: changing the topic on a kafka streaming endpoint
streaming_endpoint = project.get_streaming_endpoint("my_endpoint")
settings = streaming_endpoint.get_settings()
settings.set_topic("country")
settings.save()

- Return type: DSSStreamingEndpointSettings
exists()¶
Returns whether this streaming endpoint exists.

get_schema()¶
Get the schema of the streaming endpoint.
- Returns: a JSON object of the schema, with the list of columns
set_schema(schema)¶
Set the schema of the streaming endpoint.
- Parameters
schema – the desired schema for the streaming endpoint, as a JSON object. All columns have to provide their name and type.
get_zone()¶
Gets the flow zone of this streaming endpoint.
- Return type: dataikuapi.dss.flow.DSSFlowZone
move_to_zone(zone)¶
Moves this object to a flow zone.
- Parameters
zone (object) – a dataikuapi.dss.flow.DSSFlowZone where to move the object

share_to_zone(zone)¶
Share this object to a flow zone.
- Parameters
zone (object) – a dataikuapi.dss.flow.DSSFlowZone where to share the object

unshare_from_zone(zone)¶
Unshare this object from a flow zone.
- Parameters
zone (object) – a dataikuapi.dss.flow.DSSFlowZone from where to unshare the object
get_usages()¶
Get the recipes or analyses referencing this streaming endpoint.
- Returns: a list of usages

get_object_discussions()¶
Get a handle to manage discussions on the streaming endpoint.
- Returns: the handle to manage discussions
- Return type: dataikuapi.discussion.DSSObjectDiscussions
test_and_detect(infer_storage_types=False, limit=10, timeout=60)¶

autodetect_settings(infer_storage_types=False, limit=10, timeout=60)¶

get_as_core_streaming_endpoint()¶
new_code_recipe(type, code=None, recipe_name=None)¶
Starts creation of a new code recipe taking this streaming endpoint as input.
- Parameters
type (str) – Type of the recipe ('cpython', 'streaming_spark_scala', ...)
code (str) – The code of the recipe

new_recipe(type, recipe_name=None)¶
Starts creation of a new recipe taking this streaming endpoint as input. For more details, please see dataikuapi.dss.project.DSSProject.new_recipe().
- Parameters
type (str) – Type of the recipe
class dataikuapi.dss.streaming_endpoint.DSSStreamingEndpointSettings(streaming_endpoint, settings)¶
get_raw()¶
Get the raw streaming endpoint settings as a dict.

get_raw_params()¶
Get the type-specific params, as a raw dict.

property type¶

add_raw_schema_column(column)¶

save()¶
class dataikuapi.dss.streaming_endpoint.KafkaStreamingEndpointSettings(streaming_endpoint, settings)¶

set_connection_and_topic(connection, topic)¶
class dataikuapi.dss.streaming_endpoint.HTTPSSEStreamingEndpointSettings(streaming_endpoint, settings)¶

set_url(url)¶
class dataikuapi.dss.streaming_endpoint.DSSManagedStreamingEndpointCreationHelper(project, streaming_endpoint_name, streaming_endpoint_type)¶

get_creation_settings()¶
with_store_into(connection, format_option_id=None)¶
Sets the connection into which to store the new streaming endpoint.
- Parameters
connection (str) – Name of the connection to store into
format_option_id (str) – Optional identifier of a serialization format option
- Returns: self

create(overwrite=False)¶
Executes the creation of the streaming endpoint according to the selected options.
- Parameters
overwrite – If the streaming endpoint being created already exists, delete it first (removing data)
- Returns: the DSSStreamingEndpoint corresponding to the newly created streaming endpoint
already_exists()¶
Returns whether this streaming endpoint already exists.
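A sketch of creating a managed streaming endpoint from outside DSS. The host, API key, project key, connection name, and the project-level factory method name (new_managed_streaming_endpoint) are assumptions; check your DSS version's project API for the exact entry point:

```python
import dataikuapi

client = dataikuapi.DSSClient("https://dss.example.com:11200", "my-api-key")
project = client.get_project("MYPROJECT")

# Hypothetical factory returning a DSSManagedStreamingEndpointCreationHelper
helper = project.new_managed_streaming_endpoint("events", "kafka")
helper.with_store_into("kafka_connection")
if not helper.already_exists():
    endpoint = helper.create()
```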
class dataikuapi.dss.continuousactivity.DSSContinuousActivity(client, project_key, recipe_id)¶
A continuous activity on the DSS instance.
start(loop_params={})¶
Start the activity.

stop()¶
Stop the activity.

get_status()¶
Get the current status of the continuous activity.
- Returns: the state of the continuous activity, as a JSON object
get_recipe()¶
Return a handle on the associated recipe.
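A sketch of driving a continuous activity from outside DSS. The project-level accessor (get_continuous_activity) and the recipe name are assumptions, and this requires a live DSS instance:

```python
import dataikuapi

client = dataikuapi.DSSClient("https://dss.example.com:11200", "my-api-key")
project = client.get_project("MYPROJECT")

# Hypothetical accessor returning a DSSContinuousActivity for a streaming recipe
activity = project.get_continuous_activity("compute_clicks_stream")
activity.start()
print(activity.get_status())  # JSON state of the running activity
activity.stop()
```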