Encode/Decode Avro Messages
示例:在 Confluent Cloud 中读取 Avro 编码的数据
假设你创建了这样的 Avro 架构定义:
{
“类型”:“记录”,
“命名空间”:“com.mycorp.mynamespace”,
“名称”:“SampleRecord”,
“doc”:“可以帮助你入门的示例架构。“,
“字段”:[
{
“名称”:“my_field1",
“类型”:“int”,
“doc”:“int 类型是 32 位有符号整数。”
},
{
“名称”:“my_field2”,
“类型”:“双精度”,
“doc”:“双精度类型是双精度(64 位)IEEE 754 浮点数。”
},
{
“名称”:“my_field3”,
“类型”:“字符串”,
“doc”:“字符串是一个 Unicode 字符序列。”
},
{
“名称”:“my_field4",
“类型”:{
“类型”:“长”,
“LogicalType”:“timestamp-millis”
},
“doc”:“用例”
}
]
}
在 Confluent Cloud 中创建主题,您可以使用以下命令以 Avro 格式将数据推送到该主题:
confluent kafka 话题生成 $TOPIC --schema ~/Dev/schema.txt\
--schema-registry-endpoint https://psrc-ab123.us-east-2.aws.confluent.cloud\
--schema-registry-api-key $API_KEY \
--schema-registry-api-secret $API_SECRET \
--value-for
例如,您可以逐行添加消息
{“my_field1": 1、“my_field2”: 3.4、“my_field3”: “你好”、“my_field4”: 1707954127790}
现在让我们在 Proton 中创建一个外部流来读取这样的消息:
CREATE EXTERNAL STREAM avro_stream(
my_field1 int8,
my_field2 float32,
my_field3 string,
my_field4 int64
)
SETTINGS
type = 'kafka',
brokers = 'pkc-ab123.us-east-2.aws.confluent.cloud:9092',
security_protocol='SASL_SSL',
username='$KEY',
password='$SECRET',
topic = '$TOPIC',
data_format = 'Avro',
kafka_schema_registry_url = 'https://psrc-ab123.us-east-2.aws.confluent.cloud',
kafka_schema_registry_credentials = '$API_KEY:$API_SECRET';
成功运行此 SQL 后,您可以通过以下方式获取现有数据
从 avro_stream 中选择 * 其中 _tp_time>earliest_ts ()
或者只通过以下方式获取收到的新消息
从 avro_stream 中选择 *
示例:在 Confluent 平台中读取 Avro 编码的数据
你可以关注 Confluent Docs 通过 Docker Compose 启动带有架构注册表的 Confluent 平台。
Avro 架构定义:
{
“命名空间”:“io.confluent.examples.clients.basicavro”,
“类型”:“记录”,
“名称”:“付款”,
“字段”:[
{“名称”:“id”,“类型”:“字符串”},
{“名称”:“金额”,“类型”:“双倍”}
]