跳转至主要内容

变更日志流

当您使用 changelog_kv 的模式创建一个流时,流中的数据不再是附加的。 当您直接查询流时,仅显示相同主键的最新版本。 数据可以更新或删除。 您可以在左侧或右侧的 JOIN 中使用更新日志流。 Timeplus 将自动选择最新版本。

告警

For Timeplus Enterprise customers, we recommend to use Mutable Streams with the enhanced performance for UPSERT and queries. The changelog streams are not supported in Timeplus Enterprise, and will be removed in Timeplus Proton.

以下是一些例子:

创建流

在此示例中,您在 changelog_kv 模式中创建了一个带有以下列的流 dim_products

列名数据类型描述
_tp_timedatetime64(3,'UTC')它是为所有在 Timeplus 中的流自动创建的,并且具有毫秒精度和UTC时区的事件时间
_tp_delta整数一个特殊列,1 表示新数据,-1 表示已删除的数据
产品名称字符串产品的唯一 ID,作为主键
价格浮点数当前价格
注意

此页面的其余部分均假设您正在使用 TimePlus 控制台。 如果您使用的是 Proton,则可以使用 DDL 创建流。 Learn more

查询单个流

如果您没有添加任何数据,查询 SELECT * FROM dim_products 将不返回任何结果并继续等待新的结果。

添加数据

保持查询运行并将更多的行添加到流中(通过 REST API 或创建新的浏览器标签页并直接将行添加到流)。

_tp_delta产品名称价格
1iPhone14799
1iPhone14_Plus899

查询控制台将自动显示这两行。

删除数据

如果您不想再列出 iPhone14_Plus。 您需要添加一行为 _tp_delta=-1

_tp_delta产品名称价格
-1iPhone14_Plus899

然后取消查询并再次运行它,您只会得到 1 行,而不是 3 行。 原因是第 2 行和第 3 行具有相同的主要 ID,但有相反的 _tp_delta,所以 TimePlus 会将它们合并。 这一过程被称为“压缩”。

_tp_delta产品名称价格
1iPhone14799

更新数据

现在,如果您想要更改 iPhone14 的价格,您需要添加两行:

_tp_delta产品名称价格
-1iPhone14799
1iPhone14800

取消查询 SELECT * FROM dim_products 并再次运行,您将只会在产品列表中获得 1 行:

_tp_delta产品名称价格
1iPhone14800

正如您想象的,您可以继续添加新的行。 如果 _tp_delta 是 1 并且主键是新的,那么您将在查询结果中获得一个新的行。 如果 _tp_delta 是 -1 并且主键已经存在,那么前一行将被删除。 您可以通过添加带有主键的新行来更新该值。

注意

事实上,您可以指定一个表达式作为主键。 例如,您可以使用 first_name|' '||last_name 来合并全名作为主键,而不是使用单列。

显示聚合结果

如果您运行 select count(1), sum(price) from dim_products 这样的查询,此流式 SQL 将始终给您提供最新的结果:

计数(1)总和(价格)
1800当只有 1 行时:iPhone14
21699当 iPhone14_Plus 被添加
1800当 iPhone14_Plus 被移除

在 JOIN 中使用变更日志流作为查询

在上述示例中,您总是获得具有相同主键的事件的最新版本。 当这样的流充当 JOIN 的“查询表”时,这非常有用。

想象您有一个 订单 附加流:

_tp_time订单编号产品名称数量

当前 dim_product 流是:

_tp_delta产品名称价格
1iPhone14799
1iPhone14_Plus899

现在运行流式 SQL:

SELECT orders._tp_time, order_id,product_id,quantity, price*quantity AS amount
FROM orders JOIN dim_products USING(product_id)

然后添加两行:

_tp_time订单编号产品名称数量
2023-04-20T10:00:00.000Z1iPhone141
2023-04-20T10:01:00.000Z2iPhone14_Plus1

在查询控制台中,您将逐一看到这两行:

_tp_time订单编号产品名称数量金额
2023-04-20T10:00:00.000Z1iPhone141799
2023-04-20T10:01:00.000Z2iPhone14_Plus1899

然后,您可以通过在 dim_products 中添加新的两行将 iPhone14 的价格更改至 800。

_tp_delta产品名称价格
-1iPhone14799
1iPhone14800

也在 订单 中添加新的行。

_tp_time订单编号产品名称数量
2023-04-20T11:00:00.000Z3iPhone141

您将在前一个流式 SQL 中获得第三行:

_tp_time订单编号产品名称数量金额
2023-04-20T10:00:00.000Z1iPhone141799
2023-04-20T10:01:00.000Z2iPhone14_Plus1899
2023-04-20T11:00:00.000Z3iPhone141800

可以看出,iPhone14 的最新价格被应用到新事件的 JOIN 中。

在 JOIN 中使用更新日志流作为左表

您也可以在 JOIN 左侧使用变更日志流。

让我们在变更日志流模式下创建一个新流 订单2

_tp_time_tp_delta订单编号产品名称数量

您可以通过添加具有适当 _tp_delta 值的行来添加/更新/删除订单。 当您运行流式 SQL 时:

SELECT orders2._tp_time, order_id,product_id,quantity, price*quantity AS amount
FROM orders2 JOIN dim_products USING(product_id)

Timeplus 将使用最新版本的订单记录加入查询表。

更有用的是,如果您运行聚合,比如:

SELECT count(1) AS order_count, sum(price*quantity) AS revenue
FROM orders2 JOIN dim_products USING(product_id)

每当订单被添加、更新或删除时,您将获得正确的数字。

使用变更日志流设置 CDC

CDC(更改数据捕获),是现代数据存储的一个关键部分。 大多数现代数据库都支持 CDC 实时同步数据变化到其他系统。 一个受欢迎的开源解决方案是 Debezium

Timeplus 的变更日志流可以与 Debezium 或其他 CDC 解决方案配合使用。 如果您的应用程序可以使用适当的 _tp_delta 标志生成事件(1 表示添加数据,-1 表示删除数据),那么没有它们也可以正常运行。

例如,您在 PostgreSQL 14 中创建了两个表格:

设置 PostgreSQL 表

CREATE TABLE dim_products(
product_id VARCHAR PRIMARY KEY,
price FLOAT
);
CREATE TABLE orders(
order_id serial PRIMARY KEY,
product_id varchar,
quantity int8 DEFAULT 1,
timestamp timestamp DEFAULT NOW()
);

为了更新或删除来抓取 之前 的值,您还需要将 REPLICA IDENTIFY 设置成 FULL 为了更新或删除来抓取 之前 的值,您还需要将 REPLICA IDENTIFY 设置成 FULL 查看 Debezium 文档 了解更多详情。

ALTER TABLE dim_products REPLICA IDENTITY FULL;
ALTER TABLE orders REPLICA IDENTITY FULL;

设置 Debezium

现在以您喜欢的方式开始使用 Kafka Connect 的 Debezium 。 您还需要本地或远程的 Kafka/Redpanda 作为消息代理来接收 CDC 数据。

在此示例中,我们将使用 Redpanda Cloud 作为消息代理,并使用内置 Debezium 的 Kafka Connect 的本地 docker 镜像。 docker-compose 文件:

version: "3.7"
services:
connect:
image: quay.io/debezium/connect:2.2.0.Final
container_name: connect
ports:
- 8083:8083
environment:
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}
# CONNECT_ properties are for the Connect worker
- CONNECT_BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}
- CONNECT_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
- CONNECT_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
- CONNECT_SASL_MECHANISM=SCRAM-SHA-512
- CONNECT_PRODUCER_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
- CONNECT_PRODUCER_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
- CONNECT_PRODUCER_SASL_MECHANISM=SCRAM-SHA-512
- CONNECT_CONSUMER_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
- CONNECT_CONSUMER_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
- CONNECT_CONSUMER_SASL_MECHANISM=SCRAM-SHA-512
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false

您可以将凭据放置在同一文件夹的 .env 文件中:

BOOTSTRAP_SERVERS=demourl.cloud.redpanda.com:9092
SASL_USERNAME=yourname
SASL_PASSWORD=yourpassword
SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required username="${SASL_USERNAME}" password="${SASL_PASSWORD}";
SECURITY_PROTOCOL=SASL_SSL

或者,您可以在 docker 撰写文件中添加 Redpanda 控制台。 它为您添加/查看主题和管理连接器提供了很好的用户界面。

  console:
image: docker.redpanda.com/redpandadata/console:v2.2.3
container_name: redpanda_console
depends_on:
- connect
ports:
- 8080:8080
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ${BOOTSTRAP_SERVERS}
sasl:
enabled: true
username: ${SASL_USERNAME}
password: ${SASL_PASSWORD}
mechanism: SCRAM-SHA-512
tls:
enabled: true
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083

添加 PostgreSQL 连接器

使用 docker-compose up启动 docker-compose up。 它将加载 Debezium 和 Redpanda 控制台的图像。 您可以通过 http://localhost:8080 访问 Redpanda 控制台,Kafka Connect REST API 的端点是 http://localhost:8083

您可以通过 Web 用户界面添加 Debezium 连接器。 但以下命令行可以正常运行:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors \
-d '{"name":"pg-connector","config":{"connector.class": "io.debezium.connector.postgresql.PostgresConnector","publication.autocreate.mode": "filtered", "database.dbname": "defaultdb", "database.user": "avnadmin", "schema.include.list": "public", "database.port": "28851", "plugin.name": "pgoutput", "database.sslmode": "require", "topic.prefix": "doc", "database.hostname": "xyz.aivencloud.com", "database.password": "***", "table.include.list": "public.dim_products,public.orders"}}'

这将添加一个名为 pg-connector 的新连接器,并连接到远程服务器上的 PostgreSQL 数据库(在这里是 Aiven Cloud)。 配置项目的一些注意事项:

  • publication.autocreate.mode筛选后的table.include.list 是您想要应用 CDC 的一系列列表。 有时,您没有权限为所有表启用发布。 因此,建议使用 筛选后的
  • plugin.namepgoutput。 这与新版本的 PostgreSQL(10+)配合使用效果很好。 默认值为 decoderbufs,它需要单独安装。
  • topic.prefix 已设置为 doc。 顾名思义,它将成为 Kafka 主题的前缀。 由于架构是 公开的,使用的主题将是 doc.public.dim_productsdoc.public.orders

确保您在 Kafka/Redpanda 中创建了这两个主题。 然后在几秒钟内,主题中应该有新的消息。

您可以在 PostgreSQL 中尝试 插入/更新/删除 数据并检查生成的 JSON 消息。

示例 CDC 数据

插入

SQL:

INSERT INTO dim_products ("product_id", "price") VALUES ('iPhone14', 799)

CDC Data:

{
"before": null,
"after": {
"product_id": "iPhone14",
"price": 799
},
"source": {
},
"op": "c",
"ts_ms": 1682217357439,
"transaction": null
}

更新

SQL:

UPDATE dim_products set price=800 WHERE product_id='iPhone14'

CDC Data:

{
"before": {
"product_id": "iPhone14",
"price": 799
},
"after": {
"product_id": "iPhone14",
"price": 800
},
"source": {
},
"op": "u",
"ts_ms": 1682217416358,
"transaction": null
}

删除

SQL:

DELETE FROM dim_products

CDC Data:

{
"before": {
"product_id": "iPhone14",
"price": 800
},
"after": null,
"source": {
},
"op": "d",
"ts_ms": 1682217291411,
"transaction": null
}

对于已经存在的行

Debezium 也会读取所有现有的行并生成这样的消息。

{
"before": null,
"after": {
"product_id": "iPhone14",
"price": 799
},
"source": {
},
"op": "r",
"ts_ms": 1682217357439,
"transaction": null
}

将数据加载到 Timeplus

You can follow this guide to add 2 data sources to load data from Kafka or Redpanda. 例如:

  • 数据源名称 s1 从主题 doc.public.dim_products 中加载数据并放入新流 rawcdc_dim_products
  • 数据源名称 s2 从主题 doc.public.orders 中加载数据并放入新流 rawcdc_orders

然后在查询页中运行以下 SQL:

select 1::int8 as _tp_delta, after:product_id as product_id, after:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op in ('c','r')
union
select -1::int8 as _tp_delta, before:product_id as product_id, before:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='d'
union
select -1::int8 as _tp_delta, before:product_id as product_id, before:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='u'
union
select 1::int8 as _tp_delta, after:product_id as product_id, after:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='u'

点击 作为数据下游发送 按钮,并选择 Timeplus 类型,将结果发送到现有的流 dim_products

注意

在即将推出的 Timeplus 版本中,我们将简化流程,这样您就无需编写自定义 SQL 来提取 Debezium CDC 消息。

相同地,这里是 SQL 将原生的 CDC 消息转换为 订单

select 1::int8 as _tp_delta, after:order_id as order_id, after:product_id as product_id, after:quantity::int8 as quantity, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_orders where op in ('c','r')
union
select -1::int8 as _tp_delta, before:order_id as order_id, before:product_id as product_id, before:quantity::int8 as quantity, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_orders where op='d'
union
select -1::int8 as _tp_delta, before:order_id as order_id, before:product_id as product_id, before:quantity::int8 as quantity, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_orders where op='u'
union
select 1::int8 as _tp_delta, after:order_id as order_id, after:product_id as product_id, after:quantity::int8 as quantity, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_orders where op='u'