Pulsar External Stream
Apache® Pulsar™ is a multi-tenant, high-performance solution for server-to-server messaging.
In Timeplus Enterprise v2.5 (not yet released), we added first-class integration for Apache Pulsar as a new type of External Stream. You can read or write data in Apache Pulsar or StreamNative Cloud.
Create an External Stream
To create an external stream for Apache Pulsar, you can run the following DDL SQL:
CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name
(<col_name1> <col_type>)
SETTINGS
type='pulsar', -- required
service_url='pulsar://host:port',-- required
topic='..', -- required
jwt='..',
data_format='..',
format_schema='..',
one_message_per_row=..,
skip_server_cert_check=..,
validate_hostname=..,
ca_cert='..',
client_cert='..',
client_key='..',
connections_per_broker=..,
memory_limit=..,
io_threads=..
Connect to a local Apache Pulsar
If you have a local Apache Pulsar server running, you can run the following SQL DDL to create an external stream to connect to it.
CREATE EXTERNAL STREAM local_pulsar (raw string)
SETTINGS type='pulsar',
service_url='pulsar://localhost:6650',
topic='persistent://public/default/my-topic'
Connect to StreamNative Cloud
If you have access to StreamNative Cloud, you can run the following SQL DDL to create an external stream to connect to it, with a proper JWT API key for a service account.
CREATE EXTERNAL STREAM ext_stream (raw string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://pc-12345678.gcp-shared-usce1.g.snio.cloud:6651',
topic='persistent://tenant/namespace/topic',
jwt='eyJh..syFQ'
DDL Settings
skip_server_cert_check
Default false. If set to true, untrusted TLS certificates from brokers will be accepted.
validate_hostname
Default false. Whether to enable hostname verification when the client connects to a broker over TLS.
ca_cert
The CA certificate (PEM format), which will be used to verify the server's certificate.
client_cert
The client certificate (PEM format) for mTLS authentication. Learn more.
client_key
The client private key (PEM format) for mTLS authentication.
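As a sketch, the TLS-related settings above can be combined in one DDL. The host, topic, and PEM contents below are placeholders, and whether you inline the PEM text or load it another way depends on your deployment:

```sql
-- Hypothetical example: connect over TLS with mTLS authentication.
-- ca_cert, client_cert, and client_key expect PEM-format content.
CREATE EXTERNAL STREAM secure_pulsar (raw string)
SETTINGS type='pulsar',
         service_url='pulsar+ssl://pulsar.example.com:6651',
         topic='persistent://tenant/namespace/topic',
         ca_cert='-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----',
         client_cert='-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----',
         client_key='-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----'
```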
jwt
The JSON Web Token for the client to use JWT authentication. Learn more.
connections_per_broker
Default 1. The maximum number of connections that this external stream will open to a single broker. By default, the connection pool uses a single connection for all producers and consumers.
memory_limit
Default 0 (unlimited). A limit on the amount of memory that this external stream can allocate.
io_threads
Default 1. The number of I/O threads used by the Pulsar client.
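For a busier workload, the tuning settings above can be set together at creation time. This is a hypothetical sketch; the values shown are illustrative, not recommendations:

```sql
-- Hypothetical example: tune client resources for a high-throughput topic.
CREATE EXTERNAL STREAM busy_pulsar (raw string)
SETTINGS type='pulsar',
         service_url='pulsar://localhost:6650',
         topic='persistent://public/default/busy-topic',
         connections_per_broker=4,  -- allow up to 4 connections per broker
         io_threads=4,              -- more I/O threads for the Pulsar client
         memory_limit=104857600     -- cap client memory at 100 MiB
```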
Like Kafka External Stream, Pulsar External Stream also supports all format-related settings: data_format, format_schema, one_message_per_row, etc.
data_format
The supported values for data_format are:
- JSONEachRow: parse each row of the message as a single JSON document. The top level JSON key/value pairs will be parsed as the columns. Learn More.
- CSV: less commonly used. Learn More.
- TSV: similar to CSV, but uses a tab as the separator
- ProtobufSingle: a single Protobuf message per Pulsar message
- Protobuf: there could be multiple Protobuf messages in a single Pulsar message
- Avro
- rawBlob: the default value. Read/write messages as plain text.
For data formats which write multiple rows into one single message (such as JSONEachRow or CSV), two more advanced settings are available:
max_insert_block_size
The maximum number of rows that can be written into one message.
max_insert_block_bytes
The maximum size (in bytes) of one message.
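For instance, both settings could be applied when creating a stream used for writing. This is a hedged sketch; the stream name, topic, and limits are placeholders:

```sql
-- Hypothetical example: batch up to 1000 rows or 1 MiB per Pulsar message.
CREATE EXTERNAL STREAM ext_sink (id int64, payload string)
SETTINGS type='pulsar',
         service_url='pulsar://localhost:6650',
         topic='persistent://public/default/sink-topic',
         data_format='JSONEachRow',
         max_insert_block_size=1000,
         max_insert_block_bytes=1048576;

-- Rows inserted here are batched into Pulsar messages per the limits above.
INSERT INTO ext_sink (id, payload) VALUES (1, 'hello'), (2, 'world');
```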
Read Data in Pulsar
Read messages in a single column
If the messages in the Pulsar topic are in plain text format or JSON, you can create an external stream with only a raw column of string type.
Example:
CREATE EXTERNAL STREAM ext_github_events (raw string)
SETTINGS type='pulsar', service_url='pulsar://host:port', topic='..'
Then use query-time JSON extraction functions or the shortcut syntax to access the values, e.g. raw:id.
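A query using the shortcut syntax might look like the following. The id key comes from the example above; the type key is an assumption for illustration and depends on your actual message payload:

```sql
-- Hypothetical example: extract top-level JSON keys at query time.
-- raw:id is shorthand for extracting the "id" key from the raw JSON string.
SELECT raw:id AS event_id,
       raw:type AS event_type  -- "type" is an assumed key, adjust to your payload
FROM ext_github_events
```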
Read messages as multiple columns
If the keys in the JSON messages never change, or you don't care about new columns, you can also create the external stream with multiple columns.
You can pick some of the top-level keys in the JSON as columns, or define all possible keys as columns.
Example:
CREATE EXTERNAL STREAM ext_stream_parsed
(address string, firstName string, middleName string, lastName string, email string, username string, password string, sex string, telephoneNumber string, dateOfBirth int64, age uint8, company string, companyEmail string, nationalIdentityCardNumber string, nationalIdentificationNumber string, passportNumber string)
SETTINGS type='pulsar',
service_url='pulsar+ssl://pc-12345678.gcp-shared-usce1.g.snio.cloud:6651',
topic='persistent://docs/ns/datagen',
data_format='JSONEachRow',
jwt='eyJhb..syFQ'
If there is nested, complex JSON in the message, you can define that column as a string type. In fact, any JSON value can be stored in a string column.
Virtual Columns
Pulsar external streams have the following virtual columns:
_tp_time
the event time of the Pulsar message if it's available, or the publish time otherwise.
_tp_append_time
the publish time of the Pulsar message.
_tp_process_time
the timestamp when the message was read by Pulsar External Stream.
_tp_shard
the partition ID, starting from 0.
_pulsar_message_id
an array which contains 4 elements: ledger_id, entry_id, partition, and batch_index.
_tp_sn
the sequence number in Timeplus, in int64 type.
_tp_message_key
the message key (a.k.a partition key). Can be empty.
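The virtual columns above can be selected like regular columns. A sketch, assuming the ext_stream external stream defined earlier:

```sql
-- Inspect message metadata alongside the payload.
SELECT _tp_time,            -- event time (or publish time as a fallback)
       _tp_shard,           -- partition ID, starting from 0
       _tp_message_key,     -- message key, may be empty
       _pulsar_message_id,  -- [ledger_id, entry_id, partition, batch_index]
       raw
FROM ext_stream
```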
Query Settings
shards
You can read from specified Pulsar partitions. By default, all partitions will be read. But you can also read from a single partition via the shards setting, e.g.
SELECT raw FROM ext_stream SETTINGS shards='0'