Protobuf/Avro 架构
Timeplus supports reading or writing messages in Protobuf or Avro format for Kafka External Stream or Pulsar External Stream. 本文档介绍如何在没有架构注册表的情况下处理数据。 Check this page if your Kafka topics are associated with a Schema Registry.
创建架构
如果没有架构注册表,则需要使用 SQL 定义 Protobuf 或 Avro 架构。
Protobuf
创建或替换格式架构 schema_name 为 '
语法 = “proto3”;
消息 searchRequest {
字符串查询 = 1;
int32 page_number = 2;
int32 results_per_page = 3;
}
'类型 Protobuf
Then refer to this schema while creating an external stream for Kafka or Pulsar:
创建外部流 stream_name (
查询字符串,
page_number int32,
results_per_page int32)
设置类型='kafka',
brokers='pkc-1234.us-west-2.aws.confluent.cloud: 9092',
topic='topic_name',
security_protocol='sasl_SSL',
username='..',
密码='..',
data_format='protobufSingle',
format_schema='schema_name: searchrequest'
Then you can run INSERT INTO
or use a materialized view to write data to the topic.
INSERT INTO stream_name(query,page_number,results_per_page) VALUES('test',1,100)
请注意:
- 如果你想确保每条 Kafka 消息只有一条 Protobuf 消息,请将 data_format 设置为 protobufSingle。 如果你将其设置为 Protobuf,那么在一条 Kafka 消息中可能会有多条 Protobuf 消息。
format_schema
设置包含两部分:注册的架构名称(在本示例中:架构名称)和消息类型(在本示例中:SearchRequest)。 用分号将它们组合在一起。- 你可以使用这个外部流在目标 Kafka/Confluent 主题中读取或写入 Protobuf 消息。
- For more advanced use cases, please check the examples for complex schema.
Avro
Available since Timeplus Proton 1.5.10.
CREATE OR REPLACE FORMAT SCHEMA avro_schema AS '{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
' TYPE Avro;
Then refer to this schema while creating an external stream for Kafka or Pulsar:
CREATE EXTERNAL STREAM stream_avro(
name string,
favorite_number nullable(int32),
favorite_color nullable(string))
SETTINGS type='kafka',
brokers='pkc-1234.us-west-2.aws.confluent.cloud:9092',
topic='topic_name',
security_protocol='SASL_SSL',
username='..',
password='..',
data_format='Avro',
format_schema='avro_schema'
Then you can run INSERT INTO
or use a materialized view to write data to the topic.
INSERT INTO stream_avro(name,favorite_number,favorite_color) VALUES('test',1,'red')
列出架构
List schemas in the current Timeplus deployment:
显示格式架构
显示架构的详细信息
显示创建格式架构 schema_name
删除架构
删除格式架构 <IF EXISTS> schema_name;
复杂 Protobuf 架构的示例
嵌套架构
创建格式架构 simple_nested AS '
语法 = “proto3";
消息名称 {
字符串第一 = 1;
字符串最后 = 2;
}
消息 Person {
字符串电子邮件 = 1;
姓名 = 2;
int32 年龄 = 3;
map<string, int32> skills = 4;
}
'TYPE Protobuf
创建外部流人物(
电子邮件字符串、
name_first 字符串、
name.last 字符串、
技能地图(字符串,int32)、
年龄 int32
)
设置 type='kafka'... data_format='protobufSingle',
format_schema='simple_nested: person'
请注意:
Person
是顶级消息类型。 它指的是 “名称” 消息类型。- 使用
name
作为前缀作为列名。 使用\ _ 或。 将前缀与嵌套字段名称连接起来。 - 当你创建外部流来读取 Protobuf 消息时,你不必定义所有可能的列。 只有您定义的列才会被读取。 跳过其他列/字段。
枚举
Say in your Protobuf definition, there is an enum type:
枚举级别 {
levelOne = 0;
levelTwo = 1;
}
你可以在 Proton 中使用枚举类型,例如
创建外部流... (
..
level enum8 ('levelOne'=0, 'levelTwo'=1),
..
)
重复
假设在你的 Protobuf 定义中,有一种重复的类型:
重复的字符串状态
你可以在 Proton 中使用数组类型,例如
创建外部流... (
..
状态数组(字符串),
..
)
重复和嵌套
比如说,在你的 Protobuf 定义中,有一个自定义类型的字段,而且还重复了这个字段:
syntax = “proto3";
message DataComponent {
可选字符串消息 = 1;
消息参数 {
message keyValue {
可选字符串名称 = 1;
可选字符串值 = 2;
}
重复的 keyValue 参数 = 1;
}
可选参数参数 = 2;
}
你可以在 Proton 中使用元组类型,例如
创建外部流... (
消息字符串,
参数元组(参数嵌套(名称字符串,值字符串))
)
流数据将显示为:
从 stream_name 中选择 *;
消息 | 参数 |
---|---|
第 0 号 | ([('key_1', 'value_1'), ('key_2', 'value_2'), ('key_3', 'value_3')]) |
包裹
假设在你的 Protobuf 定义中,有一个软件包:
软件包演示;
消息 stockRecord {
..
}
如果 Protobuf 定义类型中只有 1 个软件包,则不必包含软件包名称。 例如:
创建外部流... (
..
)
设置... format_schema= “schema_name: stockRecord”
如果有多个软件包,则可以在软件包中使用完全限定名称,例如
创建外部流... (
..
)
设置... format_schema= “schema_name: demo.stockRecord”
导入架构
如果你使用 创建格式架构 来注册格式架构,比如 schema_name
,你可以创建另一个架构并导入这个架构:
创建格式架构 import_example 为 '
import “schema_name.proto”;
消息 Test {
必填字符串 ID = 1;
可选级别 theLevel = 2;
}
'TYPE Protobuf
请务必添加 .proto
作为后缀。
Avro 数据类型映射
Avro Primitive Types
The table below shows supported Avro primitive data types and how they match Timeplus data types in INSERT and SELECT queries.
Avro 数据类型 | Timeplus 数据类型 |
---|---|
整数 | int8, int16, int32, uint8, uint16, uint32 |
长 | int64,uint64 |
浮点数 | float32 |
双重的 | float64 |
字节,字符串 | 字符串 |
固定 (N) | fixed_string (N) |
枚举 | enum8,enum16 |
数组 (T) | 数组 (T) |
地图 (k, v) | 地图 (k, v) |
联盟(空值,T) | 可为空 (T) |
空的 | 可为空(什么都没有) |
int(日期) | 日期,日期32 |
长(时间戳毫秒) | datetime64 (3) |
长(时间戳微秒) | datetime64 (6) |
字符串 (uuid) | uuid |
记录 | 元组 |