NATS JetStream Source

Overview

NATS is a high-performance, lightweight messaging system. NATS JetStream is the built-in streaming layer for NATS that provides durable, replayable message streams with advanced features like message acknowledgment, persistence, and consumer management.

Timeplus provides first-class integration for NATS JetStream as a new type of External Stream. You can read or write data in NATS JetStream using SQL queries, similar to how you work with Kafka or Pulsar external streams.

Create NATS JetStream External Stream

Use the following SQL command to create a NATS JetStream external stream:

CREATE EXTERNAL STREAM [IF NOT EXISTS] <stream_name>
(<col_name1> <col_type>)
SETTINGS
type='nats_jetstream', -- required
url='nats://host:port', -- required
stream_name='..', -- required
subject='..', -- required
consumer_stall_timeout_ms=..,
username='..',
password='..',
token='..',
secure=<true|false>,
ssl_ca_cert_file='..',
skip_ssl_cert_check=<true|false>,
ssl_cert_file='..',
ssl_key_file='..',
data_format='..',
format_schema='..',
one_message_per_row=..,
config_file='..',
named_collection='..';

Settings

type

Must be set to nats_jetstream.

url

The NATS server URL. Example: nats://localhost:4222

stream_name

The name of the JetStream stream to connect to. The stream must already exist on the NATS server; Timeplus validates that it exists when the external stream is created.

subject

The NATS subject to subscribe to or publish messages to. The wildcards * (matches a single token) and > (matches one or more trailing tokens) are supported.

For inserts, messages are published to this subject unless it is overridden by the _nats_subject column.
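As a minimal sketch of the write path, the statements below create an external stream and publish one message to its configured subject. The names ext_orders, orders, and orders.new are hypothetical, and the sketch assumes a JetStream stream named orders already exists on the server and captures the orders.new subject.

```sql
-- Hypothetical names; the JetStream stream 'orders' must already exist.
CREATE EXTERNAL STREAM ext_orders (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='orders',
         subject='orders.new';

-- Publishes one NATS message to the subject configured above.
INSERT INTO ext_orders (raw) VALUES ('{"order_id": 1, "status": "new"}');
```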

consumer_stall_timeout_ms

Stall detection timeout in milliseconds. If no progress is made for this duration, Timeplus will recreate the subscription to recover from potential stalls.

Default: 60000

Authentication Settings

Timeplus supports multiple authentication mechanisms for NATS. Only one method can be used at a time.

username / password

Username and password authentication.

token

Token-based authentication.
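For illustration, a token-authenticated stream could be declared as follows. The stream name, subject, and token value are placeholders, not values from a real deployment.

```sql
-- Placeholder names and token; use only one authentication method per stream.
CREATE EXTERNAL STREAM ext_token_auth (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         token='my_secret_token';
```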

TLS Settings

secure

Set to true to use a secure (SSL/TLS) connection.

ssl_ca_cert_file

Path to the CA certificate file for TLS verification.

skip_ssl_cert_check

Set to true to skip server certificate verification.

Warning: Skipping certificate verification is acceptable for tests, but use it with caution because the connection is not secure.

ssl_cert_file / ssl_key_file

For mTLS (mutual TLS), provide the paths to both the client certificate file and the private key file. Both settings must be specified together.

The certificates must be in PEM format and must be sorted starting with the subject's certificate, followed by intermediate CA certificates if applicable, and ending at the highest level (root) CA.

The private key file must also be in PEM format.
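The TLS settings above might be combined as in the following sketch of an mTLS connection. The host name and all file paths are hypothetical and only show where each setting goes.

```sql
-- Hypothetical host and certificate paths; all files are in PEM format.
CREATE EXTERNAL STREAM ext_mtls (raw string)
SETTINGS type='nats_jetstream',
         url='nats://nats.example.com:4222',
         stream_name='my_stream',
         subject='my.subject',
         secure=true,
         ssl_ca_cert_file='/etc/ssl/nats/ca.pem',           -- verifies the server
         ssl_cert_file='/etc/ssl/nats/client-cert.pem',     -- client certificate
         ssl_key_file='/etc/ssl/nats/client-key.pem';       -- client private key
```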

Data Format Settings

data_format

Defines how NATS messages are parsed and written.

Common formats include:

| Format | Description |
| --- | --- |
| RawBLOB | Raw text, no parsing |
| JSONEachRow | One JSON document per line |
| CSV | Comma-separated values |
| TSV | Tab-separated values |
| ProtobufSingle | One Protobuf message per NATS message |
| Protobuf | Multiple Protobuf messages per NATS message |
| Avro | Avro-encoded messages |

format_schema

Required for ProtobufSingle, Protobuf, and Avro formats. Defines the schema for message serialization.

one_message_per_row

Set to true to ensure each NATS message maps to exactly one JSON document, especially when writing with JSONEachRow.

Note: When the _tp_message_headers column is defined, one_message_per_row must be true; Timeplus sets it automatically.
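A possible definition with message headers is sketched below. It assumes that _tp_message_headers is declared in the column list like a regular column, which this page implies but does not show; the stream name, subject, and other columns are hypothetical.

```sql
-- Assumption: the headers column is declared alongside the data columns.
CREATE EXTERNAL STREAM ext_with_headers
(
  id string,
  type string,
  _tp_message_headers map(string, string)
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         data_format='JSONEachRow',
         one_message_per_row=true;  -- required when headers are defined
```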

Other Settings

config_file

Path to a configuration file containing key-value pairs. Useful for managing credentials securely, especially in Kubernetes environments with secrets managed via HashiCorp Vault.

Example config file:

username=my_username
password=my_password
data_format=JSONEachRow
one_message_per_row=true
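A stream that loads these settings from the file could then be declared roughly as follows. The file path, stream name, subject, and column list are hypothetical; the credentials and data format come from the config file rather than the DDL.

```sql
-- Hypothetical path; username, password, data_format, and
-- one_message_per_row are read from the config file.
CREATE EXTERNAL STREAM ext_from_config (id string, type string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='my_stream',
         subject='my.subject',
         config_file='/etc/timeplus/nats.conf';
```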

named_collection

Named Collections allow you to group shared configuration settings into a reusable object. This simplifies DDL and enhances security by masking sensitive information.

Example:

CREATE NAMED COLLECTION nats_nc AS
url='nats://localhost:4222',
username='admin_user',
password='admin';

CREATE EXTERNAL STREAM test_nats_es(raw string)
SETTINGS
type='nats_jetstream',
stream_name='my_stream',
subject='my.subject',
named_collection='nats_nc';

For more details, refer to the Named Collection documentation.

Read Data from NATS JetStream

Timeplus allows reading NATS JetStream messages in multiple data formats, including:

  • Plain string (raw)
  • CSV / TSV
  • JSON
  • Protobuf
  • Avro

Read NATS Messages as Raw String

Use this mode when:

  • Messages contain unstructured text or binary data
  • No built-in format is applicable
  • You want to debug raw NATS messages

Raw String Example

CREATE EXTERNAL STREAM ext_application_logs (raw string)
SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='application_logs',
subject='app.logs.*';

You can further process the raw string with regular-expression functions or JSON extract functions.

Regex Example – Parse Application Logs

SELECT
to_time(extract(raw, '^(\\d{4}\\.\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d+)')) AS timestamp,
extract(raw, '} <(\\w+)>') AS level,
extract(raw, '} <\\w+> (.*)') AS message
FROM ext_application_logs;

Read JSON NATS Messages

Assuming NATS messages contain JSON text with this schema:

{
"actor": string,
"created_at": timestamp,
"id": string,
"payload": string,
"repo": string,
"type": string
}

You can process JSON in two ways:

Option A: Parse with JSON Extract Functions

  1. Create a raw stream:
CREATE EXTERNAL STREAM ext_json_raw (raw string)
SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='github_events',
subject='github.events.>';
  2. Extract fields using JSON extract shortcut syntax or JSON extract functions:
SELECT
raw:actor AS actor,
raw:created_at::datetime64(3, 'UTC') AS created_at,
raw:id AS id,
raw:payload AS payload,
raw:repo AS repo,
raw:type AS type
FROM ext_json_raw;

This method is the most flexible and works well for dynamic JSON in which fields may be added or missing. It can also extract nested JSON fields.

Option B: Use JSONEachRow Format

Define a NATS JetStream external stream with columns mapped to the JSON fields and specify data_format='JSONEachRow':

CREATE EXTERNAL STREAM ext_json_parsed
(
actor string,
created_at datetime64(3, 'UTC'),
id string,
payload string,
repo string,
type string
)
SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='github_events',
subject='github.events',
data_format='JSONEachRow';

When you query the stream, JSON fields are parsed and cast to the target column types automatically.

This method is the most convenient when the JSON schema is stable. It maps top-level JSON fields to columns.
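Because the fields arrive as typed columns, the stream can be queried like any other. As a small sketch, the following streaming query keeps only one event type; the value 'PushEvent' is a hypothetical example, not something defined on this page.

```sql
-- Streaming query over the parsed columns; 'PushEvent' is a sample value.
SELECT created_at, actor, repo
FROM ext_json_parsed
WHERE type = 'PushEvent';
```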

Read CSV NATS Messages

Similar to JSONEachRow, you can read CSV formatted messages:

CREATE EXTERNAL STREAM ext_csv_parsed
(
actor string,
created_at datetime64(3, 'UTC'),
id string,
payload string,
repo string,
type string
)
SETTINGS type='nats_jetstream',
url='nats://localhost:4222',
stream_name='csv_stream',
subject='csv.data',
data_format='CSV';

Read TSV NATS Messages

TSV works the same way as CSV but expects tab-separated values. Keep the column definitions and change only the format setting:

SETTINGS data_format='TSV';
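Written out in full, a TSV stream definition might look like the sketch below. It mirrors the CSV example; the stream name tsv_stream and subject tsv.data are hypothetical.

```sql
-- Same columns as the CSV example; only names and data_format differ.
CREATE EXTERNAL STREAM ext_tsv_parsed
(
  actor string,
  created_at datetime64(3, 'UTC'),
  id string,
  payload string,
  repo string,
  type string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='tsv_stream',
         subject='tsv.data',
         data_format='TSV';
```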

Read Avro or Protobuf Messages

To read Avro-encoded or Protobuf-encoded NATS messages, refer to the Schema documentation.

Access NATS Message Metadata

Timeplus provides virtual columns for NATS JetStream message metadata.

| Virtual Column | Description | Type |
| --- | --- | --- |
| _tp_time | NATS message timestamp | datetime64(3, 'UTC') |
| _tp_append_time | Message append time | datetime64(3, 'UTC') |
| _tp_process_time | Processing time | datetime64(3, 'UTC') |
| _tp_sn | Stream sequence number | int64 |
| _tp_shard | Always 0 for NATS | int32 |
| _tp_message_headers | NATS headers as key-value map | map(string, string) |
| _nats_subject | NATS subject | string |

NATS Message Metadata Examples

-- View message time and payload
SELECT _tp_time, raw FROM ext_github_events;

-- View message subject
SELECT _nats_subject, raw FROM ext_github_events;

-- Access headers
SELECT _tp_message_headers['trace_id'], raw FROM ext_github_events;

-- View sequence number
SELECT _tp_sn, raw FROM ext_github_events;
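Metadata columns can also be used in filters. As a sketch, when a stream subscribes with a wildcard subject such as app.logs.*, the query below keeps only messages published to one concrete subject; app.logs.billing is a hypothetical subject name.

```sql
-- Narrow a wildcard subscription to a single concrete subject.
SELECT _tp_time, _nats_subject, raw
FROM ext_application_logs
WHERE _nats_subject = 'app.logs.billing';
```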

Query Settings for NATS JetStream External Streams

Controlling Where to Start Reading

Use the seek_to query setting to control where to start consuming messages.

Start from Earliest (All Messages)
SELECT raw FROM ext_stream SETTINGS seek_to='earliest'

For non-streaming queries (using the table() function), seek_to defaults to 'earliest'.
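For example, a non-streaming query might look like the following sketch, which counts the messages currently in the stream and then finishes. It reads from the beginning because of the 'earliest' default.

```sql
-- Bounded query: table() reads existing messages instead of waiting for new ones.
SELECT count() FROM table(ext_stream);
```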

Start from Latest (New Messages Only)
SELECT raw FROM ext_stream SETTINGS seek_to='latest'

For streaming queries, seek_to defaults to 'latest'.

Start from Specific Stream Sequence Number
SELECT raw FROM ext_stream SETTINGS seek_to='1000'

This starts reading from sequence number 1000.

Start from Specific Timestamp
SELECT raw FROM ext_stream SETTINGS seek_to='2025-01-01T00:00:00.000'

Timeplus converts the timestamp to the appropriate starting point in the stream.

record_consume_timeout_ms

Use record_consume_timeout_ms to set how long the external stream waits for new messages before returning results. Smaller values reduce latency but may reduce overall performance.

SELECT raw FROM ext_stream SETTINGS record_consume_timeout_ms=100

record_consume_batch_count

Use record_consume_batch_count to control the number of messages fetched in each batch. The default is 1000.

SELECT raw FROM ext_stream SETTINGS record_consume_batch_count=500
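These query settings can be combined in one SETTINGS clause, separated by commas. The values below are illustrative, not tuning recommendations.

```sql
-- Read from the beginning with smaller batches and a short wait.
SELECT raw
FROM ext_stream
SETTINGS seek_to='earliest',
         record_consume_batch_count=500,
         record_consume_timeout_ms=100;
```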