Continuous Python¶
Just like a regular Python recipe, a continuous Python recipe runs user-provided Python code. The difference is that it accepts streaming endpoints as inputs and outputs and, when running, is restarted if needed and requested. The Python code has to loop or wait indefinitely in order to continuously handle the input and produce output.
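As an illustrative sketch (names and timing are placeholders), the top-level structure of a continuous recipe is simply an endless loop:
import time
import dataiku

# Skeleton of a continuous recipe: the process never returns on its own;
# it keeps handling input until DSS stops or restarts it.
while True:
    # read from the input streaming endpoint, write to the output (see the sections below)
    time.sleep(1)  # placeholder for the actual read/process/write logic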
Reading from streaming endpoints¶
It is advised to read from streaming endpoints using native access, i.e. to have the Python process connect directly to the message source and handle the messages and offsets itself.
Alternatively, a simpler method to read from a streaming endpoint is to have DSS read the messages and forward them to the Python process.
endpoint = dataiku.StreamingEndpoint("wikipedia")
message_iterator = endpoint.get_message_iterator()
for msg in message_iterator:
# use the message ... (msg is a json-encoded object)
state = message_iterator.get_state() # a string
# if/when needed, do something with state
Reading from Kafka endpoints¶
The StreamingEndpoint class offers a helper to consume from a Kafka topic using the pykafka package:
endpoint = dataiku.StreamingEndpoint("wikipedia_kafka")
message_iterator = endpoint.get_native_kafka_consumer()
for msg in message_iterator:
# use the pykafka message object
Messages returned by pykafka have the following fields usable in your code:
timestamp is a unix timestamp in milliseconds
offset is the message offset in the topic
partition_key is the key of the message, as a byte array
value is the value of the message, as a byte array
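For illustration, a minimal sketch using these fields (the endpoint name is reused from above, and decoding the value assumes a UTF-8 text payload):
endpoint = dataiku.StreamingEndpoint("wikipedia_kafka")
for msg in endpoint.get_native_kafka_consumer():
    print(msg.timestamp)              # unix timestamp in milliseconds
    print(msg.offset)                 # offset in the topic
    print(msg.partition_key)          # key as bytes (may be absent if no key was set)
    print(msg.value.decode("utf8"))   # value as bytes, decoded here assuming UTF-8 text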
Note
The builtin Python environment does not include the pykafka package. To use the helper methods, you need to use a custom code-env that includes this package.
Note
Only PLAINTEXT Kafka listeners (i.e. without SSL encryption) are handled by the helper. For SSL support, you need to use pykafka directly and pass the relevant parameters to set up the encryption logic.
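For reference, a minimal sketch of an SSL setup with pykafka used directly (the broker address and certificate paths are placeholders; check the pykafka documentation for your version):
from pykafka import KafkaClient, SslConfig

ssl_config = SslConfig(cafile="/path/to/ca.cert",        # placeholder paths
                       certfile="/path/to/client.cert",  # optional
                       keyfile="/path/to/client.key")    # optional
client = KafkaClient(hosts="broker:9093", ssl_config=ssl_config)
topic = client.topics[b"wikipedia_kafka"]
consumer = topic.get_simple_consumer()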
Note
The helper returns simple consumers. If you want balanced consumers, you need to use pykafka directly.
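A minimal sketch of a balanced consumer with pykafka used directly (the broker address and consumer group are placeholders):
from pykafka import KafkaClient

client = KafkaClient(hosts="broker:9092")
topic = client.topics[b"wikipedia_kafka"]
consumer = topic.get_balanced_consumer(consumer_group=b"my_group",
                                       managed=True)  # use Kafka-managed group coordination
for msg in consumer:
    if msg is not None:
        print(msg.offset, msg.value)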
Reading from SQS endpoints¶
The StreamingEndpoint class offers a helper to consume from an SQS queue using the boto3 package:
endpoint = dataiku.StreamingEndpoint("wikipedia_sqs")
message_iterator = endpoint.get_native_sqs_consumer()
for msg in message_iterator:
# use the message (it's a string)
The messages returned by the iterator are acknowledged one by one on the SQS side when they are retrieved from the iterator. To acknowledge messages only after they have been processed, use boto3 directly.
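A minimal sketch of that pattern with boto3 used directly (the queue URL, the region and the process() function are placeholders for your own setup and logic):
import boto3

sqs = boto3.client("sqs", region_name="eu-west-1")                        # placeholder region
queue_url = "https://sqs.eu-west-1.amazonaws.com/123456789012/wikipedia"  # placeholder URL

while True:
    resp = sqs.receive_message(QueueUrl=queue_url,
                               MaxNumberOfMessages=10,
                               WaitTimeSeconds=20)
    for m in resp.get("Messages", []):
        process(m["Body"])  # placeholder for your processing logic
        # acknowledge only once processing has succeeded
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"])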
Note
The builtin Python environment does not include the boto3 package. To use the helper methods, you need to use a custom code-env including this package.
Reading from HTTP SSE endpoints¶
The StreamingEndpoint class offers a helper to consume from an HTTP SSE endpoint using the sseclient package:
endpoint = dataiku.StreamingEndpoint("wikipedia")
message_iterator = endpoint.get_native_httpsse_consumer()
for msg in message_iterator:
# use the sseclient message object
Messages returned by sseclient have these fields:
id is a message identifier (often equivalent to the offset)
event is the event type
data is the message data (can be None depending on the event)
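For illustration, a minimal sketch using these fields (the event name to match on depends on the SSE source):
endpoint = dataiku.StreamingEndpoint("wikipedia")
for msg in endpoint.get_native_httpsse_consumer():
    if msg.data is None:
        continue  # some event types carry no payload
    if msg.event == "message":  # event name depends on the SSE source
        print(msg.id, msg.data)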
Note
The builtin Python environment does not include the sseclient package. To use the helper methods, you need to use a custom code-env including this package.
Writing to streaming endpoints¶
It is advised to write to streaming endpoints using native access, i.e. to have the Python process connect directly to the message queue and handle the messages and offsets itself.
Alternatively, a simpler method to write to a streaming endpoint is to have the Python process send the messages to DSS and let DSS handle the writing.
endpoint = dataiku.StreamingEndpoint("wikipedia_kafka")
# setting a schema is strongly advised before using get_writer()
endpoint.set_schema([{"name":"data", "type":"string", ...}])
with endpoint.get_writer() as writer:
for msg in message_iterator:
writer.write_row_dict({"data":msg.data, ...})
writer.flush()
The call to flush() ensures the messages are sent to DSS for writing. It is not mandatory after each and every message written, but it needs to be called regularly nonetheless.
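For example, a sketch that flushes every 100 messages instead of after each one (the batch size is arbitrary):
FLUSH_EVERY = 100  # arbitrary batch size

with endpoint.get_writer() as writer:
    count = 0
    for msg in message_iterator:
        writer.write_row_dict({"data": msg.data})
        count += 1
        if count % FLUSH_EVERY == 0:
            writer.flush()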
Writing to Kafka endpoints¶
The StreamingEndpoint class offers a helper to produce to a Kafka topic using the pykafka package:
endpoint = dataiku.StreamingEndpoint("wikipedia_kafka")
with endpoint.get_native_kafka_producer(sync=True) as writer:
for msg in message_iterator:
writer.produce(msg.data.encode('utf8'), partition_key='my_key'.encode('utf8'), timestamp=datetime.now())
The partition_key and timestamp params are optional.
Note
The builtin Python environment doesn’t include the pykafka package. To use the helper methods, the code needs to be run on a code env providing the pykafka package.
Note
Only PLAINTEXT Kafka listeners (i.e. without SSL encryption) are handled by the helper. For SSL support, you need to use pykafka directly and pass the relevant parameters to set up the encryption logic.
Writing to datasets¶
Writing the output dataset is done via a writer object returned by Dataset.get_continuous_writer, using the standard methods write_row_dict, write_dataframe or write_tuple.
Warning
When writing to datasets, it is crucial to checkpoint the data regularly: rows written to the dataset are first staged in a temporary file and only become fully part of the dataset when a checkpoint is done.
dataset = dataiku.Dataset("wikipedia_dataset")
dataset.write_schema([{"name":"data", "type":"string"}, ...])
with dataset.get_continuous_writer() as writer:
for msg in message_iterator:
writer.write_row_dict({"data":msg.data, ...})
writer.checkpoint("this_recipe", "some state")
Offset management¶
Most streaming sources have a notion of offset, to keep track of where in the message queue the reader is. The recipe is responsible for managing its offsets, and particularly for storing the current offset and retrieving the last offset upon starting.
When writing to datasets with a continuous writer (from a get_continuous_writer() call), and if the dataset is file-based, the recipe can rely on the last state saved via a call to checkpoint() and retrieve that state with get_state() on the continuous writer. When writing to streaming endpoints, the recipe has to manage the storage of offsets itself.
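As a sketch of that pattern, reusing the calls shown above (current_offset is a hypothetical variable tracking the position in the source, and the state is stored as a plain string):
dataset = dataiku.Dataset("wikipedia_dataset")
with dataset.get_continuous_writer() as writer:
    last_state = writer.get_state()  # state saved by the last checkpoint() call, if any
    # use last_state to resume in the source, e.g. skip messages up to that offset
    for msg in message_iterator:
        writer.write_row_dict({"data": msg.data})
        writer.checkpoint("this_recipe", str(current_offset))  # hypothetical offset tracker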