跳转至主要内容

多版本流

当您使用 versioned_kv 的模式创建一个流时,流中的数据不再是附加的。 当您直接查询流时,仅显示相同主键的最新版本。 当您在与其他流的 JOIN 中将这个流用作 “右表” 时,Timeplus 会自动选择最接近的版本。

A HOWTO video:

查询单个流

在此示例中,您在 versioned_kv 模式中创建了一个带有以下列的流 dim_products

列名数据类型描述
_tp_timedatetime64(3,'UTC')它是为所有在 Timeplus 中的流自动创建的,并且具有毫秒精度和UTC时区的事件时间
产品名称字符串产品的唯一 ID,作为主键
价格float32当前价格

This stream can be created using UI wizard on Timeplus Cloud or Timeplus Enterprise. You can also create it with SQL in Timeplus Proton:

CREATE STREAM dim_products(product_id string, price float32) 
PRIMARY KEY (product_id)
SETTINGS mode='versioned_kv'

如果您没有添加任何数据,查询 SELECT * FROM dim_products 将不返回任何结果并继续等待新的结果。

现在取消此查询,再向流中添加几行。

INSERT INTO dim_products(product_id,price) VALUES ('iPhone15',799),('iPhone15_Plus',899);
产品名称价格
iPhone15799
iPhone15_Plus899

再次运行 SELECT * FROM dim_products 将获得这两行。

现在,如果您再添加一行:

产品名称价格
iPhone15800
INSERT INTO dim_products(product_id,price) VALUES ('iPhone15',800);

Then query SELECT * FROM dim_products again will get 2 rows (not 3, because the initial price of "iPhone15" is overwritten).

产品名称价格
iPhone15800
iPhone15_Plus899

正如您想象的,您可以继续添加新的行。 如果主键是新的,那么您将在查询结果中获得一个新的行。 如果主键已经存在,则前一行将被新添加行中的值覆盖。

注意

事实上,您可以指定一个表达式作为主键。 例如,您可以使用 first_name|' '||last_name 来合并全名作为主键,而不是使用单列。 Or you can create a tuple as compound keys PRIMARY KEY (first_name,last_name)

You can also query the stream in the table mode, i.e. select * from table(dim_products)

在 INNER JOIN 中使用多版本流

在上述示例中,您总是获得具有相同主键的事件的最新版本。 其运行方式与 变更日志流 类似。 这种流模式之所以被称为多版本流,是因为 Timeplus 将跟踪多个版本。 这主要在多版本流充当 JOIN 的 “右表” 时使用。

Imagine you have the other versioned stream for the orders:

CREATE STREAM orders(order_id int8, product_id string, quantity int8)
PRIMARY KEY order_id
SETTINGS mode='versioned_kv';
_tp_time订单编号产品名称数量

现在运行流式SQL:

SELECT orders._tp_time, order_id,product_id,quantity, price*quantity AS revenue
FROM orders JOIN dim_products USING(product_id)

然后添加两行:

INSERT INTO orders(order_id, product_id, quantity) 
VALUES (1, 'iPhone15',1),(2, 'iPhone15_Plus',2);
_tp_time订单编号产品名称数量
2023-04-20T10:00:00.000Z1iPhone151
2023-04-20T10:01:00.000Z2iPhone15_Plus1

在查询控制台中,您将逐一看到这两行:

_tp_time订单编号产品名称数量revenue
2023-04-20T10:00:00.000Z1iPhone151800
2023-04-20T10:01:00.000Z2iPhone15_Plus1899

Then you can change the price of iPhone15 back to 799, by adding a new row in dim_products

产品名称价格
iPhone15799

也在 订单 中添加新的一行

_tp_time订单编号产品名称数量
2023-04-20T11:00:00.000Z3iPhone151

您将在前一个流式 SQL 中获得第三行:

_tp_time订单编号产品名称数量revenue
2023-04-20T10:00:00.000Z1iPhone151800
2023-04-20T10:01:00.000Z2iPhone15_Plus1899
2023-04-20T11:00:00.000Z3iPhone151799

It shows that the latest price of iPhone15 is applied to the JOIN of new events.

您也可以运行一个流式 SQL select sum(price) from dim_products,它应显示1698,因为最新的价格是799和899。

If you add a new row to set iPhone15 to 800, cancel the previous query and run again, you will get 1699.

Use Versioned Stream in LEFT JOIN

Since Proton 1.5.7, LEFT JOIN 2 versioned streams are also supported.

For example, you run a streaming SQL:

SELECT orders._tp_time, order_id,product_id,
quantity, price*quantity AS revenue
FROM dim_products LEFT JOIN orders USING(product_id);

Then add a new product:

INSERT INTO dim_products(product_id,price) VALUES ('Vision Pro',3000);

Because there is no order for this product, you will get revenue 0 with the LEFT JOIN (if you are using INNER JOIN or just JOIN, this new product won't be counted).

Adding a new order:

INSERT INTO orders(order_id, product_id, quantity)
VALUES (4, 'Vision Pro',1);

The LEFT JOIN SQL will updated the result.

在 ASOF JOIN 中使用多版本流

多版本流的最佳部分是在 ASOF JOIN 中 Timeplus 能够自动选择最接近的版本。

继续前面的场景。

SELECT orders._tp_time, order_id,product_id,quantity, price*quantity AS revenue
FROM orders ASOF JOIN dim_products
ON orders.product_id=dim_products.product_id AND orders._tp_time >= dim_products._tp_time

If the current iPhone15 price is 800, and you add a new order for 1 iPhone15, then you will get a transaction amount of 800.

Then you change the iPhone15 price to 799, and add a new order for 1 iPhone15, you will get a transaction amount of 799.

但是,如果您在价格变动之前使用 _tp_time 添加订单,则交易金额将再次变为 800,因为 Timeplus 保留了多个版本的价格,并选择了与订单时间最匹配的旧版本。

注意

如果你不熟悉 ASOF JOIN ,这个特殊的 JOIN 可以提供非精确匹配功能。 如果两个流具有相同的id,但时间戳不完全相同,这也可以很好的运作。

The advanced keep_versions setting:

In the above example, you can add settings keep_versions=3 at the end of the query. This will inform the query engine to read the base version from the historical storage for the versioned_kv stream, then read the new events in the streaming storage and based on the ASOF time condition to pick up 3 versions in the memory, and finally join events from left append_stream with the right 3 versions, and find the best candidate to join together.

Retention Policy

You should not set the TTL(Time-To-Live) for the historical storage of versioned_kv stream. Because only the last version of the same primary key is kept in the historical storage (via an auto-compact background process). Manually set a TTL may remove those events who haven't been updated recently.

You may set a time-based or size-based retention policy for the streaming storage for the versioned_kv stream. But this is optional. By default a 4GB segment file is used for the streaming storage.