
Streaming query

Query is unbounded by default

By default, a Timeplus query behaves differently from traditional SQL, which answers the question of what has happened. A Timeplus query instead answers the question of what is happening now, in real time, and continuously updates the answer as new events enter the system.

A Timeplus query runs on an unbounded stream. In most cases, the query won't stop unless the user cancels it. For example, the following query returns, in real time, every event that enters the stream after the query starts. Each new event triggers a new query result.

select * from my_stream

When the user wants to ask what has happened, as in traditional SQL, an unbounded query can be converted to a bounded query by wrapping the stream in the table() function. For example, the following query returns all the events that already exist in the stream at the moment the query is executed. The query terminates once all results have been returned to the user; it does not wait for new events.

select * from table(my_stream)

How streaming queries are triggered

There are three categories of Timeplus streaming queries, based on how the data is aggregated.

| Category | Description | Triggered by |
| --- | --- | --- |
| Non-aggregation | Per-event processing, e.g. tail, filter, transformation/normalization | When events arrive |
| Window aggregation | Groups events in the same window | Window end and watermark |
| Global aggregation | From now on, forever | A fixed interval, every 2 seconds by default |

If some of these terms are new to you, no worries. Let's explore them in more detail.

Non-aggregation

Aggregation is the process of combining data from multiple events into one or more new values. Some queries don't involve any aggregation, such as:

Tail

List all incoming data, such as:

select * from my_stream

Filter

Show only certain columns, or only the data that matches a certain pattern, such as:

select c1,c2 from weblogs where http_code>=400

Transform

For each event, transform the data, for example to remove sensitive information or convert a type, such as:

select 
concat(first_name,' ', last_name) as full_name,
replace_regex(phone,'(\\d{3})-(\\d{3})-(\\d{4})','\\1-***-****') as phone
from user_activities

A non-aggregation query is triggered by each event arrival: every time a new event enters Timeplus, the query processes that event and sends the result to the client.
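The per-event behavior can be modeled outside Timeplus with a small Python sketch. It is only an illustration of "one result per incoming event", not how Timeplus is implemented. The names mask_phone and transform and the sample event are hypothetical; the regular expression mirrors the one in the transform query above.

```python
import re

def mask_phone(phone):
    """Keep the first three digits and mask the rest of a NNN-NNN-NNNN number."""
    return re.sub(r"(\d{3})-(\d{3})-(\d{4})", r"\1-***-****", phone)

def transform(events):
    """Yield exactly one result per input event, as soon as that event is seen."""
    for e in events:
        yield {
            "full_name": f"{e['first_name']} {e['last_name']}",
            "phone": mask_phone(e["phone"]),
        }

# Hypothetical sample event, for illustration only.
events = [{"first_name": "Ada", "last_name": "Lovelace", "phone": "604-555-1234"}]
results = list(transform(events))
# results == [{"full_name": "Ada Lovelace", "phone": "604-***-****"}]
```

Because transform is a generator, each result is produced as its event is consumed, with no buffering across events. That is the property that distinguishes non-aggregation from the two aggregation categories.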

Window aggregation

Window-based aggregation is a typical analytic method in stream analysis. Each window has a fixed range with a specific start time and end time. The window may move during analysis by a fixed step. The analysis result is computed by applying an aggregation function to all the events that fall within that window range.

When a window function is used for aggregation, the event time decides which window an event belongs to. If the user does not specify a timestamp, the default one is used. The user can also use any datetime field in the event as the timestamp, or dynamically generate a datetime value to use as the timestamp.

Two typical window functions are tumble and hop.

For example:

select window_start, window_end, count(*) as count, max(c1) as max_c1
from tumble(my_stream,order_time, 5s) group by window_start, window_end
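To make the difference between the two window types concrete, here is a minimal Python sketch of how an event time could be mapped to windows. It assumes integer timestamps in seconds and windows aligned to multiples of the window size (tumble) or of the step (hop); the exact alignment Timeplus uses is not stated here, and the function names are hypothetical.

```python
from collections import Counter

def tumble_window(ts, size):
    """Return the single [start, end) window of length `size` that contains ts."""
    start = ts - ts % size
    return (start, start + size)

def hop_windows(ts, step, size):
    """Return every [start, end) window of length `size`, starting at a
    multiple of `step`, that contains ts."""
    newest_start = ts - ts % step
    # A window contains ts only if its start is greater than ts - size.
    return sorted((s, s + size) for s in range(newest_start, ts - size, -step))

event_times = [1, 3, 5, 7, 12]  # hypothetical event times, in seconds

# Tumble: each event lands in exactly one 5-second window.
tumble_result = Counter(tumble_window(t, 5) for t in event_times)
# {(0, 5): 2, (5, 10): 2, (10, 15): 1}

# Hop: with a 2-second step and a 6-second size, windows overlap,
# so the event at t=7 belongs to three windows.
hop_result = hop_windows(7, step=2, size=6)
# [(2, 8), (4, 10), (6, 12)]
```

With tumble, windows do not overlap and every event is counted once. With hop, an event is counted in roughly size/step windows, which is why hop suits moving aggregates such as "the last 6 seconds, updated every 2 seconds".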

Window watermark

Window aggregation is triggered per window. Timeplus has an internal watermark mechanism that checks whether all the events in a specific window have arrived. Once the watermark shows that all events in that window are available, the aggregated result is triggered and sent to the client.

Watermark and delay

For more advanced scenarios, you can add a delay to the trigger policy, such as 2 extra seconds so that more late events can be included in each time window.

select window_start, window_end, count(*) as count, max(c1) as max_c1
from tumble(my_stream,order_time, 5s) group by window_start, window_end
emit after watermark with delay 2s
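The effect of the delay can be illustrated with a simplified Python model. It rests on assumptions that are not taken from Timeplus internals: the watermark is the largest event time seen so far, a window is emitted once the watermark reaches the window end plus the delay, and events for an already emitted window are dropped. The function name tumble_counts and the sample timestamps are hypothetical.

```python
def tumble_counts(timestamps, size, delay=0):
    """Return (window_start, window_end, count) tuples in emission order."""
    counts = {}       # open windows: window_start -> event count
    emitted = set()   # starts of windows already sent to the client
    watermark = float("-inf")
    out = []
    for ts in timestamps:
        start = ts - ts % size
        if start in emitted:
            continue  # assumed policy: too-late events are dropped
        counts[start] = counts.get(start, 0) + 1
        watermark = max(watermark, ts)
        # Emit every open window whose end (plus delay) the watermark has passed.
        for s in sorted(counts):
            if s + size + delay <= watermark:
                out.append((s, s + size, counts.pop(s)))
                emitted.add(s)
    return out

arrivals = [1, 2, 6, 4, 8, 11]  # event times in arrival order; 4 arrives late

no_delay = tumble_counts(arrivals, size=5)
# [(0, 5, 2), (5, 10, 2)]   the late event at t=4 is missed

with_delay = tumble_counts(arrivals, size=5, delay=2)
# [(0, 5, 3)]               t=4 is counted; window (5, 10) is still open
```

The trade-off is visible in the two results: the delay lets window (0, 5) include the late event, but every window is emitted later, so window (5, 10) has not yet been sent by the time the input ends.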

Global aggregation

Global aggregation aggregates all incoming events from the moment the query is submitted, and it never ends.

For example, if the user wants to know the total number of events in real time:

select count(*) from my_stream

Set trigger interval

Global aggregation is triggered periodically at a fixed interval (every 2 seconds by default). The user can specify the interval in the query statement.

For example, the following query counts only the events of type order and emits the result every 5 seconds:

select count(*) from my_stream where type='order'
emit periodic 5s
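As a rough model of periodic triggering, the Python sketch below keeps one running count and reports it at each tick of a fixed interval. It assumes arrival times measured in seconds since the query started, and it emits on every tick even when no new event has arrived; whether Timeplus does the same on idle ticks is not stated here. The function name periodic_counts and the sample data are hypothetical.

```python
def periodic_counts(arrival_times, interval, until):
    """Return (emit_time, running_count) for each tick up to `until`."""
    arrivals = sorted(arrival_times)
    out = []
    count = 0
    i = 0
    tick = interval
    while tick <= until:
        # Fold in every event that arrived at or before this tick.
        while i < len(arrivals) and arrivals[i] <= tick:
            count += 1
            i += 1
        out.append((tick, count))
        tick += interval
    return out

order_arrivals = [0.5, 1.0, 3.2, 4.9, 7.0]  # seconds since the query started

every_5s = periodic_counts(order_arrivals, interval=5, until=10)
# [(5, 4), (10, 5)]

every_2s = periodic_counts(order_arrivals, interval=2, until=6)
# [(2, 2), (4, 3), (6, 4)]
```

The count is never reset between ticks, which is what makes the aggregation global: a shorter interval only changes how often the client sees the running total, not the total itself.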