Streaming Processing
The following functions are supported in streaming query, but not all of them support historical query. Please check the tag like this.
✅ streaming query
🚫 historical query
table
table(stream)
turns the unbounded data stream as a bounded table, and query its historical data. For example, you may load the clickstream data from a Kafka topic into the clicks
stream in Timeplus. By default, if you run SELECT .. FROM clicks ..
This is a streaming query with unbounded data. The query will keep sending you new results whenever it's available. If you only need to analyze the past data, you can put the stream into the table
function. Taking a count
as an example:
- running
select count(*) from clicks
will show latest count every 2 seconds and never ends, until the query is canceled by the user - running
select count(*) from table(clicks)
will return immediately with the row count for the historical data for this data stream.
You can create views such as create view histrical_view as select * from table(stream_name)
, if you want to query the data in the table mode more than once. This may work well for static data, such as lookup information(city names and their zip code).
Please note, the table
function also works in other types of streams:
- mutable streams: get the historical data of the mutable stream.
- Kafka external stream: read the existing data in the topic.
select count(*) from table(kafka_ext_stream)
is optimized to efficiently get the number of current message count from the Kafka topic. - Pulsar external stream: read the existing data in the topic.
- Timeplus external stream: read the existing data for a stream in a remote Timeplus.
- Random stream: generate a block of random data. The number of rows in the block is predefined and subject to change. The current value is 65409. For testing or demonstration purpose, you can create a random stream with multiple columns and use the table function to generate random data at once.
Learn more about Non-streaming queries.
tumble
tumble(stream [,timeCol], windowSize)
Create a tumble window view for the data stream, for example tumble(iot,5s)
will create windows for every 5 seconds for the data stream iot
. The SQL must end with group by
with either window_start
or window_end
or both.
✅ streaming query
✅ historical query
hop
hop(stream [,timeCol], step, windowSize)
Create a hopping window view for the data stream, for example hop(iot,1s,5s)
will create windows for every 5 seconds for the data stream iot
and move the windows forward every second. The SQL must end with group by
with either window_start
or window_end
or both.
✅ streaming query
🚫 historical query
session
session(stream [,timeCol], idle, [maxLength,] [startCondition,endCondition] )
Create dynamic windows based on the activities in the data stream.
Parameters:
stream
a data stream, a view, or a CTE/subquerytimeCol
optional, by default it will be_tp_time
(the event time for the record)idle
how long the events will be automatically split to 2 session windowsmaxLength
the max length of the session window. Optional. Default value is the 5 times ofidle
[startCondition, endCondition]
Optional. If specified, the session window will start when thestartCondition
is met and will close whenendCondition
is met. You can use[expression1, expression2]
to indicate start and end events will be included in the session, or(expression1, expression2]
to indicate the ending events will be included but not the starting events.
For example, if the car keeps sending data when it's moving and stops sending data when it's parked or waiting for the traffic light
session(car_live_data, 1m) partition by cid
will create session windows for each car with 1 minute idle time. Meaning if the car is not moved within one minute, the window will be closed and a new session window will be created for future events. If the car keeps moving for more than 5 minutes, different windows will be created (every 5 minutes), so that as analysts, you can get near real-time results, without waiting too long for the car to be stopped.session(car_live_data, 1m, [speed>50,speed<50)) partition by cid
create session windows to detect when the car is speeding. The first event with speed over 50 will be included, and the last event with speed lower than 50 will not be included in the session window.session(access_log, 5m, [action='login',action='logout']) partition by uid
create session windows when the user logins the system and logout. If there is no activity within 5 minutes, the window will be closed automatically.
✅ streaming query
🚫 historical query
dedup
dedup(stream, column1 [,otherColumns..] [liveInSecond,limit])
Apply the deduplication at the given data stream with the specified column(s). Rows with same column value will only show once (only the first row is selected and others are omitted.) liveInSecond
specifies how long the keys will be kept in the memory/state. By default forever. But if you only want to avoid duplicating within a certain time period, say 2 minutes, you can set 120s
, e.g. dedup(subquery,myId,120s)
The last parameter limit
is optional which is 100000
by default. It limits the max unique keys maintained in the query engine. If the limit reaches, the system will recycle the earliest keys to maintain this limit.
You can cascade this table function like tumble(dedup(table(....
and so far the wrapping order must in this sequence : tumble/hop/session -> dedup -> table.
✅ streaming query
✅ historical query
When you use dedup
function together with table()
function to get the latest status for events with same ID, you can consider ordering the data by _tp_time in the reverse way, so that the latest event for same ID is kept. e.g.
WITH latest_to_earliest AS (SELECT * FROM table(my_stream) ORDER by _tp_time DESC)
SELECT * FROM dedup(latest_to_earliest, id)
Otherwise, if you run queries with dedup(table(my_stream),id)
the earliest event with same ID will be processed first, ignoring the rest of the updated status. In many cases, this is not what you expect.
lag
lag(<column_name> [, <offset=1>][, <default_value>])
: Work for both streaming query and historical query. If you omit the offset
the last row will be compared. E.g.
lag(total)
to get the value of total
from the last row. lag(total, 12)
to get the value from 12 rows ago. lag(total, 12, 0)
to use 0 as the default value if the specified row is not available.
✅ streaming query
🚫 historical query
lags
lags(<column_name>, begin_offset, end_offset [, <default_value>])
similar to lag
function but can get a list of values. e.g. lags(total,1,3)
will return an array for the last 1, last 2 and last 3 values.
✅ streaming query
🚫 historical query
date_diff_within
date_diff_within(timegap,time1, time2)
returns true or false. This function only works in stream-to-stream join. Check whether the gap between time1
and time2
are within the specific range. For example date_diff_within(10s,payment.time,notification.time)
to check whether the payment time and notification time are within 10 seconds or less.
✅ streaming query
🚫 historical query
lag_behind
lag_behind(offset)
or lag_behind(offset,<column1_name>, <column2_name>)
It is designed for streaming JOIN. If you don't specify the column names, then it will use the processing time on the left stream and right stream to compare the timestamp difference.
Example:
SELECT * FROM stream1 ASOF JOIN stream2
ON stream1.id=stream2.id AND stream1.seq>=stream2.seq AND lag_behind(10ms, stream1.ts1, stream2.ts2)
✅ streaming query
🚫 historical query
latest
latest(<column_name>)
gets the latest value for a specific column, working with streaming aggregation with group by.
✅ streaming query
🚫 historical query
earliest
earliest(<column_name>)
gets the earliest value for a specific column, working with streaming aggregation with group by.
✅ streaming query
🚫 historical query
now
now()
Show the current date time, such as 2022-01-28 05:08:16
If the now() is used in a streaming query, no matter SELECT
or WHERE
or tumble/hop
window, it will reflect the current time when the row is projected.
✅ streaming query
✅ historical query
now64
Similar to now()
but with extra millisecond information, such as 2022-01-28 05:08:22.680
It can be also used in streaming queries to show the latest datetime with milliseconds.
✅ streaming query
✅ historical query
emit_version
emit_version()
to show an auto-increasing number for each emit of streaming query result. It only works with streaming aggregation, not tail or filter.
For example, if you run select emit_version(),count(*) from car_live_data
the query will emit results every 2 seconds and the first result will be with emit_version=0, the second result with emit_version=1. This function is particularly helpful when there are multiple rows in each emit result. For example, you can run a tumble window aggregation with a group by. All results from the same aggregation window will be in the same emit_version. You can then show a chart with all rows in the same aggregation window.
✅ streaming query
🚫 historical query
changelog
changelog(stream[, [key_col1[,key_col2,[..]],version_column], drop_late_rows])
to convert a stream (no matter append-only stream or versioned stream) to a changelog stream with given primary keys.
- If the source stream is a regular stream, i.e. append-only stream, you can choose one or more columns as the primary key columns.
changelog(append_stream, key_col1)
For example, the car_live_data stream containscid
as car id,speed_kmh
as the recently reported speed. Run the following SQL to create a changelog stream for each car to track the speed changeselect * from changelog(car_live_data,cid)
A new column_tp_delta
is included in the streaming query result.-1
indicates that the row is redacted(removed). _tp_delta = 1 with the new value. - If the source stream is a Versioned Stream, since the primary key(s) and version columns are specified in the versioned stream, the
changelog
function can be as simple aschangelog(versioned_kv)
- By default,
drop_late_rows
is false. But if you do want to drop late events for the same primary key, then you need to set drop_late_rows as true, and specify the version_column. The bigger the version_column value is, the more recent version it implies. In most case, you can set the event time(_tp_time) as the version_column. An example to drop the late event for car_live_data:
select _tp_time,cid,speed_kmh, _tp_delta
from changelog(car_live_data, cid, _tp_time, true)
✅ streaming query
🚫 historical query