Encode/Decode Avro Messages
示例:在 Confluent Cloud 中读取 Avro 编码的数据
假设你创建了这样的 Avro 架构定义:
{
“类型”:“记录”,
“命名空间”:“com.mycorp.mynamespace”,
“名称”:“SampleRecord”,
“doc”:“可以帮助你入门的示例架构。“,
“字段”:[
{
“名称”:“my_field1",
“类型”:“int”,
“doc”:“int 类型是 32 位有符号整数。”
},
{
“名称”:“my_field2”,
“类型”:“双精度”,
“doc”:“双精度类型是双精度(64 位)IEEE 754 浮点数。”
},
{
“名称”:“my_field3”,
“类型”:“字符串”,
“doc”:“字符串是一个 Unicode 字符序列。”
},
{
“名称”:“my_field4",
“类型”:{
“类型”:“长”,
“LogicalType”:“timestamp-millis”
},
“doc”:“用例”
}
]
}
在 Confluent Cloud 中创建主题,您可以使用以下命令以 Avro 格式将数据推送到该主题:
confluent kafka 话题生成 $TOPIC --schema ~/Dev/schema.txt\
--schema-registry-endpoint https://psrc-ab123.us-east-2.aws.confluent.cloud\
--schema-registry-api-key $API_KEY \
--schema-registry-api-secret $API_SECRET \
--value-for
例如,您可以逐行添加消息
{“my_field1": 1、“my_field2”: 3.4、“my_field3”: “你好”、“my_field4”: 1707954127790}
现在让我们在 Proton 中创建一个外部流来读取这样的消息:
CREATE EXTERNAL STREAM avro_stream(
my_field1 int8,
my_field2 float32,
my_field3 string,
my_field4 int64
)
SETTINGS
type = 'kafka',
brokers = 'pkc-ab123.us-east-2.aws.confluent.cloud:9092',
security_protocol='SASL_SSL',
username='$KEY',
password='$SECRET',
topic = '$TOPIC',
data_format = 'Avro',
kafka_schema_registry_url = 'https://psrc-ab123.us-east-2.aws.confluent.cloud',
kafka_schema_registry_credentials = '$API_KEY:$API_SECRET';
成功运行此 SQL 后,您可以通过以下方式获取现有数据
从 avro_stream 中选择 * 其中 _tp_time>earliest_ts ()
或者只通过以下方式获取收到的新消息
从 avro_stream 中选择 *
示例:在 Confluent 平台中读取 Avro 编码的数据
你可以关注 Confluent Docs 通过 Docker Compose 启动带有架构注册表的 Confluent 平台。
Avro 架构定义:
{
“命名空间”:“io.confluent.examples.clients.basicavro”,
“类型”:“记录”,
“名称”:“付款”,
“字段”:[
{“名称”:“id”,“类型”:“字符串”},
{“名称”:“金额”,“类型”:“双倍”}
]
按照 架构注册表教程 创建新主题 “交易”。 使用以下内容创建一个$HOME/.confluent/java.config
bootstrap.servers=localhost: 9092
client.dns.lookup=use_all_dns_ips
session.timeout.ms=45000
acks=all
schema.registry.url = http://localhost:8081
然后使用 Maven 编译 示例代码,并使用架构注册表向本地 Kafka 服务器生成 AVRO 编码的消息:
mvn 干净编译包
mvn exec: java-dexec.mainclass=io.confluent.examples.clients.basicavro.producerExample\
-dexec.args=”$HOME/.confluent/java.config”
然后在 Proton 中创建外部蒸汽:
创建外部流交易(
id 字符串,
金额翻倍
)
设置
type = 'kafka',
经纪商 = 'localhost: 9092',
主题 = “交易”,
data_format = 'Avro',
kafka_schema_registry_url = 'http://localhost:8081';
成功运行此 SQL 后,您可以通过以下方式获取现有数据
从交易中选择 * 其中 _tp_time>earliest_ts ()
或者只通过以下方式获取收到的新消息
从交易中选择 *
示例:在 Aiven 上读取 Kafka 服务中的 Avro 编码数据
Aiven 上的架构注册表端点是使用 CA 签名的,但你需要为代理提供 ssl_ca_cert_file
。
CREATE EXTERNAL STREAM transactions(
id string,
amount double
)
SETTINGS type='kafka',
brokers='name.a.aivencloud.com:28864',
topic='transactions',
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-256',
username='avnadmin',
password='PASSWORD',
ssl_ca_cert_file='/kafka.cert',
data_format = 'Avro',
kafka_schema_registry_url = 'https://name.a.aivencloud.com:28856',
kafka_schema_registry_credentials = 'avnadmin:PASSWORD'