Pulsar Sink

Overview

Apache® Pulsar™ is a multi-tenant, high-performance solution for server-to-server messaging.

Timeplus Enterprise v2.5 added first-class integration for Apache Pulsar as a new type of External Stream. You can read or write data in Apache Pulsar or StreamNative Cloud. This is also available in Timeplus Proton since v1.6.8.

Create Pulsar External Stream

To create an external stream for Apache Pulsar, you can run the following DDL SQL:

CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name
(<col_name1> <col_type>)
SETTINGS
type='pulsar', -- required
service_url='pulsar://host:port',-- required
topic='..', -- required
jwt='..',
config_file='..',
data_format='..',
format_schema='..',
one_message_per_row=..,
skip_server_cert_check=..,
validate_hostname=..,
ca_cert='..',
client_cert='..',
client_key='..',
connections_per_broker=..,
memory_limit=..,
io_threads=..

Connect to a local Apache Pulsar

If you have a local Apache Pulsar server running, you can run the following SQL DDL to create an external stream to connect to it.

CREATE EXTERNAL STREAM local_pulsar (raw string)
SETTINGS type='pulsar',
service_url='pulsar://localhost:6650',
topic='persistent://public/default/my-topic'
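
To check that the connection works, a minimal sketch is to run a streaming query against the new stream. It assumes the topic exists (or the broker auto-creates it) and that something is producing messages to it while the query runs.

```sql
-- Stream messages from the local topic as they arrive; cancel the query to stop.
-- Each Pulsar message shows up as one value of the single `raw` column.
SELECT raw FROM local_pulsar;
```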

Connect to StreamNative Cloud

If you have access to StreamNative Cloud, you can run the following SQL DDL to create an external stream that connects to it, using an API Key for a service account. Make sure you choose "Create API Key" instead of "Get JWT Token (Depreciated)".

CREATE EXTERNAL STREAM ext_stream (raw string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://pc-12345678.gcp-shared-usce1.g.snio.cloud:6651',
topic='persistent://tenant/namespace/topic',
jwt='eyJh..syFQ'

DDL Settings

skip_server_cert_check

Default false. If set to true, it will accept untrusted TLS certificates from brokers.

validate_hostname

Default false. Controls whether the client verifies the broker's hostname when it connects over TLS.

ca_cert

The CA certificate (PEM format), which will be used to verify the server's certificate.

client_cert

The client certificate (PEM format) used for mTLS authentication.

client_key

The private key (PEM format) for the client to use mTLS authentication.
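
As a rough sketch, the TLS settings above might be combined as follows for an mTLS connection. The stream name secure_pulsar is hypothetical, and each '..' stands for your own host, topic, or PEM material, following the placeholder convention of the DDL template at the top of this page.

```sql
-- Hypothetical stream name; '..' values must be replaced with your own.
CREATE EXTERNAL STREAM secure_pulsar (raw string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://host:port', -- TLS endpoint, same scheme as the StreamNative example
topic='..',
skip_server_cert_check=false, -- keep rejecting untrusted broker certificates
validate_hostname=true,       -- also verify the broker hostname
ca_cert='..',                 -- CA certificate (PEM) that signed the broker certificate
client_cert='..',             -- client certificate (PEM) for mTLS
client_key='..'               -- client private key (PEM) for mTLS
```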

jwt

The JSON Web Token (JWT) that the client uses for JWT authentication.

config_file

The config_file setting is available since Timeplus Enterprise 2.7. You can specify the path to a file that contains the configuration settings. The file should be in the format of key=value pairs, one pair per line. You can set the Pulsar credentials in the file.

Please follow the example in Kafka External Stream.
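
For orientation, here is a minimal sketch of how config_file could be used to keep the token out of the DDL. The file path and the stream name are hypothetical, and it is an assumption that the keys in the file use the same names as the DDL settings (here jwt); check the Kafka External Stream example for the authoritative layout.

```sql
-- Hypothetical file /etc/timeplus/pulsar.conf, one key=value pair per line:
--   jwt=eyJh..syFQ
-- (assumption: file keys match the DDL setting names)
CREATE EXTERNAL STREAM ext_stream_from_file (raw string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://host:port',
topic='persistent://tenant/namespace/topic',
config_file='/etc/timeplus/pulsar.conf' -- credentials are read from this file
```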

connections_per_broker

Default 1. Sets the maximum number of connections that this external stream will open to a single broker. By default, the connection pool uses a single connection for all producers and consumers.

memory_limit

Default 0 (unlimited). Configure a limit on the amount of memory that will be allocated by this external stream.

io_threads

Default 1. Set the number of I/O threads to be used by the Pulsar client.
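
A sketch of how the client tuning settings might be applied to a busier topic. The stream name and the values 2 and 4 are illustrative assumptions, not recommendations; memory_limit is left at its default of 0 (unlimited).

```sql
-- Illustrative values only; tune against your own workload.
CREATE EXTERNAL STREAM tuned_pulsar (raw string)
SETTINGS type='pulsar',
service_url='pulsar://host:port',
topic='..',
connections_per_broker=2, -- allow up to two connections per broker instead of one
io_threads=4              -- use four I/O threads in the Pulsar client instead of one
```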

Like Kafka External Stream, Pulsar External Stream also supports all format related settings: data_format, format_schema, one_message_per_row, etc.

data_format

The supported values for data_format are:

  • JSONEachRow: parse each row of the message as a single JSON document. The top-level JSON key/value pairs are parsed as the columns.
  • CSV: comma-separated values; less commonly used.
  • TSV: similar to CSV, but with tab as the separator.
  • ProtobufSingle: one Protobuf message per Pulsar message.
  • Protobuf: there can be multiple Protobuf messages in a single Pulsar message.
  • Avro
  • RawBLOB: the default value. Read/write the message as plain text.

For data formats which write multiple rows into one single message (such as JSONEachRow or CSV), two more advanced settings are available:

max_insert_block_size

Controls the maximum number of rows that can be written into one message.

max_insert_block_bytes

Controls the maximum size (in bytes) of one message.
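
As a sketch, the two limits could be used to cap how much a single multi-row message holds. This assumes both settings are accepted in the SETTINGS clause of the DDL like the other settings in this section; the stream name, columns, and the values 100 and 65536 are hypothetical.

```sql
-- Assumption: max_insert_block_size and max_insert_block_bytes are set in the DDL.
CREATE EXTERNAL STREAM capped_json (url string, ip string)
SETTINGS type='pulsar',
service_url='pulsar://host:port',
topic='..',
data_format='JSONEachRow',    -- multiple rows per message by default
max_insert_block_size=100,    -- at most 100 rows in one message
max_insert_block_bytes=65536  -- at most 64 KiB in one message
```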

Write Data to Pulsar

Write to Pulsar in Plain Text

You can write plain text messages to Pulsar topics with an external stream with a single column.

CREATE EXTERNAL STREAM ext_github_events (raw string)
SETTINGS type='pulsar', service_url='pulsar://host:port', topic='..'

Then use INSERT INTO <stream_name> VALUES (v), the Ingest REST API, or a materialized view that targets this stream to write messages to the Pulsar topic. The actual data_format value is RawBLOB, but this can be omitted. By default, one_message_per_row is true.
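
For instance, the INSERT route might look like the sketch below, using the ext_github_events stream defined above. The two payload strings are arbitrary sample values.

```sql
-- Two rows in one statement. Because one_message_per_row defaults to true
-- for RawBLOB, each row is expected to become its own Pulsar message.
INSERT INTO ext_github_events (raw) VALUES ('first event'), ('second event');
```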

Advanced Settings for writing data

Settings for controlling the producer behavior:

  • output_batch_max_messages - Set the max number of messages permitted in a batch. If you set this option to a value greater than 1, messages are queued until this threshold is reached or batch interval has elapsed.
  • output_batch_max_size_bytes - Set the max size of messages permitted in a batch. If you set this option to a value greater than 1, messages are queued until this threshold is reached or batch interval has elapsed.
  • output_batch_max_delay_ms - Set the max time for message publish delay permitted in a batch.
  • pulsar_max_pending_messages - Set the max size of the producer's queue holding the messages pending to receive an acknowledgment from the broker. When the queue is full, the producer will be blocked.
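
A hedged sketch of how these producer settings might be combined. It assumes they are accepted in the SETTINGS clause of the DDL alongside the other settings on this page; the stream name and all numeric values are illustrative, not defaults or recommendations.

```sql
-- Assumption: producer settings are given in the DDL; values are examples only.
CREATE EXTERNAL STREAM batched_target (raw string)
SETTINGS type='pulsar',
service_url='pulsar://host:port',
topic='..',
output_batch_max_messages=500,      -- flush a batch once it holds 500 messages
output_batch_max_size_bytes=131072, -- or once it reaches 128 KiB
output_batch_max_delay_ms=10,       -- or after 10 ms of publish delay
pulsar_max_pending_messages=1000    -- block the producer when 1000 messages await acknowledgment
```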

Multiple columns to write to Pulsar

To write structured data to Pulsar topics, you can choose different data formats:

RawBLOB

Write the content as plain text.

JSONEachRow

You can use data_format='JSONEachRow',one_message_per_row=true to inform Timeplus to write each event as a JSON document. The columns of the external stream will be converted to keys in the JSON documents. For example:

CREATE EXTERNAL STREAM target(
_tp_time datetime64(3),
url string,
method string,
ip string)
SETTINGS type='pulsar',
service_url='pulsar://host:port',
topic='..',
data_format='JSONEachRow',
one_message_per_row=true;

The messages will be generated in the specified topic as

{
"_tp_time":"2023-10-29 05:36:21.957",
"url":"https://www.nationalweb-enabled.io/methodologies/killer/web-readiness",
"method":"POST",
"ip":"c4ecf59a9ec27b50af9cc3bb8289e16c"
}

Please note that, by default, multiple JSON documents are written to the same Pulsar message, one JSON document per row/line. This default maximizes write performance to Pulsar, but you need to make sure the downstream applications can properly split the JSON documents in each message.

If you need exactly one valid JSON document per message, instead of JSONL (JSON Lines), set one_message_per_row=true, e.g.

CREATE EXTERNAL STREAM target(_tp_time datetime64(3), url string, ip string)
SETTINGS type='pulsar', service_url='pulsar://host:port', topic='..',
data_format='JSONEachRow',one_message_per_row=true

The default value of one_message_per_row, if not specified, is false for data_format='JSONEachRow' and true for data_format='RawBLOB'.
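
To see the effect of this setting, a sketch like the following inserts two rows in one statement into the three-column target stream defined just above. The timestamps, URLs, and IP values are made-up sample data.

```sql
-- With one_message_per_row=true, each row should be written as its own message
-- containing a single JSON document. With the JSONEachRow default (false),
-- both rows may share one message, one JSON document per line.
INSERT INTO target (_tp_time, url, ip) VALUES
('2023-10-29 05:36:21.957', 'https://example.com/a', '10.0.0.1'),
('2023-10-29 05:36:22.104', 'https://example.com/b', '10.0.0.2');
```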

CSV

You can use data_format='CSV' to inform Timeplus to write each event as a CSV row. The column values of the external stream are written in column order, separated by commas. For example:

CREATE EXTERNAL STREAM target(
_tp_time datetime64(3),
url string,
method string,
ip string)
SETTINGS type='pulsar',
service_url='pulsar://host:port',
topic='..',
data_format='CSV';

The messages will be generated in the specified topic as

"2023-10-29 05:35:54.176","https://www.nationalwhiteboard.info/sticky/recontextualize/robust/incentivize","PUT","3eaf6372e909e033fcfc2d6a3bc04ace"

TSV

Similar to CSV, but with tab as the separator.
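
For completeness, a TSV stream can be sketched by reusing the CSV example with only the data_format value changed. The stream name target_tsv is hypothetical.

```sql
-- Same columns as the CSV example; values are written tab-separated.
CREATE EXTERNAL STREAM target_tsv(
_tp_time datetime64(3),
url string,
method string,
ip string)
SETTINGS type='pulsar',
service_url='pulsar://host:port',
topic='..',
data_format='TSV';
```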

ProtobufSingle

You can write Protobuf-encoded messages to Pulsar topics.

First, you need to create a schema with SQL, e.g.

CREATE OR REPLACE FORMAT SCHEMA schema_name AS '
syntax = "proto3";

message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 results_per_page = 3;
}
' TYPE Protobuf

Then refer to this schema while creating an external stream for Pulsar:

CREATE EXTERNAL STREAM stream_name(
query string,
page_number int32,
results_per_page int32)
SETTINGS type='pulsar',
service_url='pulsar://host.docker.internal:6650',
topic='persistent://public/default/protobuf',
data_format='ProtobufSingle',
format_schema='schema_name:SearchRequest'

Then you can run INSERT INTO or use a materialized view to write data to the topic.

INSERT INTO stream_name(query,page_number,results_per_page) VALUES('test',1,100)

Please refer to Protobuf/Avro Schema for more details.

Avro

You can write messages in Avro format.

First, you need to create a schema with SQL, e.g.

CREATE OR REPLACE FORMAT SCHEMA avro_schema AS '{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
' TYPE Avro;

Then refer to this schema while creating an external stream for Pulsar:

CREATE EXTERNAL STREAM stream_avro(
name string,
favorite_number nullable(int32),
favorite_color nullable(string))
SETTINGS type='pulsar',
service_url='pulsar://host.docker.internal:6650',
topic='persistent://public/default/avro',
data_format='Avro',
format_schema='avro_schema'

Then you can run INSERT INTO or use a materialized view to write data to the topic.

INSERT INTO stream_avro(name,favorite_number,favorite_color) VALUES('test',1,'red')

Please refer to Protobuf/Avro Schema for more details.

Continuously Write to Pulsar via MV

You can use a materialized view to continuously write data to Pulsar by setting an external stream as its target, e.g.

-- read the topic via an external stream
CREATE EXTERNAL STREAM frontend_events(raw string)
SETTINGS type='pulsar',
service_url='pulsar://host:port',
topic='owlshop-frontend-events';

-- create a second external stream to write data to another topic
CREATE EXTERNAL STREAM target(
_tp_time datetime64(3),
url string,
method string,
ip string)
SETTINGS type='pulsar',
service_url='pulsar://host:port',
topic='..',
data_format='JSONEachRow',
one_message_per_row=true;

-- set up the ETL pipeline via a materialized view
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT now64() AS _tp_time,
raw:requestedUrl AS url,
raw:method AS method,
lower(hex(md5(raw:ipAddress))) AS ip
FROM frontend_events;