
Python Sink

Overview

Python External Stream lets you read from and write to arbitrary sources by embedding a Python body directly in the DDL. It is available in Timeplus Enterprise 3.2.2+.

Unlike the Kafka, Pulsar, and NATS JetStream external streams, each of which speaks a specific wire protocol, a Python External Stream is a generic escape hatch: you bring the protocol, the client library, and the logic. Timeplus calls your functions inside its embedded CPython runtime. When reading, the values your read function returns become row batches; when writing, your sink function receives column batches. The same DDL object can serve as both a source (via read_function_name) and a sink (via write_function_name).

Create a Python External Stream

```sql
CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name (<col_name1> <col_type>)
AS $$
def read_fn():
    ...

def write_fn(col1, ...):
    ...

def init_fn(config):  # optional
    ...

def deinit_fn():  # optional
    ...
$$
SETTINGS
    type = 'python',                  -- required
    read_function_name = '..',        -- defaults to the stream name
    write_function_name = '..',       -- defaults to read_function_name
    init_function_name = '..',
    init_function_parameters = '..',  -- requires init_function_name
    deinit_function_name = '..',
    mode = 'auto'                     -- 'auto' (default), 'streaming', or 'batch'
```

Settings

  • type: must be 'python'. Required.
  • read_function_name: name of the Python function used when the stream is read from. Defaults to the stream name.
  • write_function_name: name of the Python function used when the stream is written to (sink). Defaults to read_function_name.
  • init_function_name: name of a Python function called once before read/write processing begins. Use it to open connections, warm caches, or prepare state for the entry function to consume.
  • init_function_parameters: a single string passed as the only argument to the init function. Any format works (JSON, key=value pairs, or a plain string); parsing is up to your Python code. Requires init_function_name. Without it, stream creation fails with the error: Setting 'init_function_parameters' requires 'init_function_name' to be configured.
  • deinit_function_name: name of a Python function called once after read/write processing completes, for cleanup.
  • mode: Python execution mode — 'auto' (default), 'streaming', or 'batch'. See Modes.
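Because init_function_parameters reaches your init function as one opaque string, your code decides how to parse it. The following is a minimal sketch that accepts either a JSON object or semicolon-separated key=value pairs. The helper name parse_init_parameters and the ';' separator are illustrative choices; Timeplus does not define them.

```python
import json


def parse_init_parameters(raw: str) -> dict:
    """Parse the init string as a JSON object if it looks like one, else as key=value pairs.

    Hypothetical helper: Timeplus passes the raw string unchanged, so the format is your choice.
    """
    raw = raw.strip()
    if raw.startswith("{"):
        return json.loads(raw)
    params = {}
    for pair in raw.split(";"):
        if not pair.strip():
            continue  # tolerate empty segments, e.g. a trailing ';'
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        params[key.strip()] = value.strip()
    return params
```

Inside an init function such as open_client(config), you would call parse_init_parameters(config) once and keep the resulting dict for the entry function.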

Modes

The mode setting controls how Timeplus interprets your read function's return value:

  • auto (default) — Timeplus inspects the return value at runtime. A generator drives a streaming read; a list is consumed as a single batch.
  • streaming — the read function must return a generator. Timeplus pulls rows as they are yielded and the query stays alive until the generator stops. Returning a list from a streaming-mode function fails the query.
  • batch — the read function must return a list. Timeplus consumes the list once and the query finishes. Returning a generator from a batch-mode function fails the query.

Set mode explicitly when you want the engine to enforce the expected shape. Leave it as auto when you want flexibility.
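To make the two return shapes concrete, here is a sketch of a batch-shaped and a streaming-shaped read function. It also includes a small local check that mirrors the mode rules above. Two things are assumptions: that each row is a tuple in declared column order, and the behavior of check_shape, which is not Timeplus's own code. In particular, the sketch rejects any value that is neither a list nor a generator, and it does not model what the engine does in that case.

```python
import types


def read_batch():
    # Batch shape: a list, consumed once; the query then finishes.
    # Assumption: one tuple per row, in declared column order.
    return [("a", 1.0), ("b", 2.0)]


def read_streaming():
    # Streaming shape: a generator; rows are pulled as they are yielded.
    for i in range(3):
        yield ("tick", float(i))


def check_shape(result, mode="auto"):
    """Local emulation of the mode rules: returns 'streaming' or 'batch', or raises TypeError."""
    is_gen = isinstance(result, types.GeneratorType)
    is_list = isinstance(result, list)
    if mode == "streaming" and not is_gen:
        raise TypeError("streaming mode requires a generator")
    if mode == "batch" and not is_list:
        raise TypeError("batch mode requires a list")
    if not (is_gen or is_list):
        raise TypeError("expected a generator or a list")
    return "streaming" if is_gen else "batch"
```

In auto mode, both functions pass the check. With mode set to 'streaming', read_batch would be rejected, which matches the documented failure.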

Lifecycle

Each query that reads from or writes to a Python External Stream creates its own Python module. The lifecycle for one query is:

  1. The DDL body is compiled into a fresh module.
  2. Local API credential globals are injected into the module (see Local API credentials).
  3. If init_function_name is set, the init function is called once. When init_function_parameters is non-empty, it is passed as the only argument; otherwise init receives no arguments.
  4. The read or write entry function is called as data flows.
  5. When the query ends, whether normally or via cancellation, the function named by deinit_function_name is called (if that setting is present).
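The five lifecycle steps can be emulated outside Timeplus, which is handy for unit-testing a DDL body before creating the stream. This is a minimal sketch, not the engine's implementation. run_query_lifecycle is a hypothetical helper, the injected dict stands in for the credential globals, and cancellation is not modeled.

```python
import types


def run_query_lifecycle(body, entry_name, chunks, *, init_name=None,
                        init_params="", deinit_name=None, injected=None):
    """Emulate one query against a DDL body: fresh module, inject, init, entry per chunk, deinit."""
    # 1. Compile the body into a fresh module (a new one per emulated query).
    code = compile(body, "<ddl body>", "exec")
    module = types.ModuleType("python_external_stream")
    exec(code, module.__dict__)
    # 2. Inject globals, standing in for the local API credential globals.
    for name, value in (injected or {}).items():
        setattr(module, name, value)
    # 3. Call init once. If it raises, the exception propagates and deinit is never reached.
    if init_name:
        init = getattr(module, init_name)
        if init_params:
            init(init_params)
        else:
            init()
    try:
        # 4. Call the entry function once per chunk; each chunk is a tuple of column lists.
        entry = getattr(module, entry_name)
        for columns in chunks:
            entry(*columns)
    finally:
        # 5. Deinit runs on normal completion and when the entry function raises.
        if deinit_name:
            getattr(module, deinit_name)()
    return module
```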

Each query gets its own module, so ordinary module globals created by the DDL body are not reused across queries. If you stash state on Python's builtins module, use a stream-specific attribute name and remove it in deinit; builtins is shared by the embedded interpreter, so leftover attributes can be visible to later Python sessions in the same server process. Treat clients or caches opened in init as per-query resources and close them in deinit.

If the init function raises, the query fails before any read or write happens, and deinit_function_name is not called. If init opens more than one external resource, clean up already-opened resources before re-raising.

Local API credentials

When the local API user is enabled on the server, Timeplus injects two module-level globals into every Python External Stream module so your code can authenticate back to the same timeplusd over the native TCP protocol or the REST HTTP interface without hard-coding credentials:

  • __timeplus_local_api_user — the ephemeral local API username.
  • __timeplus_local_api_password — the matching token. Treat this as a secret; do not log it.

Both globals are available as bare names inside the Python body — no os.environ lookup needed. They are regenerated on every server restart and never written to disk.
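A defensive pattern is to look the two globals up through globals(). That way the same body still loads when the local API user is disabled and the names were never injected. The Basic-auth header below is an assumption about how the REST endpoint you call expects credentials; check your server's HTTP authentication options before relying on it. Both helper names are hypothetical.

```python
import base64


def local_api_credentials():
    """Return (user, password) from the injected globals, or None if they were not injected."""
    g = globals()
    user = g.get("__timeplus_local_api_user")
    password = g.get("__timeplus_local_api_password")
    if user is None or password is None:
        return None
    return user, password


def basic_auth_header(user, password):
    # Assumption: the target REST endpoint accepts HTTP Basic auth. Never log this value.
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}
```

Inside a stream body you would call local_api_credentials() once, for example in init, and build headers only when it returns a tuple.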

Write Data to a Python External Stream

The write function is invoked once per chunk, not once per row. Its arguments are column-oriented: one Python list per output column, in declared order, all of equal length. Iterate with zip to recover row tuples.

Column values follow the same Python type mapping as Python UDFs. One detail matters for sinks in particular: a string (or fixed_string) column arrives as Python bytes, not str. Decode it with .decode() (UTF-8 by default) before passing values to APIs that require text.
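A small helper can turn the column-oriented arguments into row dicts, decoding bytes along the way. This is a sketch: rows_from_columns is a hypothetical name, and it passes every non-bytes value through unchanged.

```python
def rows_from_columns(names, *columns, encoding="utf-8"):
    """Convert the column lists of one write call into a list of row dicts.

    bytes values (string/fixed_string columns) are decoded to str; other values pass through.
    """
    if len(names) != len(columns):
        raise ValueError(f"expected {len(names)} columns, got {len(columns)}")
    if len({len(col) for col in columns}) > 1:
        raise ValueError("columns must all have the same length")
    return [
        {name: value.decode(encoding) if isinstance(value, bytes) else value
         for name, value in zip(names, row)}
        for row in zip(*columns)
    ]
```

For example, a sink declared as (host string, value float32) could call rows_from_columns(["host", "value"], host, value) at the top of its write function.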

Sink basics

```sql
CREATE EXTERNAL STREAM py_metric_sink (host string, value float32)
AS $$
def py_metric_sink(host, value):
    for h, v in zip(host, value):
        print(f"{h.decode()}={v}")
$$
SETTINGS type = 'python';
```

Insert a few rows:

```sql
INSERT INTO py_metric_sink (host, value) VALUES ('a', 1.0), ('b', 2.0);
```

Behind the scenes Timeplus calls py_metric_sink([b'a', b'b'], [1.0, 2.0]) — one call carrying both rows, with the string column delivered as bytes. A larger INSERT or a downstream query that delivers many chunks results in one call per chunk.
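The one-call-per-chunk behavior can be reproduced locally by slicing the columns and calling the sink once per slice. In this sketch, chunk_size is a hypothetical stand-in, because Timeplus, not your code, decides the real chunk boundaries.

```python
def feed_in_chunks(write_fn, columns, chunk_size):
    """Call write_fn once per slice of rows and return the number of calls made."""
    total = len(columns[0]) if columns else 0
    calls = 0
    for start in range(0, total, chunk_size):
        write_fn(*(col[start:start + chunk_size] for col in columns))
        calls += 1
    return calls


def py_metric_sink(host, value):
    # Same body as the sink above: one call receives every row in the chunk.
    for h, v in zip(host, value):
        print(f"{h.decode()}={v}")
```

For example, feed_in_chunks(py_metric_sink, [[b"a", b"b"], [1.0, 2.0]], 8192) makes a single call and prints a=1.0 and b=2.0, matching the two-row INSERT above.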

If write_function_name is omitted, Timeplus uses read_function_name (which itself defaults to the stream name). That is why the example above needs no function-name settings: the Python function simply shares the stream's name.
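The defaulting chain is easy to misread, so here is a sketch that applies it to a settings dict, together with the init_function_parameters requirement described under Settings. resolve_function_names is a hypothetical helper. It treats a setting as "set" when the key is present, which is an assumption about how empty values are handled.

```python
def resolve_function_names(stream_name, settings):
    """Return (read_function, write_function) using the documented defaults."""
    if "init_function_parameters" in settings and "init_function_name" not in settings:
        raise ValueError(
            "Setting 'init_function_parameters' requires 'init_function_name' to be configured"
        )
    read_fn = settings.get("read_function_name", stream_name)  # defaults to the stream name
    write_fn = settings.get("write_function_name", read_fn)     # defaults to the read function
    return read_fn, write_fn
```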

Materialized view → external stream

Routing a continuous query into a sink is the most common production pattern. Define the sink once, then point a materialized view at it:

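```sql
CREATE EXTERNAL STREAM py_alert_sink (host string, value float32)
AS $$
def notify(host, value):
    # Placeholder: replace with your real notifier (chat, pager, email, ...).
    print(f"ALERT {host}={value}")

def py_alert_sink(host, value):
    for h, v in zip(host, value):
        notify(h.decode(), v)
$$
SETTINGS type = 'python';
```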

```sql
CREATE MATERIALIZED VIEW high_value_alerts INTO py_alert_sink AS
SELECT host, value FROM metrics WHERE value > 100;
```

The materialized view feeds chunks into the sink as they are produced; each chunk becomes one call to py_alert_sink.
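Because each chunk arrives as one call, a sink can send one notification per chunk instead of one per row. This keeps the number of external requests proportional to chunks rather than rows. The sketch below builds a single JSON message per chunk. The payload shape and the helper name build_alert_message are hypothetical.

```python
import json


def build_alert_message(host, value):
    """Collapse one chunk of alert rows into a single JSON message (None for an empty chunk)."""
    alerts = [{"host": h.decode(), "value": v} for h, v in zip(host, value)]
    if not alerts:
        return None
    return json.dumps({"count": len(alerts), "alerts": alerts})


def py_alert_sink(host, value):
    message = build_alert_message(host, value)
    if message is not None:
        print(message)  # stand-in for a single call to your real notifier
```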

Custom protocol example: webhook POST

Load the destination URL in init, reuse that configuration for every chunk, and clear it in deinit. Init parameters carry the URL so the Python body is reusable across environments. To pool an actual HTTP connection, swap urllib for a session-aware client (for example requests.Session()) and stash the session itself on builtins.

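```sql
CREATE EXTERNAL STREAM py_webhook (event_id string, body string)
AS $$
import builtins, json, urllib.request

def open_client(config):
    # Parse the init parameters once per query and keep the URL on builtins.
    builtins._tp_webhook = json.loads(config)["url"]

def close_client():
    # Remove the attribute so it does not leak into later Python sessions.
    if hasattr(builtins, "_tp_webhook"):
        del builtins._tp_webhook

def post_event(event_id, body):
    # Called once per chunk; sends one POST per row.
    for eid, b in zip(event_id, body):
        payload = {"id": eid.decode(), "body": b.decode()}
        req = urllib.request.Request(
            builtins._tp_webhook,
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # Close each response promptly; urlopen raises HTTPError on 4xx/5xx.
        # The 10-second timeout is an arbitrary choice that keeps a hung endpoint from blocking forever.
        with urllib.request.urlopen(req, timeout=10) as resp:
            resp.read()
$$
SETTINGS
    type = 'python',
    init_function_name = 'open_client',
    init_function_parameters = '{"url":"https://hooks.example.com/notify"}',
    deinit_function_name = 'close_client',
    write_function_name = 'post_event';
```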

Replace urllib with any HTTP, S3, queue, or proprietary client your environment ships with. Manage Python dependencies through the Python UDF library configuration — the same runtime backs both features.

Failure behavior

If the write function raises, the INSERT fails and the Python traceback is included in the error response. Timeplus does not roll back side effects your Python code has already performed (HTTP requests sent, files written, queue messages published). Design writes to be idempotent, or group each chunk's side effects into a single transactional call that your downstream system controls.
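One common way to make retried writes idempotent is to send a deterministic key with each event so the receiver can discard duplicates. The sketch derives a SHA-256 key from the row's fields. The Idempotency-Key header name is a widely used convention rather than something Timeplus defines, and it only helps if your downstream system deduplicates on it. The helper names are hypothetical.

```python
import hashlib
import json


def idempotency_key(event_id: bytes, body: bytes) -> str:
    """Deterministic key for one event: re-sending the same row yields the same key."""
    digest = hashlib.sha256()
    # Length-prefix each field so (b"ab", b"c") and (b"a", b"bc") cannot collide.
    for field in (event_id, body):
        digest.update(len(field).to_bytes(8, "big"))
        digest.update(field)
    return digest.hexdigest()


def build_request(eid: bytes, b: bytes):
    # Payload plus headers for one row; the receiver is assumed to dedupe on Idempotency-Key.
    payload = json.dumps({"id": eid.decode(), "body": b.decode()}).encode()
    headers = {"Content-Type": "application/json", "Idempotency-Key": idempotency_key(eid, b)}
    return payload, headers
```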