
Pulsar Source

Overview

Apache® Pulsar™ is a multi-tenant, high-performance solution for server-to-server messaging.

In Timeplus Enterprise v2.5, we added first-class integration for Apache Pulsar as a new type of External Stream. You can read data from or write data to Apache Pulsar or StreamNative Cloud. This is also available in Timeplus Proton since v1.6.8.

Create Pulsar External Stream

To create an external stream for Apache Pulsar, you can run the following DDL SQL:

CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name
(<col_name1> <col_type>)
SETTINGS
type='pulsar', -- required
service_url='pulsar://host:port', -- required
topic='..', -- required
jwt='..',
config_file='..',
data_format='..',
format_schema='..',
one_message_per_row=..,
skip_server_cert_check=..,
validate_hostname=..,
ca_cert='..',
client_cert='..',
client_key='..',
connections_per_broker=..,
memory_limit=..,
io_threads=..

Connect to a local Apache Pulsar

If you have a local Apache Pulsar server running, you can run the following SQL DDL to create an external stream to connect to it.

CREATE EXTERNAL STREAM local_pulsar (raw string)
SETTINGS type='pulsar',
service_url='pulsar://localhost:6650',
topic='persistent://public/default/my-topic'

Connect to StreamNative Cloud

If you have access to StreamNative Cloud, you can run the following SQL DDL to create an external stream to connect to it, with a proper API key for a service account. Make sure you choose "Create API Key", instead of "Get JWT Token (Deprecated)".


CREATE EXTERNAL STREAM ext_stream (raw string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://pc-12345678.gcp-shared-usce1.g.snio.cloud:6651',
topic='persistent://tenant/namespace/topic',
jwt='eyJh..syFQ'

DDL Settings

skip_server_cert_check

Default false. If set to true, it will accept untrusted TLS certificates from brokers.

validate_hostname

Default false. Configures whether the client verifies the broker's hostname against its TLS certificate when connecting over TLS.
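
For example, here is a minimal sketch for connecting over TLS to a development broker with a self-signed certificate (the URL and topic are placeholders):

CREATE EXTERNAL STREAM dev_pulsar_tls (raw string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://localhost:6651', -- placeholder broker address
topic='persistent://public/default/my-topic',
skip_server_cert_check=true, -- accept the broker's untrusted certificate (development only)
validate_hostname=false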

ca_cert

The CA certificate (PEM format), which will be used to verify the server's certificate.

client_cert

The client certificate (PEM format) used for mTLS authentication. Learn more.

client_key

The client private key (PEM format) used for mTLS authentication.

jwt

The JSON Web Token (JWT) for the client to use JWT authentication. Learn more.

config_file

The config_file setting is available since Timeplus Enterprise 2.7. You can specify the path to a file that contains the configuration settings. The file should be in the format of key=value pairs, one pair per line. You can set the Pulsar credentials in the file.

Please follow the example in Kafka External Stream.
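
For example, a minimal sketch (the file path is hypothetical, and it assumes the JWT credential is the key=value pair you put in the file):

-- contents of /etc/timeplus/pulsar.conf (hypothetical path):
--   jwt=eyJh..syFQ

CREATE EXTERNAL STREAM ext_stream_with_config (raw string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://pc-12345678.gcp-shared-usce1.g.snio.cloud:6651',
topic='persistent://tenant/namespace/topic',
config_file='/etc/timeplus/pulsar.conf'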

connections_per_broker

Default 1. Sets the maximum number of connections that this external stream will open to a single broker. By default, the connection pool uses a single connection for all producers and consumers.

memory_limit

Default 0 (unlimited). Configure a limit on the amount of memory that will be allocated by this external stream.

io_threads

Default 1. Set the number of I/O threads to be used by the Pulsar client.
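
For example, a sketch that tunes the client for heavier workloads (the values are illustrative; memory_limit is assumed to be in bytes):

CREATE EXTERNAL STREAM tuned_pulsar (raw string)
SETTINGS type='pulsar',
service_url='pulsar://localhost:6650',
topic='persistent://public/default/my-topic',
connections_per_broker=2, -- allow up to 2 connections to each broker
memory_limit=104857600,   -- assumed to be bytes (100 MiB); 0 means unlimited
io_threads=4              -- more I/O threads for the Pulsar client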

Like Kafka External Stream, Pulsar External Stream also supports all the format-related settings: data_format, format_schema, one_message_per_row, etc.

data_format

The supported values for data_format are:

  • JSONEachRow: parse each row of the message as a single JSON document. The top-level JSON key/value pairs are parsed as columns.
  • CSV: comma-separated values; less commonly used.
  • TSV: similar to CSV, but with tab as the separator.
  • ProtobufSingle: a single Protobuf message per Pulsar message.
  • Protobuf: there could be multiple Protobuf messages in a single Pulsar message.
  • Avro
  • RawBLOB: the default value. Read/write messages as plain text.

For data formats that write multiple rows into a single message (such as JSONEachRow or CSV), two more advanced settings are available:

max_insert_block_size

Controls the maximum number of rows that can be written into one message.

max_insert_block_bytes

Controls the maximum size (in bytes) of one message.
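
For example, a sketch that caps batching when writing JSON messages (the values are illustrative):

CREATE EXTERNAL STREAM pulsar_json_sink (id int64, name string)
SETTINGS type='pulsar',
service_url='pulsar://localhost:6650',
topic='persistent://public/default/json-topic',
data_format='JSONEachRow',
max_insert_block_size=1000,    -- at most 1000 rows per message
max_insert_block_bytes=1048576 -- at most 1 MiB per message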

Read Data from Pulsar

Timeplus allows reading Pulsar messages in multiple data formats, including:

  • Plain string (raw)
  • CSV / TSV
  • JSON
  • Protobuf
  • Avro

Read messages in a single column

If the messages in the Pulsar topic are in plain text or JSON format, you can create an external stream with only a raw column of string type.

Example:

CREATE EXTERNAL STREAM ext_github_events (raw string)
SETTINGS type='pulsar', service_url='pulsar://host:port', topic='..'

Then use query-time JSON extraction functions or the shortcut syntax to access the values, e.g. raw:id.
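
For example, a sketch of a streaming query over the raw column (the type field assumes the GitHub event payload contains a top-level type key):

SELECT raw:id AS id, raw:type AS event_type
FROM ext_github_events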

Read messages as multiple columns

If the keys in the JSON message never change, or you don't care about the new columns, you can also create the external stream with multiple columns.

You can pick up some top level keys in the JSON as columns, or all possible keys as columns.

Example:

CREATE EXTERNAL STREAM ext_stream_parsed
(address string, firstName string, middleName string, lastName string, email string, username string, password string, sex string, telephoneNumber string, dateOfBirth int64, age uint8, company string, companyEmail string, nationalIdentityCardNumber string, nationalIdentificationNumber string, passportNumber string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://pc-12345678.gcp-shared-usce1.g.snio.cloud:6651',
topic='persistent://docs/ns/datagen',
data_format='JSONEachRow',
jwt='eyJhb..syFQ'

If the message contains nested or complex JSON, you can define the column as a string type; any JSON value can be stored in a string column.
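
For example, a sketch where a nested customer object is kept as a string column and extracted at query time (the stream name, columns, and JSON keys are illustrative):

CREATE EXTERNAL STREAM ext_orders (
id int64,
customer string -- nested JSON object stored as a string
)
SETTINGS type='pulsar', service_url='pulsar://host:port', topic='..',
data_format='JSONEachRow';

SELECT id, customer:name, customer:address.city FROM ext_orders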

Virtual Columns

Pulsar external streams have the following virtual columns:

_tp_time

the event time of the Pulsar message if it's available, or the publish time otherwise.

_tp_append_time

the publish time of the Pulsar message.

_tp_process_time

the timestamp when the message was read by Pulsar External Stream.

_tp_shard

the partition ID, starting from 0.

_pulsar_message_id

an array which contains 4 elements: ledger_id, entry_id, partition, and batch_index.

_tp_sn

the sequence number in Timeplus, in int64 type.

_tp_message_key

the message key (a.k.a. partition key). Can be empty.
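
For example, the virtual columns can be selected explicitly alongside the regular columns (using the ext_stream example above):

SELECT _tp_time, _tp_append_time, _tp_shard, _tp_message_key, raw
FROM ext_stream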

_tp_message_headers

Starting from Timeplus Enterprise 2.8.2, you can read and write custom headers via this column.

Define the column in the DDL:

CREATE EXTERNAL STREAM example (
s string,
i int,
...,
_tp_message_headers map(string, string)
) settings type='pulsar',...;

Then insert data into the external stream via INSERT INTO or materialized views, with a map of string pairs as custom headers for each message.
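
For example, a sketch of such an INSERT (assuming map_cast to build the map value; the header names are illustrative):

INSERT INTO example (s, i, _tp_message_headers)
VALUES ('hello', 1, map_cast('trace_id', 'abc123', 'source', 'timeplus'));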

Query Settings

shards

You can read from specific Pulsar partitions. By default, all partitions are read, but you can also read from a single partition via the shards setting, e.g.

SELECT raw FROM ext_stream SETTINGS shards='0'

Or you can specify a set of partition IDs, separated by commas, e.g.

SELECT raw FROM ext_stream SETTINGS shards='0,2'

record_consume_timeout_ms

Use the record_consume_timeout_ms setting to control how long the external stream waits for new messages before returning the query result. A smaller value means lower latency, but it can also reduce throughput.
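
For example (the value is illustrative):

SELECT raw FROM ext_stream SETTINGS record_consume_timeout_ms=100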

Read existing messages

When you run SELECT raw FROM ext_stream, Timeplus reads the new messages in the topic, not the existing ones.

seek_to

If you need to read all existing messages, you can use the following settings:

SELECT raw FROM ext_stream SETTINGS seek_to='earliest'

Or the following SQL:

SELECT raw FROM table(ext_stream) WHERE ...

Note: both earliest and latest are supported. You can also use seek_to='2024-10-14' to rewind to a date or datetime. Number-based seek_to is not supported.

warning

Please avoid scanning all existing data via select * from table(ext_stream).

Read / Write Pulsar Message Key

For each message in the topic, the value is essential, while the key is optional but can carry important metadata.

You can define the _tp_message_key column when you create the external stream.

For example:

CREATE EXTERNAL STREAM test_msg_key (
id int32,
name string,
_tp_message_key string
) SETTINGS type='pulsar',
service_url='pulsar://host.docker.internal:6650',
topic='persistent://public/default/msg-key'

You can insert any data into the Pulsar topic.

When you insert a row into the stream, e.g.:

INSERT INTO test_msg_key(id,name,_tp_message_key) VALUES (1, 'John', 'some-key');

'some-key' will be used as the message key of the Pulsar message (and it will be excluded from the message body, so the message body will be {"id": 1, "name": "John"} for the above SQL).

When running a SELECT query, the message key will be populated into the _tp_message_key column as well. SELECT * FROM test_msg_key will return 'some-key' in the _tp_message_key column.

_tp_message_key supports the following types: uint8, uint16, uint32, uint64, int8, int16, int32, int64, bool, float32, float64, string, and fixed_string.

_tp_message_key also supports nullable types, so you can create an external stream with an optional message key. For example:

CREATE EXTERNAL STREAM foo (
id int32,
name string,
_tp_message_key nullable(string) default null
) SETTINGS type='pulsar',...;
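
With a nullable key column, you can omit _tp_message_key when inserting; a sketch, assuming an omitted key produces a message without a key:

INSERT INTO foo (id, name) VALUES (2, 'Jane');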