NATS JetStream Source
Overview
NATS is a high-performance, lightweight messaging system. NATS JetStream is the built-in streaming layer for NATS that provides durable, replayable message streams with advanced features like message acknowledgment, persistence, and consumer management.
Timeplus provides first-class integration for NATS JetStream as a new type of External Stream. You can read or write data in NATS JetStream using SQL queries, similar to how you work with Kafka or Pulsar external streams.
Create NATS JetStream External Stream
Use the following SQL command to create a NATS JetStream external stream:
```sql
CREATE EXTERNAL STREAM [IF NOT EXISTS] <stream_name>
(<col_name1> <col_type>)
SETTINGS
    type='nats_jetstream',            -- required
    url='nats://host:port',           -- required
    stream_name='..',                 -- required
    subject='..',                     -- required
    consumer_stall_timeout_ms=..,
    username='..',
    password='..',
    token='..',
    secure=<true|false>,
    ssl_ca_cert_file='..',
    skip_ssl_cert_check=<true|false>,
    ssl_cert_file='..',
    ssl_key_file='..',
    data_format='..',
    format_schema='..',
    one_message_per_row=..,
    config_file='..',
    named_collection='..';
```
Settings
type
Must be set to nats_jetstream.
url
The NATS server URL.
Example: nats://localhost:4222
stream_name
The name of the JetStream stream to connect to. The stream must exist on the NATS server before the external stream is created; Timeplus validates that it exists during creation.
subject
The NATS subject to subscribe to or publish messages to. Wildcards `*` and `>` are supported.
For inserts, messages are published to this subject unless overridden by the `_nats_subject` column.
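For example, a stream created with a wildcard subject receives messages from every matching subject, while an INSERT can target a specific subject through the `_nats_subject` column described above. A minimal sketch (stream, subject, and column values are illustrative):

```sql
-- Subscribe to every subject under "orders." via the > wildcard
CREATE EXTERNAL STREAM ext_orders (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='orders',
         subject='orders.>';

-- Publish to a specific subject by setting the _nats_subject column
INSERT INTO ext_orders (raw, _nats_subject)
VALUES ('{"id":1}', 'orders.us.created');
```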
consumer_stall_timeout_ms
Stall detection timeout in milliseconds. If no progress is made for this duration, Timeplus will recreate the subscription to recover from potential stalls.
Default: 60000
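To recover from stalls faster than the 60-second default, you can lower the timeout when creating the stream. A sketch with illustrative names:

```sql
-- Recreate the subscription after 10 seconds without progress
CREATE EXTERNAL STREAM ext_sensor_data (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='sensors',
         subject='sensors.>',
         consumer_stall_timeout_ms=10000;
```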
Authentication Settings
Timeplus supports multiple authentication mechanisms for NATS. Only one method can be used at a time.
username / password
Username and password authentication.
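A stream using username/password authentication might be declared as follows (credentials and names are illustrative):

```sql
CREATE EXTERNAL STREAM ext_secured (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         username='my_user',
         password='my_password';
```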
token
Token-based authentication.
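With token authentication, only the token setting is supplied (the token value below is a placeholder):

```sql
CREATE EXTERNAL STREAM ext_token_auth (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         token='s3cr3t-token';
```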
TLS Settings
secure
Set to `true` to use a secure (SSL/TLS) connection.
ssl_ca_cert_file
Path to the CA certificate file for TLS verification.
skip_ssl_cert_check
Set to `true` to skip server certificate verification.
This is acceptable for testing, but use it with caution in production since the connection is not verified and therefore not secure.
ssl_cert_file / ssl_key_file
For mTLS (mutual TLS), provide the paths to both the client certificate and private key files. Both must be specified together.
The certificates must be in PEM format, ordered starting with the subject's certificate, followed by any intermediate CA certificates, and ending with the highest-level (root) CA.
The private key file must also be in PEM format.
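Putting the TLS settings together, an mTLS connection might look like this (file paths are illustrative):

```sql
CREATE EXTERNAL STREAM ext_mtls (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         secure=true,
         ssl_ca_cert_file='/path/to/ca.pem',
         ssl_cert_file='/path/to/client-cert.pem',
         ssl_key_file='/path/to/client-key.pem';
```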
Data Format Settings
data_format
Defines how NATS messages are parsed and written.
Common formats include:
| Format | Description |
|---|---|
| RawBLOB | Raw text, no parsing |
| JSONEachRow | One JSON document per line |
| CSV | Comma-separated values |
| TSV | Tab-separated values |
| ProtobufSingle | One Protobuf message per NATS message |
| Protobuf | Multiple Protobuf messages per NATS message |
| Avro | Avro-encoded messages |
format_schema
Required for ProtobufSingle, Protobuf, and Avro formats. Defines the schema for message serialization.
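As a sketch, assuming a Protobuf format schema named `order_schema` containing a message type `Order` has already been registered (see the Schema documentation), a Protobuf stream could be declared like this:

```sql
CREATE EXTERNAL STREAM ext_orders_pb (
  id string,
  amount float64
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='orders',
         subject='orders.pb',
         data_format='ProtobufSingle',
         format_schema='order_schema:Order';
```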
one_message_per_row
Set to true to ensure each row is written as exactly one NATS message containing one JSON document, especially when writing with JSONEachRow.
When the _tp_message_headers column is defined, one_message_per_row is required and will be set to true automatically.
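A write-side sketch (names illustrative): with one_message_per_row enabled, each inserted row is published as its own NATS message.

```sql
CREATE EXTERNAL STREAM ext_events_out (
  id string,
  note string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='events',
         subject='events.json',
         data_format='JSONEachRow',
         one_message_per_row=true;

-- Each inserted row becomes exactly one NATS message
INSERT INTO ext_events_out (id, note) VALUES ('1', 'hello');
```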
Other Settings
config_file
Path to a configuration file containing key-value pairs. Useful for managing credentials securely, especially in Kubernetes environments with secrets managed via HashiCorp Vault.
Example config file:
```
username=my_username
password=my_password
data_format=JSONEachRow
one_message_per_row=true
```
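The external stream then references the file through the config_file setting (the path below is illustrative):

```sql
-- Assumes the config file shown above is stored at /etc/timeplus/nats.conf
CREATE EXTERNAL STREAM ext_from_config (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         config_file='/etc/timeplus/nats.conf';
```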
named_collection
Named Collections allow you to group shared configuration settings into a reusable object. This simplifies DDL and enhances security by masking sensitive information.
Example:
```sql
CREATE NAMED COLLECTION nats_nc AS
    url='nats://localhost:4222',
    username='admin_user',
    password='admin';

CREATE EXTERNAL STREAM test_nats_es (raw string)
SETTINGS
    type='nats_jetstream',
    stream_name='my_stream',
    subject='my.subject',
    named_collection='nats_nc';
```
For more details, refer to Named Collection documentation.
Read Data from NATS JetStream
Timeplus allows reading NATS JetStream messages in multiple data formats, including:
- Plain string (raw)
- CSV / TSV
- JSON
- Protobuf
- Avro
Read NATS Messages as Raw String
Use this mode when:
- Messages contain unstructured text or binary data
- No built-in format is applicable
- You want to debug raw NATS messages
Raw String Example
```sql
CREATE EXTERNAL STREAM ext_application_logs (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='application_logs',
         subject='app.logs.*';
```
You can use regex-based string functions or JSON extraction functions to further process the raw string.
Regex Example – Parse Application Logs
```sql
SELECT
    to_time(extract(raw, '^(\\d{4}\\.\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d+)')) AS timestamp,
    extract(raw, '} <(\\w+)>') AS level,
    extract(raw, '} <\\w+> (.*)') AS message
FROM ext_application_logs;
```
Read JSON NATS Messages
Assuming NATS messages contain JSON text with this schema:
```
{
  "actor": string,
  "created_at": timestamp,
  "id": string,
  "payload": string,
  "repo": string,
  "type": string
}
```
You can process JSON in two ways:
Option A: Parse with JSON Extract Functions
1. Create a raw stream:

```sql
CREATE EXTERNAL STREAM ext_json_raw (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='github_events',
         subject='github.events.>';
```
2. Extract fields using the JSON extract shortcut syntax or JSON extract functions:

```sql
SELECT
    raw:actor AS actor,
    raw:created_at::datetime64(3, 'UTC') AS created_at,
    raw:id AS id,
    raw:payload AS payload,
    raw:repo AS repo,
    raw:type AS type
FROM ext_json_raw;
```
This method is most flexible and works well for dynamic JSON with new or missing fields. It can also extract nested JSON fields.
Option B: Use JSONEachRow Format
Define a NATS JetStream external stream with columns mapped to the JSON fields and specify data_format='JSONEachRow':
```sql
CREATE EXTERNAL STREAM ext_json_parsed
(
  actor string,
  created_at datetime64(3, 'UTC'),
  id string,
  payload string,
  repo string,
  type string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='github_events',
         subject='github.events',
         data_format='JSONEachRow';
```
When you query the stream, JSON fields are parsed and cast to the target column types automatically.
This method is most convenient when the JSON schema is stable; it works with top-level JSON fields only.
Read CSV NATS Messages
Similar to JSONEachRow, you can read CSV formatted messages:
```sql
CREATE EXTERNAL STREAM ext_csv_parsed
(
  actor string,
  created_at datetime64(3, 'UTC'),
  id string,
  payload string,
  repo string,
  type string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='csv_stream',
         subject='csv.data',
         data_format='CSV';
```
Read TSV NATS Messages
Identical to CSV, but expects tab-separated values:
```sql
SETTINGS data_format='TSV';
```
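A full stream definition follows the same shape as the CSV example, with only the format changed (stream and subject names are illustrative):

```sql
CREATE EXTERNAL STREAM ext_tsv_parsed
(
  actor string,
  created_at datetime64(3, 'UTC'),
  id string,
  payload string,
  repo string,
  type string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='tsv_stream',
         subject='tsv.data',
         data_format='TSV';
```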
Read Avro or Protobuf Messages
To read Avro-encoded or Protobuf-encoded NATS messages, please refer to Schema documentation.
Access NATS Message Metadata
Timeplus provides virtual columns for NATS JetStream message metadata.
| Virtual Column | Description | Type |
|---|---|---|
| _tp_time | NATS message timestamp | datetime64(3, 'UTC') |
| _tp_append_time | Message append time | datetime64(3, 'UTC') |
| _tp_process_time | Processing time | datetime64(3, 'UTC') |
| _tp_sn | Stream sequence number | int64 |
| _tp_shard | Always 0 for NATS | int32 |
| _tp_message_headers | NATS headers as key-value map | map(string, string) |
| _nats_subject | NATS subject | string |
NATS Message Metadata Examples
```sql
-- View message time and payload
SELECT _tp_time, raw FROM ext_github_events;

-- View message subject
SELECT _nats_subject, raw FROM ext_github_events;

-- Access headers
SELECT _tp_message_headers['trace_id'], raw FROM ext_github_events;

-- View sequence number
SELECT _tp_sn, raw FROM ext_github_events;
```
Query Settings for NATS JetStream External Streams
Controlling Where to Start Reading
Use the seek_to query setting to control where to start consuming messages.
Start from Earliest (All Messages)
```sql
SELECT raw FROM ext_stream SETTINGS seek_to='earliest';
```
For non-streaming queries (using table() function), seek_to defaults to 'earliest'.
Start from Latest (New Messages Only)
```sql
SELECT raw FROM ext_stream SETTINGS seek_to='latest';
```
For streaming queries, seek_to defaults to 'latest'.
Start from Specific Stream Sequence Number
```sql
SELECT raw FROM ext_stream SETTINGS seek_to='1000';
```
This starts reading from sequence number 1000.
Start from Specific Timestamp
```sql
SELECT raw FROM ext_stream SETTINGS seek_to='2025-01-01T00:00:00.000';
```
Timeplus converts the timestamp to the appropriate starting point in the stream.
record_consume_timeout_ms
Use record_consume_timeout_ms to control how long the external stream waits for new messages before returning results. Smaller values reduce latency but may impact throughput.
```sql
SELECT raw FROM ext_stream SETTINGS record_consume_timeout_ms=100;
```
record_consume_batch_count
Use record_consume_batch_count to control the number of messages fetched in each batch. Default is 1000.
```sql
SELECT raw FROM ext_stream SETTINGS record_consume_batch_count=500;
```