Query Syntax
Timeplus introduces several SQL extensions to support streaming processing. The overall syntax looks like this:
[WITH common_table_expression ..]
SELECT <expr, columns, aggr>
FROM <streaming_window_function>(<table_name>, [<time_column>], [<window_size>], ...)
[JOIN clause]
[WHERE clause]
[GROUP BY clause]
[HAVING expression]
[PARTITION BY clause]
[LIMIT n]
[EMIT emit_policy]
[SETTINGS <key1>=<value1>, <key2>=<value2>, ...]
Only the SELECT and FROM clauses are required (you can even omit FROM, as in SELECT now(), but that is less practical). The other clauses, shown in [..], are optional. We will cover them one by one in reverse order, i.e. SETTINGS, then EMIT, LIMIT, and so on.
SQL keywords and function names are case-insensitive, while the column names and stream names are case-sensitive.
Streaming First Query Behavior
Before we look into the details of the query syntax, we'd like to highlight that the default query behavior in Timeplus Proton is streaming mode, i.e.
- SELECT .. FROM stream queries future events. Once you run the query, it processes new events as they arrive. For example, if there are already 1,000 events in the stream, running SELECT count() FROM stream could return 0 if no new events arrive.
- SELECT .. FROM table(stream) queries the historical data, just like many other databases. For the same sample stream, running SELECT count() FROM table(stream) returns 1000 and the query completes.
Query Settings
Timeplus supports several advanced settings to fine-tune streaming query processing behavior:
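- enable_backfill_from_historical_store=0|1. By default, if it's omitted, it's 1.
  - When it's 0, the query engine loads data either from the streaming storage or from the historical storage.
  - When it's 1, the query engine evaluates whether it is necessary to load data from the historical storage (for example, when the time range falls outside the streaming storage), or whether it would be more efficient to read from the historical storage (for example, count/min/max are pre-computed in the historical storage, which is faster than scanning data in the streaming storage).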
- force_backfill_in_order=0|1. By default, if it's omitted, it's 0.
  - When it's 0, the data from the historical storage is returned without extra sorting. This improves performance.
  - When it's 1, the data from the historical storage is returned with extra sorting. This decreases performance, so turn on this flag carefully.
- emit_during_backfill=0|1. By default, if it's omitted, it's 0.
  - When it's 0, the query engine won't emit intermediate aggregation results during the historical data backfill.
  - When it's 1, the query engine will emit intermediate aggregation results during the historical data backfill. This will ignore the force_backfill_in_order setting. As long as there are aggregation functions and time window functions (e.g. tumble/hop/session) in the streaming SQL, when emit_during_backfill is on, force_backfill_in_order will be set to 1 automatically.
- query_mode=<table|streaming>. By default, if it's omitted, it's streaming. A general setting that decides whether the overall query does streaming data processing or historical data processing. This can be overridden by the port: if you use 3128, the default is streaming; if you use 8123, the default is historical.
- recovery_policy=<strict|best_effort>. By default, if it's omitted, it's strict. The main use case is materialized views: if new events fail to process, such as when converting a string to an int32, the default behavior makes the materialized view unusable. You may monitor the Timeplus logs to act on the dirty data. However, if you set SETTINGS recovery_policy=best_effort, Timeplus will attempt to recover from the checkpoint, retry up to 3 times, then skip the dirty data and continue processing the rest of the data.
- seek_to=<timestamp|earliest|latest>. By default, if it's omitted, it's latest. This setting tells Timeplus to seek old data in the streaming storage by timestamp. It can be a relative or an absolute timestamp. The default, latest, means Timeplus does not seek old data. For example: seek_to='2022-01-12 06:00:00.000', seek_to='-2h', or seek_to='earliest'.
Please note, as of Jan 2023, we no longer recommend using SETTINGS seek_to=.. (except for External Stream). Please use WHERE _tp_time>='2023-01-01' or similar instead. _tp_time is the special timestamp column in each raw stream that represents the event time. You can use the >, <, and BETWEEN .. AND operators to filter the data in Timeplus storage. The only exception is external streams: if you need to scan all existing data in a Kafka topic, you need to run the SQL with seek_to, e.g. select raw from my_ext_stream settings seek_to='earliest'
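As a rough illustration of the recommendation above, the queries below filter on _tp_time rather than using seek_to. The stream name my_stream and the date literals are hypothetical; only _tp_time, table(), and the comparison operators come from the text above.

```sql
-- Streaming query: only process events whose event time is on or after
-- the given date (my_stream is a hypothetical stream name).
SELECT *
FROM my_stream
WHERE _tp_time >= '2023-01-01';

-- Historical query: scan one day of stored data and then complete.
SELECT count()
FROM table(my_stream)
WHERE _tp_time BETWEEN '2023-01-01' AND '2023-01-02';
```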
EMIT
As an advanced feature, Timeplus Proton supports various policies for emitting results during streaming queries.
The syntax is:
EMIT
[AFTER WATERMARK [WITH DELAY <interval>]]
[PERIODIC <interval>]
[ON UPDATE]
[[ AND ]TIMEOUT <interval>]
[[ AND ]LAST <interval> [ON PROCTIME]]
Please note that some policies were added in Proton 1.5 and are incompatible with version 1.4 or earlier.
EMIT AFTER WATERMARK
You can omit EMIT AFTER WATERMARK, since this is the default behavior for time window aggregations. For example:
SELECT device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY device, window_end
The above example SQL continuously aggregates the max cpu usage per device per tumble window for the stream device_utils. Every time a window is closed, Timeplus Proton emits the aggregation results. How does it determine that a window should be closed? This is done by the watermark, an internal timestamp that is guaranteed to increase monotonically per stream query.
EMIT AFTER WATERMARK WITH DELAY
Before Proton 1.5, the syntax was EMIT AFTER WATERMARK AND DELAY. Since Proton 1.5, we use WITH DELAY instead of AND DELAY, so that AND can serve as the keyword for combining multiple emit policies.
Example:
SELECT device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY device, window_end
EMIT AFTER WATERMARK WITH DELAY 2s;
The above example SQL continuously aggregates the max cpu usage per device per tumble window for the stream device_utils. Every time a window is closed, Timeplus waits for another 2 seconds and then emits the aggregation results.
EMIT PERIODIC
PERIODIC <n><UNIT> tells Proton to emit the aggregation periodically. UNIT can be ms (millisecond), s (second), m (minute), h (hour), or d (day). <n> must be an integer greater than 0.
Example:
SELECT device, count(*)
FROM device_utils
WHERE cpu_usage > 99
EMIT PERIODIC 5s
For Global Streaming Aggregation, the default periodic emit interval is 2s, i.e. 2 seconds.
Since Proton 1.5, you can also apply EMIT PERIODIC in time windows, such as tumble/hop/session.
When you run a tumble window aggregation, by default Proton emits results when the window is closed. So tumble(stream,5s) emits results every 5 seconds, unless there is no event in the window to advance the watermark.
In some cases, you may want to get aggregation results even if the window is not yet closed, so that you can get timely alerts. For example, the following SQL runs a 5-second tumble window and, every 1 second, emits a row if the number of events is over 300.
SELECT window_start, count() AS cnt
FROM tumble(car_live_data, 5s)
GROUP BY window_start
HAVING cnt > 300
EMIT PERIODIC 1s
EMIT ON UPDATE
This is a new emit policy added in Proton 1.5.
Since Proton 1.5, you can apply EMIT ON UPDATE in time windows, such as tumble/hop/session, with GROUP BY keys. For example:
SELECT
window_start, cid, count() AS cnt
FROM
tumble(car_live_data, 5s)
WHERE
cid IN ('c00033', 'c00022')
GROUP BY
window_start, cid
EMIT ON UPDATE
During the 5-second tumble window, even if the window is not closed, the results are emitted as long as the aggregation value (cnt) for the same cid has changed.
EMIT PERIODIC .. ON UPDATE
This is a new emit policy added in Proton 1.5.
You can combine EMIT PERIODIC and EMIT ON UPDATE. In this case, even if the window is not closed, Proton checks the intermediate aggregation result at the specified interval and emits rows if the result has changed.
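A minimal sketch of the combined policy, reusing the car_live_data query from the EMIT ON UPDATE section. The 1s interval is an arbitrary choice for illustration, and the exact clause form is assumed from the section title rather than confirmed by an example in the source.

```sql
-- Check the intermediate result of each 5-second window once per second
-- and emit a row only when the count for a cid has changed.
SELECT
  window_start, cid, count() AS cnt
FROM
  tumble(car_live_data, 5s)
WHERE
  cid IN ('c00033', 'c00022')
GROUP BY
  window_start, cid
EMIT PERIODIC 1s ON UPDATE
```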
EMIT TIMEOUT
For time window based aggregations, the watermark decides when a window is closed. A new event outside the window advances the watermark and informs the query engine to close the previous window and emit the aggregation results.
Say you only get one event for the time window. Since there are no more events, the watermark cannot advance, so the window won't be closed.
EMIT TIMEOUT forces the window to close after a timeout measured from the last event seen.
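The following is a sketch only, written against the EMIT grammar shown earlier in this section. The stream name stream matches the placeholder used elsewhere in this page, and the 3s timeout is an arbitrary value; verify the exact form against your Proton version.

```sql
-- 5-second tumble windows. If no newer event arrives to advance the
-- watermark, close the open window 3 seconds after the last event seen.
SELECT window_start, count() AS cnt
FROM tumble(stream, 5s)
GROUP BY window_start
EMIT TIMEOUT 3s
```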
Please note, if there is not a single event in the data stream, or in the time window, Proton won't emit a result. For example, with the following SQL you won't get 0 as the count:
SELECT window_start, count() as count FROM tumble(stream,2s)
GROUP BY window_start
Even if you add EMIT TIMEOUT to the SQL, it won't trigger a timeout, because the query engine doesn't see any event in the window. If you need to detect missing events for a certain time window, one workaround is to create a heartbeat stream and use UNION in a subquery to combine the heartbeat stream with the target stream. For a given time window, if all observed events come from the heartbeat stream, there was no event in the target stream. Please discuss more with us in the community Slack.
EMIT LAST
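In streaming processing, one typical query processes the last X seconds/minutes/hours of data. For example, show the cpu usage per device in the last 1 hour. We call this type of processing Last X Streaming Processing.
Timeplus provides a specialized SQL extension for ease of use: EMIT LAST <n><UNIT>. As in other parts of a streaming query, users can use interval shortcuts here.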
By default, EMIT LAST uses the event time. Timeplus Proton seeks both the streaming storage and the historical storage to backfill data in the last X time range. EMIT LAST .. ON PROCTIME uses the wall clock time to do the seek.
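To make the two variants concrete, here is a small sketch using the device_utils stream that appears elsewhere on this page. The 5m range and the cpu_usage filter are illustrative values, and the ON PROCTIME form follows the EMIT grammar shown earlier.

```sql
-- Backfill the last 5 minutes based on event time (the default).
SELECT *
FROM device_utils
WHERE cpu_usage > 80
EMIT LAST 5m;

-- Same query, but seek by wall clock time instead of event time.
SELECT *
FROM device_utils
WHERE cpu_usage > 80
EMIT LAST 5m ON PROCTIME;
```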
EMIT LAST for Streaming Tail
Tailing events whose event timestamps are in the last X range.
Example:
SELECT *
FROM device_utils
WHERE cpu_usage > 80
EMIT LAST 5m
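The above example filters events in the device_utils stream where cpu_usage is greater than 80% and the events were appended in the last 5 minutes. Internally, Timeplus seeks the streaming storage back 5 minutes (wall-clock time from now) and tails the data from there.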
EMIT LAST for Global Aggregation
SELECT <column_name1>, <column_name2>, ...
FROM <table_name>
WHERE <clause>
EMIT LAST INTERVAL <n> <UNIT>
SETTINGS max_keep_windows=<window_count>
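Note: Internally, Timeplus chops the streaming data into small windows and aggregates within each small window. As time goes on, it slides out old small windows to keep the overall time window fixed and to keep the incremental aggregation efficient. By default, the maximum number of kept windows is 100. If the last X interval is very large and the periodic emit interval is small, users need to explicitly set a larger maximum window count: last_x_interval / period_emit_interval.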
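As a worked example of the sizing rule in the note: for a last X interval of 1 hour and a periodic emit interval of 5 seconds, 3600 / 5 = 720 small windows are needed, which exceeds the default of 100. The sketch below assumes the device_utils stream used elsewhere on this page. The ordering of PERIODIC and LAST follows the EMIT grammar shown earlier and is an assumption, so check it against your Proton version.

```sql
-- Last 1 hour of data, emitted every 5 seconds:
-- 1 hour / 5 seconds = 3600 / 5 = 720 small windows (default limit is 100).
SELECT device, count(*)
FROM device_utils
WHERE cpu_usage > 80
GROUP BY device
EMIT PERIODIC 5s AND LAST 1h
SETTINGS max_keep_windows=720
```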
Example:
SELECT device, max(cpu_usage)
FROM tumble(devices, now64(3, 'UTC'), 5s)
GROUP BY device, window_end
EMIT AFTER WATERMARK WITH DELAY 2s;
EMIT LAST for Windowed Aggregation
SELECT <column_name1>, <column_name2>, <aggr_function>
FROM <streaming_window_function>(<stream_name>, [<time_column>], [<window_size>], ...)
[WHERE clause]
GROUP BY ...
EMIT LAST INTERVAL <n> <UNIT>
SETTINGS max_keep_windows=<window_count>