Python Source
Overview
Python External Stream lets you read from and write to arbitrary sources by embedding a Python body directly in the DDL. It is available in Timeplus Enterprise 3.2.2+.
Unlike the Kafka, Pulsar, and NATS JetStream external streams — which speak a specific wire protocol — a Python External Stream is a generic escape hatch: you bring the protocol, the client library, and the logic. Timeplus calls your functions inside the embedded CPython runtime. When reading, return values become row batches; when writing, the sink function receives column batches. The same DDL object can serve as both a source (via read_function_name) and a sink (via write_function_name).
Create a Python External Stream
CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name (<col_name1> <col_type>, ...)
AS $$
def read_fn():
    ...

def write_fn(col1, ...):
    ...

def init_fn(config=None):  # optional; called with no argument when init_function_parameters is empty
    ...

def deinit_fn():  # optional
    ...
$$
SETTINGS
    type = 'python',                  -- required
    read_function_name = '..',        -- defaults to the stream name
    write_function_name = '..',       -- defaults to read_function_name
    init_function_name = '..',
    init_function_parameters = '..',  -- requires init_function_name
    deinit_function_name = '..',
    mode = 'auto'                     -- 'auto' (default), 'streaming', or 'batch'
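The template gives write_fn one parameter per column, and the overview says the sink receives column batches. The sketch below is a Python body for a hypothetical stream declared as (id int32, name string). It assumes each argument arrives as a sequence holding that column's values for the current batch, in declared column order; the exact container type, the output file path, and the helper name rows_from_columns are assumptions for illustration.

```python
import json

# Hypothetical destination; replace with your real sink (API, queue, file, ...).
OUTPUT_PATH = "/tmp/py_sink.jsonl"

def rows_from_columns(*columns):
    """Zip column batches back into row tuples, checking that they line up."""
    lengths = {len(c) for c in columns}
    if len(lengths) > 1:
        raise ValueError(f"column batches have different lengths: {sorted(lengths)}")
    return list(zip(*columns))

def write_fn(ids, names):
    # Assumption: one argument per declared column, (id int32, name string).
    with open(OUTPUT_PATH, "a", encoding="utf-8") as f:
        for row_id, name in rows_from_columns(ids, names):
            f.write(json.dumps({"id": row_id, "name": name}) + "\n")
```

Keeping the column-to-row conversion in a plain helper makes it easy to unit test outside Timeplus.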
Settings
- type: must be 'python'. Required.
- read_function_name: name of the Python function used when the stream is read from. Defaults to the stream name.
- write_function_name: name of the Python function used when the stream is written to (sink). Defaults to read_function_name.
- init_function_name: name of a Python function called once before read/write processing begins. Use it to open connections, warm caches, or prepare state for the entry function to consume.
- init_function_parameters: a single string passed as the only argument to the init function. Any format works (JSON, key=value, or a plain string); parsing is up to your Python code. Requires init_function_name; otherwise stream creation fails with the error: Setting 'init_function_parameters' requires 'init_function_name' to be configured.
- deinit_function_name: name of a Python function called once after read/write processing completes, for cleanup.
- mode: Python execution mode, one of 'auto' (default), 'streaming', or 'batch'. See Modes.
Modes
The mode setting controls how Timeplus interprets your read function's return value:
- auto (default): Timeplus inspects the return value at runtime. A generator drives a streaming read; a list is consumed as a single batch.
- streaming: the read function must return a generator. Timeplus pulls rows as they are yielded, and the query stays alive until the generator stops. Returning a list from a streaming-mode function fails the query.
- batch: the read function must return a list. Timeplus consumes the list once and the query finishes. Returning a generator from a batch-mode function fails the query.
Set mode explicitly when you want the engine to enforce the expected shape. Leave it as auto when you want flexibility.
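Which mode a read function fits is decided by plain Python semantics, so you can check it locally before writing the DDL. The helper expected_mode below is hypothetical and is not the engine's own logic; it only mirrors the two shapes described above. Note the trap in mixed(): any yield in a function body makes it a generator function, so its return [...] value is silently dropped instead of becoming a batch.

```python
import inspect

def expected_mode(read_fn):
    """Local check (hypothetical helper, not a Timeplus API)."""
    result = read_fn()  # note: this runs a batch function's body, including any I/O
    if inspect.isgenerator(result):
        result.close()  # never started, so closing does not run the body
        return "streaming"
    if isinstance(result, list):
        return "batch"
    raise TypeError(f"expected a generator or a list, got {type(result).__name__}")

def ticks():            # generator function: fits 'streaming' (or 'auto')
    n = 0
    while True:
        yield (n,)
        n += 1

def snapshot():         # returns a list: fits 'batch' (or 'auto')
    return [(n,) for n in range(3)]

def mixed():            # the yield makes this a generator function,
    if False:           # so the list below is never delivered as a batch
        yield (0,)
    return [(1,), (2,)]
```

Setting mode = 'batch' on a stream whose read function looks like mixed() would make the engine reject it, which is exactly the kind of mistake explicit modes are meant to catch.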
Lifecycle
Each query that reads from or writes to a Python External Stream creates its own Python module. The lifecycle for one query is:
- The DDL body is compiled into a fresh module.
- Local API credential globals are injected into the module (see Local API credentials).
- If init_function_name is set, the init function is called once. When init_function_parameters is non-empty, it is passed as the only argument; otherwise init receives no arguments.
- The read or write entry function is called as data flows.
- When the query ends, whether normally or via cancellation, the deinit function is called if deinit_function_name is set.
Each query gets its own module, so ordinary module globals created by the DDL body are not reused across queries. If you stash state on Python's builtins module, use a stream-specific attribute name and remove it in deinit; builtins is shared by the embedded interpreter, so leftover attributes can be visible to later Python sessions in the same server process. Treat clients or caches opened in init as per-query resources and close them in deinit.
If the init function raises, the query fails before any read or write happens, and deinit_function_name is not called. If init opens more than one external resource, clean up already-opened resources before re-raising.
Local API credentials
When the local API user is enabled on the server, Timeplus injects two module-level globals into every Python External Stream module so your code can authenticate back to the same timeplusd over the native TCP protocol or the REST HTTP interface without hard-coding credentials:
- __timeplus_local_api_user: the ephemeral local API username.
- __timeplus_local_api_password: the matching token. Treat this as a secret; do not log it.
Both globals are available as bare names inside the Python body — no os.environ lookup needed. They are regenerated on every server restart and never written to disk.
Read Data from a Python External Stream
The read function is the entry point Timeplus calls to pull rows from your Python code. It is synchronous (no async/await) and receives no implicit arguments — any configuration must arrive through the init function. Each value it produces is a row whose columns match the stream's schema in declared order; for a single-column stream you may yield bare scalars.
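The row contract in a nutshell: a read function takes no parameters, and each row it produces lists the column values in declared order. The two functions below are hypothetical read bodies for a two-column stream (id int32, name string) and a single-column stream (reading float64).

```python
def py_people():
    # Stream (id int32, name string): each row is a tuple in declared column order.
    for row_id, name in enumerate(["alice", "bob"], start=1):
        yield (row_id, name)

def py_readings():
    # Single-column stream (reading float64): bare scalars are allowed.
    for value in (20.5, 21.0, 21.25):
        yield value
```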
Streaming source (generator)
Yield a row or a batch of rows at a time. The query stays alive as long as the generator does, which makes generators the right shape for clocks, polling loops, websocket feeds, message-bus consumers, and other long-lived sources.
CREATE EXTERNAL STREAM py_clock (tick uint32, label string)
AS $$
import time

def py_clock():
    n = 0
    while True:
        yield (n, "tick")
        n += 1
        time.sleep(1)
$$
SETTINGS
type = 'python',
mode = 'streaming';
read_function_name is omitted, so it defaults to the stream name py_clock. Setting mode = 'streaming' makes the engine reject a non-generator return value, which catches mistakes like returning a list early.
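A polling loop follows the same generator shape as py_clock. The sketch below is a hypothetical py_orders read body for a stream (id int64, status string): it re-fetches on an interval and emits only rows with an id newer than the last one seen. fetch_orders is a placeholder for a real API call and is assumed to return rows sorted by id. The keyword arguments have defaults, so Timeplus can call the function with no arguments while a local test can inject fakes.

```python
import time

def fetch_orders():
    """Hypothetical stand-in for a real API call returning (id, status) rows sorted by id."""
    return []

def py_orders(fetch=fetch_orders, interval_s=5.0, sleep=time.sleep):
    last_id = None
    while True:
        for row in fetch():
            if last_id is None or row[0] > last_id:
                last_id = row[0]
                yield row
        sleep(interval_s)  # the generator, and therefore the query, stays alive between polls
```

Tracking only the last id keeps memory flat over a long-running query, at the cost of assuming ids increase monotonically.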
Batch source (list)
Return a list of rows once. Use this shape for one-shot pulls — REST snapshots, file scans, or any source where a single call yields the full result.
CREATE EXTERNAL STREAM py_users (id int32, name string)
AS $$
import json
import urllib.request

def py_users():
    with urllib.request.urlopen("https://api.example.com/users") as r:
        payload = json.load(r)
    return [(u["id"], u["name"]) for u in payload]
$$
SETTINGS
type = 'python',
mode = 'batch';
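Many REST snapshots are paginated. Below is a sketch of a batch read body that walks pages until an empty one and still returns a single list. The ?page=N parameter, the response shape, and the max_pages cap are assumptions about a hypothetical API; the fetch argument exists only so the function can be tested offline.

```python
import json
import urllib.request

BASE_URL = "https://api.example.com/users"  # hypothetical endpoint

def fetch_page(page):
    """Fetch one page; assumes the API takes ?page=N and returns a JSON list."""
    with urllib.request.urlopen(f"{BASE_URL}?page={page}", timeout=10) as r:
        return json.load(r)

def py_users(fetch=fetch_page, max_pages=100):
    rows = []
    for page in range(1, max_pages + 1):  # cap pages so a buggy API cannot loop forever
        items = fetch(page)
        if not items:  # an empty page marks the end
            break
        rows.extend((u["id"], u["name"]) for u in items)
    return rows  # a list, so mode = 'batch' accepts it
```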
Long-lived setup with init / deinit
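Set up per-query state once in init, stash it on builtins, and tear it down in deinit at the end of the query. The example below stores a single configuration value, but the same pattern applies to a client or connection. Init parameters arrive as a single string, so JSON is convenient when you have more than one value to pass.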
CREATE EXTERNAL STREAM py_cookie_counter
(
previous_cleanup_count int32,
secret_flavor string
)
AS $$
import builtins, json

def open_bakery(config):
    builtins._tp_cookie_secret_flavor = json.loads(config)["flavor"]

def close_bakery():
    if hasattr(builtins, "_tp_cookie_secret_flavor"):
        del builtins._tp_cookie_secret_flavor

def serve_cookie_report():
    return [(0, getattr(builtins, "_tp_cookie_secret_flavor", ""))]
$$
SETTINGS
type = 'python',
read_function_name = 'serve_cookie_report',
init_function_name = 'open_bakery',
init_function_parameters = '{"flavor":"double-chocolate"}',
deinit_function_name = 'close_bakery';
Remember that init and deinit run per query, not once per stream creation — the builtins state above is set up and torn down each time a query reads from py_cookie_counter. Use stream-specific builtins attribute names and delete them in deinit so later Python sessions do not see stale state.
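If, as the Lifecycle section describes, init, read, and deinit for one query all run in that query's own module, an ordinary module global can carry the state without touching builtins at all. Below is a sketch of the cookie example under that assumption, reusing its function names.

```python
import json

# Module-level state. Each query compiles a fresh module, so this is per-query.
_flavor = None

def open_bakery(config):
    global _flavor
    _flavor = json.loads(config)["flavor"]

def close_bakery():
    global _flavor
    _flavor = None  # not strictly needed with a per-query module, but explicit

def serve_cookie_report():
    return [(0, _flavor or "")]
```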
Calling back to timeplusd
The injected __timeplus_local_api_user and __timeplus_local_api_password globals let the read function authenticate to the same server without hard-coded credentials. The example below queries an internal stream over the REST interface and turns the result into a row.
CREATE EXTERNAL STREAM py_user_count (total int64)
AS $$
import base64, urllib.request

def py_user_count():
    creds = base64.b64encode(
        f"{__timeplus_local_api_user}:{__timeplus_local_api_password}".encode()
    ).decode()
    req = urllib.request.Request(
        "http://localhost:8123/?query=SELECT+count()+FROM+table(users)",
        headers={"Authorization": f"Basic {creds}"},
    )
    with urllib.request.urlopen(req) as r:
        return [(int(r.read().strip()),)]
$$
SETTINGS
type = 'python',
mode = 'batch';
Treat __timeplus_local_api_password as a secret — do not log it, do not echo it back into output rows, and do not pass it into subprocesses.
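Hand-encoding SQL into the URL, as in the example's SELECT+count()+..., becomes fragile once queries contain quotes, commas, or operators. Below is a sketch that percent-encodes the query with the standard library, targeting the same endpoint as the example. The helper name build_query_request is hypothetical.

```python
import urllib.parse
import urllib.request

def build_query_request(sql, headers, base_url="http://localhost:8123/"):
    """Percent-encode the SQL rather than hand-writing '+' for spaces."""
    # quote_via=quote encodes spaces as %20 instead of '+', so the request
    # does not rely on the server treating '+' as a space.
    query = urllib.parse.urlencode({"query": sql}, quote_via=urllib.parse.quote)
    return urllib.request.Request(base_url + "?" + query, headers=headers)
```

Inside py_user_count, the Request construction would become build_query_request("SELECT count() FROM table(users)", {"Authorization": f"Basic {creds}"}).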
Cancellation and errors
When a query is cancelled (for example by KILL QUERY or by closing the client), the running Python code receives a KeyboardInterrupt. Streaming generators stop at the next yield point; long-blocking calls inside C extensions may delay the interrupt until they return.
If the read function raises, the query fails and the Python traceback is included in the error response — wrap recoverable errors inside the function and decide explicitly whether to re-raise or continue.
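Below is a sketch of both points for a hypothetical streaming read body py_events. It skips malformed records, which are treated as recoverable, but lets a missing field propagate so the query fails with a traceback. A try/finally releases the upstream feed whether the generator finishes, raises, or is closed; the finally block also runs if an interrupt is raised inside the generator. read_lines is a placeholder feed.

```python
import json

def read_lines():
    """Hypothetical stand-in for a feed that yields raw JSON strings."""
    yield '{"id": 1, "kind": "click"}'
    yield 'not json'
    yield '{"id": 2, "kind": "view"}'

def py_events(source=read_lines):
    feed = source()
    try:
        for raw in feed:
            try:
                rec = json.loads(raw)
            except json.JSONDecodeError:
                continue  # recoverable: skip a malformed record and keep streaming
            yield (rec["id"], rec["kind"])  # a missing key is not caught: the query fails
    finally:
        close = getattr(feed, "close", None)
        if close is not None:
            close()  # release the upstream feed on every exit path
```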