跳转至主要内容

UPSERT with Versioned Stream

默认情况下,Timeplus中的流处于仅追加模式。 当你创建模式为 “versioned_kv” 的流时,你可以使用相同的主键更新数据。 当你使用 table函数查询流时,只会显示相同主键的最新版本。 When you use this stream as "right-table" in a streaming JOIN with other streams, Timeplus will automatically choose the closest version.

Versioned Stream and Mutable Stream support INSERT and UPDATE. To delete data with certain primary key(s), please create a Changelog Stream.

一段 HOWTO 视频:

查询单个流

在此示例中,您在 versioned_kv 模式下创建了一个包含以下列的流 dim_products

列名数据类型描述
\ _tp_timedatetime64(3,'UTC')它是为所有在 Timeplus 中的流自动创建的,并且具有毫秒精度和UTC时区的事件时间
产品名称字符串产品的唯一 ID,作为主键
价格float32当前价格

这个流可以在Timeplus Cloud或Timeplus Enterprise上使用用户界面向导创建。 你也可以在 Timeplus Proton 中使用 SQL 创建它:

创建 STREAM dim_products(product_id 字符串,价格浮动32
主键 (product_id)
设置模式='versioned_kv'

如果你不添加任何数据,查询 “SELECT * FROM dim_products” 将不返回任何结果并继续等待新的结果。

现在取消此查询,再向流中添加几行。

在 dim_products(商品编号、价格)中插入值('iPhone15',799),('iPhone15_Plus',899);
产品名称价格
iPhone15799
iPhone15_Plus899

再次运行 SELECT * FROM dim_products 将获得这 2 行。

现在,如果您再添加一行:

产品名称价格
iPhone15800
在 dim_products(商品编码、价格)中插入值('iPhone15',800);

然后再次查询 “SELECT * FROM dim_products” 将获得 2 行(不是 3 行,因为 “iPhone15” 的初始价格已被覆盖)。

产品名称价格
iPhone15800
iPhone15_Plus899

正如您想象的,您可以继续添加新的行。 如果主键是新的,那么您将在查询结果中获得一个新的行。 如果主键已经存在,则前一行将被新添加行中的值覆盖。

注意

事实上,您可以指定一个表达式作为主键。 例如,你可以使用 first_name||' '||last_name 来使用组合的全名作为主键,而不是使用单列。 或者你可以创建一个元组作为复合键 PRIMARY KEY (first_name, last_name)

你也可以在表格模式下查询流,即 “从表中选择 * (dim_products)”

在 INNER JOIN 中使用多版本流

在上述示例中,您总是获得具有相同主键的事件的最新版本。 This works in the similar way as Changelog Stream. 这种流模式之所以被称为多版本流,是因为 Timeplus 将跟踪多个版本。 这主要在多版本流充当 JOIN 的 “右表” 时使用。

想象一下,你有另一个版本控制的 “订单” 流:

创建 STREAM 订单(order_id int8、product_id 字符串、数量 int8)
主键 order_id
设置模式='versioned_kv'
\ _tp_time订单编号产品名称数量

现在运行流式SQL:

选择订单。_tp_time、订单编号、产品编号、数量、价格*数量作为收入
来自订单加入 dim_products 使用 (product_id)

然后添加两行:

在订单中插入(订单编号、商品编号、数量)
(1, 'iPhone15',1), (2, 'iPhone15_Plus',2)
\ _tp_time订单编号产品名称数量
2023-04-20T10:00:00.000Z1iPhone151
2023-04-20T10:01:00.000Z2iPhone15_Plus1

在查询控制台中,您将逐一看到这两行:

\ _tp_time订单编号产品名称数量收入
2023-04-20T10:00:00.000Z1iPhone151800
2023-04-20T10:01:00.000Z2iPhone15_Plus1899

然后你可以通过在 dim_products中添加新的一行将iPhone15的价格改回799美元

产品名称价格
iPhone15799

还要在 “订单” 中添加新行

\ _tp_time订单编号产品名称数量
2023-04-20T11:00:00.000Z3iPhone151

您将在前一个流式 SQL 中获得第三行:

\ _tp_time订单编号产品名称数量收入
2023-04-20T10:00:00.000Z1iPhone151800
2023-04-20T10:01:00.000Z2iPhone15_Plus1899
2023-04-20T11:00:00.000Z3iPhone151799

它表明,iPhone15的最新价格适用于新活动的加入。

你也可以运行串流 SQL “从 dim_products 中选择总和(价格)”,它应该显示数字 1698,因为最新的价格是 799 和 899。

如果你添加一个新行来将 iPhone15 设置为 800,取消之前的查询然后再次运行,你会得到 1699。

在 LEFT JOIN 中使用版本化流

自 Proton 1.5.7 版本起,还支持 LEFT JOIN 2 版本化流。

例如,你运行流式 SQL:

选择订单。_tp_time、order_id、product_id、
数量、价格*数量作为收入
来自 dim_products 向左加入订单使用 (product_id)

然后添加新产品:

在 dim_products(产品编号、价格)中插入值('Vision Pro',3000);

由于该产品没有订单,因此使用 “LEFT JOIN” 将获得0的收入(如果您使用的是 “INNER JOIN” 或仅使用 “JOIN”,则此新产品不计算在内)。

添加新订单:

在订单中插入(订单编号、商品编号、数量)
VALUES (4'Vision Pro',1)

LEFT JOIN SQL 将更新结果。

在 ASOF JOIN 中使用多版本流

Versioned Stream最棒的地方在于,在 “ASOF JOIN” 中,Timeplus 能够自动选择最接近的版本。

继续前面的场景。

选择订单。_tp_time、订单编号、产品编号、数量、价格*数量作为收入
来自订单 ASOF 在 orders.products.products.products.products.products 和订单上加入 dim_products.products
。_tp_time >= dim_products。_tp_time

如果当前iPhone15的价格为800美元,并且您添加了购买1部iPhone15的新订单,那么您将获得800美元的交易金额。

然后,你将iPhone15的价格更改为799美元,然后添加1部iPhone15的新订单,你将获得799美元的交易金额。

但是,如果您在价格变动之前添加带有\ _tp_time 的订单,您将再次获得交易金额为800,因为Timeplus会保留多个价格版本,并选择与订单时间最匹配的旧版本。

注意

如果你不熟悉 `ASOF JOIN',这个特殊的 JOIN 提供了非精确匹配功能。 如果两个流具有相同的id,但时间戳不完全相同,这也可以很好的运作。