变更日志流
当您使用 changelog_kv
的模式创建一个流时,流中的数据不再是附加的。 当您直接查询流时,仅显示相同主键的最新版本。 数据可以更新或删除。 您可以在左侧或右侧的 JOIN 中使用更新日志流。 Timeplus 将自动选择最新版本。
We recently introduced a new table function changelog to dynamically create a changelog stream based on an append-only stream by specifying the primary key column(s) and the version column, or create a changelog stream based on a versioned stream. 它专为高级用例而设计,例如按主键处理迟到事件。 详细信息请检查 更新日志 的文档。
以下是一些例子:
创建流
在此示例中,您在 changelog_kv
模式中创建了一个带有以下列的流 dim_products
:
列名 | 数据类型 | 描述 |
---|---|---|
_tp_time | datetime64(3,'UTC') | 它是为所有在 Timeplus 中的流自动创建的,并且具有毫秒精度和UTC时区的事件时间 |
_tp_delta | 整数 | 一个特殊列,1 表示新数据,-1 表示已删除的数据 |
产品名称 | 字符串 | 产品的唯一 ID,作为主键 |
价格 | 浮点数 | 当前价格 |
此页面的其余部分均假设您正在使用 TimePlus 控制台。 如果您使用的是 Proton,则可以使用 DDL 创建流。 点击此处,了解更多
查询单个流
如果您没有添加任何数据,查询 SELECT * FROM dim_products
将不返回任何结果并继续等待新的结果。
添加数据
保持查询运行并将更多的行添加到流中(通过 REST API 或创建新的浏览器标签页并直接将行添加到流)。
_tp_delta | 产品名称 | 价格 |
---|---|---|
1 | iPhone14 | 799 |
1 | iPhone14_Plus | 899 |
查询控制台将自动显示这两行。
删除数据
如果您不想再列出 iPhone14_Plus。 您需要添加一行为 _tp_delta=-1
:
_tp_delta | 产品名称 | 价格 |
---|---|---|
-1 | iPhone14_Plus | 899 |
然后取消查询并再次运行它,您只会得到 1 行,而不是 3 行。 原因是第 2 行和第 3 行具有相同的主要 ID,但有相反的 _tp_delta,所以 TimePlus 会将它们合并。 这一过程被称为“压缩”。
_tp_delta | 产品名称 | 价格 |
---|---|---|
1 | iPhone14 | 799 |
更新数据
现在,如果您想要更改 iPhone14 的价格,您需要添加两行:
_tp_delta | 产品名称 | 价格 |
---|---|---|
-1 | iPhone14 | 799 |
1 | iPhone14 | 800 |
取消查询 SELECT * FROM dim_products
并再次运行,您将只会在产品列表中获得 1 行:
_tp_delta | 产品名称 | 价格 |
---|---|---|
1 | iPhone14 | 800 |
正如您想象的,您可以继续添加新的行。 如果 _tp_delta 是 1 并且主键是新的,那么您将在查询结果中获得一个新的行。 如果 _tp_delta 是 -1 并且主键已经存在,那么前一行将被删除。 您可以通过添加带有主键的新行来更新该值。
事实上,您可以指定一个表达式作为主键。 例如,您可以使用 first_name|' '||last_name
来合并全名作为主键,而不是使用单列。
显示聚合结果
如果您运行 select count(1), sum(price) from dim_products
这样的查询,此流式 SQL 将始终给您提供最新的结果:
计数(1) | 总和(价格) | |
---|---|---|
1 | 800 | 当只有 1 行时:iPhone14 |
2 | 1699 | 当 iPhone14_Plus 被添加 |
1 | 800 | 当 iPhone14_Plus 被移除 |
在 JOIN 中使用变更日志流作为查询
在上述示例中,您总是获得具有相同主键的事件的最新版本。 当这样的流充当 JOIN 的“查询表”时,这非常有用。
想象您有一个 订单
附加流:
_tp_time | 订单编号 | 产品名称 | 数量 |
---|---|---|---|
当前 dim_product
流是:
_tp_delta | 产品名称 | 价格 |
---|---|---|
1 | iPhone14 | 799 |
1 | iPhone14_Plus | 899 |
现在运行流式 SQL:
SELECT orders._tp_time, order_id,product_id,quantity, price*quantity AS amount
FROM orders JOIN dim_products USING(product_id)
然后添加两行:
_tp_time | 订单编号 | 产品名称 | 数量 |
---|---|---|---|
2023-04-20T10:00:00.000Z | 1 | iPhone14 | 1 |
2023-04-20T10:01:00.000Z | 2 | iPhone14_Plus | 1 |
在查询控制台中,您将逐一看到这两行:
_tp_time | 订单编号 | 产品名称 | 数量 | 金额 |
---|---|---|---|---|
2023-04-20T10:00:00.000Z | 1 | iPhone14 | 1 | 799 |
2023-04-20T10:01:00.000Z | 2 | iPhone14_Plus | 1 | 899 |
然后,您可以通过在 dim_products
中添加新的两行将 iPhone14 的价格更改至 800。
_tp_delta | 产品名称 | 价格 |
---|---|---|
-1 | iPhone14 | 799 |
1 | iPhone14 | 800 |
也在 订单
中添加新的行。
_tp_time | 订单编号 | 产品名称 | 数量 |
---|---|---|---|
2023-04-20T11:00:00.000Z | 3 | iPhone14 | 1 |
您将在前一个流式 SQL 中获得第三行:
_tp_time | 订单编号 | 产品名称 | 数量 | 金额 |
---|---|---|---|---|
2023-04-20T10:00:00.000Z | 1 | iPhone14 | 1 | 799 |
2023-04-20T10:01:00.000Z | 2 | iPhone14_Plus | 1 | 899 |
2023-04-20T11:00:00.000Z | 3 | iPhone14 | 1 | 800 |
可以看出,iPhone14 的最新价格被应用到新事件的 JOIN 中。
在 JOIN 中使用更新日志流作为左表
您也可以在 JOIN 左侧使用变更日志流。
让我们在变更日志流模式下创建一个新流 订单2
:
_tp_time | _tp_delta | 订单编号 | 产品名称 | 数量 |
---|---|---|---|---|
您可以通过添加具有适当 _tp_delta 值的行来添加/更新/删除订单。 当您运行流式 SQL 时:
SELECT orders2._tp_time, order_id,product_id,quantity, price*quantity AS amount
FROM orders2 JOIN dim_products USING(product_id)
Timeplus 将使用最新版本的订单记录加入查询表。
更有用的是,如果您运行聚合,比如:
SELECT count(1) AS order_count, sum(price*quantity) AS revenue
FROM orders2 JOIN dim_products USING(product_id)
每当订单被添加、更新或删除时,您将获得正确的数字。
使用变更日志流设置 CDC
CDC(更改数据捕获),是现代数据存储的一个关键部分。 大多数现代数据库都支持 CDC 实时同步数据变化到其他系统。 一个受欢迎的开源解决方案是 Debezium。
Timeplus 的变更日志流可以与 Debezium 或其他 CDC 解决方案配合使用。 如果您的应用程序可以使用适当的 _tp_delta 标志生成事件(1 表示添加数据,-1 表示删除数据),那么没有它们也可以正常运行。
例如,您在 PostgreSQL 14 中创建了两个表格:
设置 PostgreSQL 表
CREATE TABLE dim_products(
product_id VARCHAR PRIMARY KEY,
price FLOAT
);
CREATE TABLE orders(
order_id serial PRIMARY KEY,
product_id varchar,
quantity int8 DEFAULT 1,
timestamp timestamp DEFAULT NOW()
);
为了更新或删除来抓取 之前
的值,您还需要将 REPLICA IDENTIFY
设置成 FULL
为了更新或删除来抓取 之前
的值,您还需要将 REPLICA IDENTIFY
设置成 FULL
查看 Debezium 文档 了解更多详情。
ALTER TABLE dim_products REPLICA IDENTITY FULL;
ALTER TABLE orders REPLICA IDENTITY FULL;
设置 Debezium
现在以您喜欢的方式开始使用 Kafka Connect 的 Debezium 。 您还需要本地或远程的 Kafka/Redpanda 作为消息代理来接收 CDC 数据。
在此示例中,我们将使用 Redpanda Cloud 作为消息代理,并使用内置 Debezium 的 Kafka Connect 的本地 docker 镜像。 docker-compose 文件:
version: "3.7"
services:
connect:
image: quay.io/debezium/connect:2.2.0.Final
container_name: connect
ports:
- 8083:8083
environment:
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}
# CONNECT_ properties are for the Connect worker
- CONNECT_BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}
- CONNECT_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
- CONNECT_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
- CONNECT_SASL_MECHANISM=SCRAM-SHA-512
- CONNECT_PRODUCER_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
- CONNECT_PRODUCER_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
- CONNECT_PRODUCER_SASL_MECHANISM=SCRAM-SHA-512
- CONNECT_CONSUMER_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
- CONNECT_CONSUMER_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
- CONNECT_CONSUMER_SASL_MECHANISM=SCRAM-SHA-512
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
您可以将凭据放置在同一文件夹的 .env
文件中:
BOOTSTRAP_SERVERS=demourl.cloud.redpanda.com:9092
SASL_USERNAME=yourname
SASL_PASSWORD=yourpassword
SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required username="${SASL_USERNAME}" password="${SASL_PASSWORD}";
SECURITY_PROTOCOL=SASL_SSL
或者,您可以在 docker 撰写文件中添加 Redpanda 控制台。 它为您添加/查看主题和管理连接器提供了很好的用户界面。
console:
image: docker.redpanda.com/redpandadata/console:v2.2.3
container_name: redpanda_console
depends_on:
- connect
ports:
- 8080:8080
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ${BOOTSTRAP_SERVERS}
sasl:
enabled: true
username: ${SASL_USERNAME}
password: ${SASL_PASSWORD}
mechanism: SCRAM-SHA-512
tls:
enabled: true
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083