Kafka 外部流
You can read data from Apache Kafka (as well as Confluent Cloud, or Redpanda) in Timeplus with External Stream. 结合 物化视图 和 目标流,你还可以使用外部流向 Apache Kafka 写入数据。
创建外部流
In Timeplus Proton, the external stream supports Kafka API as the only type.
In Timeplus Enterprise, it also supports External Stream for Apache Pulsar and External Stream for other Timeplus deployment.
To create an external stream for Apache Kafka or Kafka-compatiable messaging platforms, you can run the following DDL SQL:
CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name
(<col_name1> <col_type>)
SETTINGS
type='kafka',
brokers='ip:9092',
topic='..',
security_protocol='..',
username='..',
password='..',
sasl_mechanism='..',
data_format='..',
kafka_schema_registry_url='..',
kafka_schema_registry_credentials='..',
ssl_ca_cert_file='..',
ssl_ca_pem='..',
skip_ssl_cert_check=..
The supported values for security_protocol
are:
- 纯文本:省略此选项时,这是默认值。
- SASL_SSL:设置此值时,应指定用户名和密码。
- 如果你需要指定自己的 SSL 认证文件,可以添加另一个设置
ssl_ca_cert_file='/ssl/ca.pem'
Proton 1.5.5 中的新增内容,如果你不想或无法使用文件路径,例如在 Timeplus Cloud 或 Docker/Kuker/Kup 中,也可以将 pem 文件的全部内容作为字符串放入ssl_ca_pem
设置中伯内特斯环境。 - 可以通过
设置 skip_ssl_cert_check=true
来跳过 SSL 认证验证。
- 如果你需要指定自己的 SSL 认证文件,可以添加另一个设置
The supported values for sasl_mechanism
are:
- PLAIN:当你将 security_protocol 设置为 SASL_SSL 时,这是 sasl_mechanmic 的默认值。
- SCRAM-SHA-256
- SCRAM-SHA-512
The supported values for data_format
are:
- JSONEachRow: parse each row of the message as a single JSON document. The top level JSON key/value pairs will be parsed as the columns. 了解更多.
- CSV:不太常用。 了解更多.
- ProtobufSingle: for single Protobuf message per message
- Protobuf: there could be multiple Protobuf messages in a single message.
- Avro:在 Proton 1.5.2 中添加
- rawBlob:默认值。 Read/write message as plain text.
For examples to connect to various Kafka API compatitable message platforms, please check this doc.
定义列
Single column to read
如果 Kafka 主题中的消息是纯文本格式或 JSON,则可以创建只有 字符串
类型的 原始
列的外部流。
示例:
Then use query time JSON extraction functions or shortcut to access the values, e.g. raw:id
.
以纯文本写入 Kafka
您可以使用带有单列的外部流向 Kafka 主题写入纯文本消息。
然后使用 INSERT INTO <stream_name> VALUES (v)
或 Ingest REST API,或者将其设置为物化视图向卡夫卡主题写入消息的目标流。 实际的 data_format
值为 rawBlob
但可以省略。 By default one_message_per_row
is true
.
Since Timeplus Proton 1.5.11, a new setting kafka_max_message_size
is available. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message, ensuring it won't exceed the kafka_max_message_size
limit.
从 Kafka 中读取多列
If the keys in the JSON message never change, or you don't care about the new columns, you can also create the external stream with multiple columns.
您可以在 JSON 中选取一些顶级键作为列,或将所有可能的键选为列。
Please note the behaviors are changed in recent versions, based on user feedback:
版本 | 默认行为 | 如何覆盖 |
---|---|---|
1.4.2 或以上 | 假设在 JSON 中有 5 个顶级键/值对,则可以在外部流中定义 5 列或少于 5 列。 数据将被正确读取。 | 如果你不想读取带有意外列的新事件,请在 CREATE DDL 中设置 input_format_skip_unknown_fields=false 。 |
1.3.24 到 1.4.1 | 假设JSON中有5个顶级键/值对,则可能需要定义5列才能全部读取。 或者在 DDL 中定义少于 5 列,并确保在每个 SELECT 查询设置中添加 input_format_skip_unknown_fields=true ,否则不会返回任何搜索结果。 | or only define some keys as columns and append this to your query: SETTINGS input_format_skip_unknown_fields=true |
1.3.23 或更高版本 | 你必须为整个 JSON 文档定义一个 字符串 列,并将查询时 JSON 解析应用于提取字段。 | 不适用 |
示例:
如果消息中有嵌套的复杂 JSON,则可以将该列定义为字符串类型。 实际上,任何 JSON 值都可以保存在字符串列中。
Protobuf messages can be read with all or partial columns. Please check this page.
要写入 Kafka 的多列
To write data to Kafka topics, you can choose different data formats:
jsoneaChrow
You can use data_format='JSONEachRow',one_message_per_row=true
to inform Timeplus to write each event as a JSON document. 外部流的列将转换为 JSON 文档中的密钥。 例如:
消息将在特定主题中生成
Please note, by default multiple JSON documents will be inserted to the same Kafka message. 每行/每行一个 JSON 文档。 这种默认行为旨在为Kafka/Redpanda获得最大的写入性能。 但是你需要确保下游应用程序能够正确拆分每条 Kafka 消息的 JSON 文档。
如果你需要每条 Kafka 消息的有效的 JSON,而不是 JSONL,请设置 one_message_per_row=true
例如
The default value of one_message_per_row, if not specified, is false for data_format='JSONEachRow'
and true for data_format='RawBLOB'
.
Since Timeplus Proton 1.5.11, a new setting kafka_max_message_size
is available. When multiple rows can be written to the same Kafka message, this setting will control how many data will be put in a Kafka message and when to create new Kafka message, ensuring each message won't exceed the kafka_max_message_size
limit.
CSV
You can use data_format='CSV'
to inform Timeplus to write each event as a JSON document. 外部流的列将转换为 JSON 文档中的密钥。 例如:
消息将在特定主题中生成
“2023-10-29 05:35:54.176 “,” https://www.nationalwhiteboard.info/sticky/recontextualize/robust/incentivize","PUT","3eaf6372e909e033fcfc2d6a3bc04ace”