Demo Scenario

This document demonstrates how to run streaming queries in Timeplus to solve various use cases.

Customer Scenario and Data Model

You are the lead business analyst at a car-sharing company. Each car is equipped with sensors that report its location. Customers use the mobile app to find available cars nearby, book them, unlock them, and hit the road. At the end of the trip, the customer parks the car, locks it, and ends the trip. Payment is processed automatically with the registered credit card.

Some of the typical use cases for time-sensitive insights are:

  • How many cars are being driven in a certain location? Do we need to move some cars from less busy locations to those hot zones?
  • Which cars are being driven too fast or are running low on fuel? The service team may need to take action.
  • Which users keep booking cars and then cancelling them? Should we send real-time notifications to those users to discourage abuse?

There are multiple data streams in the system:

dim_user_info

A relatively static stream with all registered user information.

Column | Type | Sample Value
uid | string | u00001
first_name | string | Foo
last_name | string | Bar
email | string | a@timeplus.io
credit_card | string | 371712345678910
gender | string | F
birthday | string | 1990-01-15

dim_car_info

A relatively static stream with all registered cars.

Column | Comment | Type | Sample Value
cid | car ID | string | c00001
license_plate_no |  | string | KM235L
in_service | False if the car is suspended (retired or in maintenance) | bool | True

car_live_data

A data stream with the latest data from car sensors. While the car engine is running, data is reported every second; otherwise, data is reported every half an hour.

Column | Comment | Type | Sample Value
time | datetime of the sensor data | datetime | 2022-01-12 23:00:58.476
cid | Car ID | string | c00001
longitude | current position | float | 40.75896
latitude | current position | float | -73.985195
gas_percent | percentage of gas level, 100 means full tank | decimal | 86.12
speed_kmh | current driving speed in KM/hour | int | 56
total_km | this car's total distance in km. Keep increasing after trips | float | 3536
locked | whether the car is locked | bool | True
in_use | whether someone is using the car | bool | True

bookings

A data stream with booking events. A row is generated at each step of the booking lifecycle:

  • when the user books the car, a new event is added with action=add, booking_time=now, and expire=now+30m
  • when the user unlocks the car, a new event is added with action=service
  • when the user completes the trip and locks the car, a new event is added with action=end
  • when the user cancels the booking, a new event is added with action=cancel
  • when the user extends the booking for another 30 minutes, a new event is added with action=extend and an updated expire field
  • if the user doesn't unlock the car before the expire time, a new event is added with action=expire

Column | Comment | Type | Sample Value
time | When the event happens | datetime | 2022-01-12 13:00:58.476
bid | booking ID | string | b00001
booking_time | When the user books the car. Expire in 30min | datetime | 2022-01-12 13:30:58.476
uid | User ID | string | u00001
cid | Car ID | string | c00001
action | One of the values: add, cancel, extend, service, expire, end | string | add
expire | When the booking will expire | datetime | 2022-01-12 13:30:58.476

trips

A data stream with trip details and payment info. Each row is generated at the end of the trip

Column | Comment | Type | Sample Value
tid | Trip ID | string | t00001
start_time | When the trip starts | datetime | 2022-01-12 13:00:58.476
end_time | When the trip ends | datetime | 2022-01-12 24:00:58.476
bid | booking ID | string | b00001
start_lon | Start location | float | 40.75896
start_lat | Start location | float | -73.985195
end_lon | End location | float | 42.75896
end_lat | End location | float | -71.985195
distance | distance driven in km | float | 23.2
amount | how much the user should pay for the trip | decimal | 40.75

The following sections show how to query Timeplus to understand the business.

Streaming Analysis

S-TAIL: Showing raw data with or without filter conditions

Use Case: to start the data exploration, the analyst wants to see all recently reported car IoT data

SELECT * FROM car_live_data

or focus on the cars that are almost out of gas (so that the service team can be sent to refuel them or suspend them)

SELECT time,cid,gas_percent,in_use FROM car_live_data WHERE gas_percent < 25

Result:

time | cid | gas_percent | in_use
2022-01-12 23:00:58.476 | c00001 | 18 | false

S-DOWNSAMPLING: Converting detailed data points to high level data

Use Case: the sensors on each car may report data at intervals ranging from every half second to every 10 seconds. The analyst may want to reduce the granularity and save only per-minute data downstream

SELECT window_start, cid, avg(gas_percent) AS avg_gas_percent, avg(speed_kmh) AS avg_speed
FROM tumble(car_live_data,1m) GROUP BY window_start, cid

Result:

window_start | cid | avg_gas_percent | avg_speed
2022-01-12 23:01:00.000 | c00001 | 34 | 35
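
To make the tumble window grouping concrete, here is a minimal plain-Python sketch of the same idea: each event is assigned to exactly one fixed-size window and the values are averaged per window and car. The helper names (`tumble_start`, `downsample`) and the sample rows are hypothetical, and this only illustrates the semantics, not how Timeplus implements them.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Return the start of the fixed-size window that contains ts."""
    return ts - ((ts - EPOCH) % size)

def downsample(rows, size=timedelta(minutes=1)):
    """Average gas_percent and speed_kmh per (window_start, cid)."""
    groups = defaultdict(list)
    for ts, cid, gas, speed in rows:
        groups[(tumble_start(ts, size), cid)].append((gas, speed))
    return {
        key: (sum(g for g, _ in vals) / len(vals),
              sum(s for _, s in vals) / len(vals))
        for key, vals in groups.items()
    }

# Hypothetical sensor rows: (time, cid, gas_percent, speed_kmh)
rows = [
    (datetime(2022, 1, 12, 23, 1, 5), "c00001", 35.0, 30),
    (datetime(2022, 1, 12, 23, 1, 45), "c00001", 33.0, 40),
    (datetime(2022, 1, 12, 23, 2, 10), "c00001", 32.0, 50),
]
result = downsample(rows)
```

The first two rows fall into the 23:01 window and are averaged together, while the third row starts a new window, so no row is counted twice.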

More practically, the user can create a materialized view to automatically put downsampled data into a new stream/view.

CREATE MATERIALIZED VIEW car_live_data_1min as
SELECT window_start AS time,cid, avg(gas_percent) AS avg_gas,avg(speed_kmh) AS avg_speed
FROM tumble(car_live_data,1m) GROUP BY window_start, cid

Then the user can search the data via

SELECT * FROM car_live_data_1min

Result:

time | cid | avg_gas | avg_speed
2022-01-12 23:01:00.000 | c00001 | 34 | 35

S-AGG-RECENT: Showing aggregation for the recent data

Use Case: the analyst wants to monitor the total revenue for the past 1 hour.

Timeplus provides a special syntax to get such results easily

SELECT sum(amount) FROM trips EMIT LAST 1h

Once the query is submitted, it will show quite a few rows based on the past hour, then show new results in a streaming fashion.

Result:

sum(amount)
3500.42

There are other ways to get similar results, with more verbose queries

  1. We can apply a global aggregation to the data in the most recent 1-hour window:
     select sum(amount) from trips where end_time > date_sub(now(), 1h)

  2. The other solution is to use hop window aggregation. Similar to the tumble window in S-DOWNSAMPLING, the data is grouped into fixed-size time windows, such as an hour. Tumble windows do not overlap each other, so they are ideal for downsampling without data duplication (for example, in a count aggregation, no data is counted twice). A hop window is shifted to the left or right (past or future on the timeline) by a sliding step, so consecutive windows can overlap. For example, the following query uses a hop window to get the total revenue for the past hour, and the result is sent out every second:
     select window_start,window_end, sum(amount) from hop(trips,end_time,1s,1h) group by window_start,window_end
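
The difference between tumble and hop windows can be sketched in plain Python. The hypothetical helper `hop_windows` below lists every window that contains a given timestamp, assuming windows start at multiples of the step and cover `[start, start + size)`. With a step smaller than the size, one event belongs to several windows. With a step equal to the size, it degenerates to a tumble window. The boundary conventions are an assumption for illustration and may differ from Timeplus.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def hop_windows(ts, step, size):
    """List the (start, end) pairs of every hopping window that contains ts."""
    # Latest window that starts at or before ts
    start = ts - ((ts - EPOCH) % step)
    windows = []
    # Walk backwards one step at a time while the window still covers ts
    while start + size > ts:
        windows.append((start, start + size))
        start -= step
    return windows

ts = datetime(2022, 1, 12, 10, 20)
# 1-hour windows sliding every 15 minutes: the event is in 4 windows
overlapping = hop_windows(ts, step=timedelta(minutes=15), size=timedelta(hours=1))
# Step equal to size behaves like a tumble window: exactly 1 window
single = hop_windows(ts, step=timedelta(hours=1), size=timedelta(hours=1))
```

This is why a sum over a hop window can count the same trip in several results, while a tumble window counts it once.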

S-SESSION: analyzing activities with active sessions

Use Case: the analyst wants to track the daily movement of the cars. The sensors on the cars report data every second while the engine is running, and every half an hour while the engine is off. If the server doesn't receive data from a running car for 5 seconds, the car is considered disconnected. We can run the following query to show the trip distance for each running car

SELECT cid,window_start,window_end,max(total_km)-min(total_km) AS trip_km 
FROM session(car_live_data, time, 5s, cid)
GROUP BY __tp_session_id, cid, window_start, window_end
HAVING trip_km > 0

Result:

cid | window_start | window_end | trip_km
c00040 | 2022-03-23 21:42:08.000 | 2022-03-23 21:42:12.000 | 0.05395412226778262
c00078 | 2022-03-23 21:42:08.000 | 2022-03-23 21:42:33.000 | 0.4258001818272703
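
As a rough illustration of session windows, the sketch below splits each car's events into sessions in plain Python: a session ends when the next event from the same car is more than the timeout later. The function name `sessions` and the sample events are hypothetical, and the exact window boundaries reported by Timeplus may differ from this simplified model.

```python
from datetime import datetime, timedelta

def sessions(events, gap=timedelta(seconds=5)):
    """Split (time, cid, total_km) events into per-car sessions.

    Returns (cid, first_time, last_time, trip_km) for each session.
    """
    open_sessions = {}  # cid -> [first_time, last_time, min_km, max_km]
    closed = []
    for ts, cid, km in sorted(events):
        cur = open_sessions.get(cid)
        if cur is not None and ts - cur[1] > gap:
            # Silence longer than the gap: close the current session
            closed.append((cid, cur[0], cur[1], cur[3] - cur[2]))
            cur = None
        if cur is None:
            open_sessions[cid] = [ts, ts, km, km]
        else:
            cur[1] = ts
            cur[2] = min(cur[2], km)
            cur[3] = max(cur[3], km)
    # Flush sessions that are still open at the end of the input
    closed.extend((cid, s[0], s[1], s[3] - s[2]) for cid, s in open_sessions.items())
    return closed

t0 = datetime(2022, 3, 23, 21, 42, 8)
sec = timedelta(seconds=1)
events = [
    (t0, "c00040", 100.0),
    (t0 + 1 * sec, "c00040", 100.5),
    (t0 + 2 * sec, "c00040", 101.0),
    (t0 + 60 * sec, "c00040", 101.0),   # 58 s of silence: new session
    (t0 + 61 * sec, "c00040", 101.25),
]
trips = sessions(events)
```

Here the 58-second silence splits the events into two sessions, with trip distances of 1.0 km and 0.25 km.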

A more complex query can be created to aggregate the data by car ID and trip ending time.

with query_1 AS (
select cid,window_start AS w_start,window_end AS w_end,max(total_km)-min(total_km) AS trip_km
from session(car_live_data,time,20m, cid) group by __tp_session_id, cid, window_start, window_end
)
select cid,window_start,window_end,sum(trip_km)
from tumble(query_1,w_end,1h) group by cid,window_start,window_end

Result:

cid | window_start | window_end | trip_km
c00001 | 2022-01-12 00:00:00.000 | 2022-01-12 23:59:59.999 | 17.2
c00002 | 2022-01-12 00:00:00.000 | 2022-01-12 23:59:59.999 | 4.1

This is a continuous streaming query. Every hour (or every day, depending on the tumble window size), the analysis results can be sent to email/Slack or a Kafka topic for further processing.

S-TIME-TRAVEL: Going back to a past time and run analysis since then

Use Case: the analysts don't need to keep watching the streaming charts or dashboards. They can rewind to a past time and run the streaming analysis from that moment. This helps them better understand what happened a few hours ago (such as at midnight).

For example, the analyst wants to understand how users have been booking cars since 2 hours ago

SELECT window_start,count(*) FROM tumble(bookings,15m) 
WHERE action='add' GROUP BY window_start
EMIT LAST 2h

Or they can specify an exact timestamp, e.g.

SELECT window_start,count(*) FROM tumble(bookings,15m) 
WHERE action='add' GROUP BY window_start
settings seek_to='2022-01-12 06:00:00.000'

Result:

window_start | count(*)
2022-01-12 06:00:00.000 | 34
2022-01-12 06:15:00.000 | 23

Not only is the past data analyzed; the latest incoming data is also processed continuously.

S-MVIEW: Creating materialized view to keep latest analysis result and cache for other systems to query

Use Case: unlike traditional SQL queries, streaming queries never end until the user cancels them. The analysis results keep being pushed to the web UI or to Slack/Kafka destinations. The analysts want to run advanced streaming queries in Timeplus and cache the results as a materialized view, so that they can use regular SQL tools/systems to read the streaming insights as regular tables. Materialized views are also useful for downsampling the data to reduce the data volume for future analysis and storage

CREATE MATERIALIZED VIEW today_revenue as
SELECT sum(amount) FROM trips WHERE end_time > today();

-- in Timeplus or other connected SQL clients
SELECT * FROM today_revenue

S-DROP-LATE: Dropping late events to get real-time aggregation insights

Use Case: streaming data may arrive late for many reasons, such as network latency or IoT sensor malfunction. When we run streaming analysis (such as payment per minute), we aggregate the data based on event time (when the payment actually happened, rather than when Timeplus receives the data), and we don't want to wait for events that arrive significantly late.

A watermark is a common mechanism in the stream processing world to set the bar for how late events can be. Unlike other systems, Timeplus makes it very easy to identify late events without explicitly setting a watermark policy.

For a query like this

SELECT window_start,window_end,sum(amount),count(*)
FROM tumble(trips,end_time,1m) GROUP BY window_start,window_end

It will show the total payment every minute, for example

window_start | window_end | sum(amount) | count(*)
2022-01-12 10:00:00.000 | 2022-01-12 10:01:00.000 | 200 | 42
2022-01-12 10:01:00.000 | 2022-01-12 10:02:00.000 | 300 | 67

Consider two cars that are returned at the same time, 10:00:10. Both tripA and tripB are supposed to be counted in the first time window. However, for some reason, the tripA data point arrives in Timeplus at 10:01:15, and the tripB data point arrives at 10:01:16. Timeplus accepts the tripA data, adds it to the first window's aggregation, and then closes the first window. The watermark is set to 10:01:00. So when the tripB data point arrives, it is considered too late and is not included in the streaming result. It is still available when we run a historical query.

data point | event time | arrive time | note
tripA | 2022-01-12 10:00:10.000 | 2022-01-12 10:01:15.000 | included in 1st window, trigger the watermark change
tripB | 2022-01-12 10:00:10.000 | 2022-01-12 10:01:16.000 | its time is lower than watermark. 1st window has been closed (not accepting more data). The data is dropped for streaming analysis. Still can be analyzed with historical searches
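
The general idea of dropping late events can be sketched in plain Python. This is a simplified, commonly used model and not necessarily how Timeplus advances its watermark: it assumes the watermark is the largest event time seen so far, and that a window is closed once its end is at or before the watermark. The function name and the extra event `tripC` are hypothetical. `tripC` is added only to move the watermark past the first window.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def aggregate_with_watermark(events, size=timedelta(minutes=1)):
    """Sum amounts per tumbling window in arrival order, dropping late events.

    events: iterable of (name, event_time, amount), already in arrival order.
    Returns (sums keyed by window_start, names of dropped events).
    """
    watermark = datetime.min
    sums, dropped = {}, []
    for name, event_time, amount in events:
        start = event_time - ((event_time - EPOCH) % size)
        if start + size <= watermark:
            dropped.append(name)  # the window has already been closed
            continue
        sums[start] = sums.get(start, 0) + amount
        watermark = max(watermark, event_time)
    return sums, dropped

day = datetime(2022, 1, 12)
arrivals = [
    ("tripA", day.replace(hour=10, minute=0, second=10), 20),
    ("tripC", day.replace(hour=10, minute=1, second=5), 30),  # advances the watermark
    ("tripB", day.replace(hour=10, minute=0, second=10), 15),  # arrives too late
]
sums, dropped = aggregate_with_watermark(arrivals)
```

In this model tripB is excluded from the streaming sums but remains in the input, which mirrors the point that late data is still available to historical queries.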

S-WAIT-LATE: Waiting for extra time for late events

Use Case: as a more advanced variation of the case above, the analysts may want to set an extra time to wait for late events. This makes the streaming analysis less real-time, but it can include more data points when the network latency is unpredictable.

Given a similar scenario, the query with the advanced setting is

SELECT window_start,window_end,sum(amount),count(*)
FROM tumble(trips,end_time,1m) GROUP BY window_start,window_end
EMIT AFTER WATERMARK AND DELAY 30s
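
The effect of an extra delay can be sketched in plain Python under a simplified model: the watermark is the largest event time seen so far, and a window stays open until the watermark passes the window end plus the delay. The function name and sample events are hypothetical, and the real Timeplus behavior may differ in detail.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def aggregate_with_delay(events, size=timedelta(minutes=1), delay=timedelta(0)):
    """Sum amounts per tumbling window, keeping each window open for `delay` longer."""
    watermark = datetime.min
    sums, dropped = {}, []
    for name, event_time, amount in events:
        start = event_time - ((event_time - EPOCH) % size)
        if start + size + delay <= watermark:
            dropped.append(name)  # too late even with the extra delay
            continue
        sums[start] = sums.get(start, 0) + amount
        watermark = max(watermark, event_time)
    return sums, dropped

day = datetime(2022, 1, 12)
arrivals = [
    ("tripA", day.replace(hour=10, minute=0, second=10), 20),
    ("tripC", day.replace(hour=10, minute=1, second=5), 30),
    ("tripB", day.replace(hour=10, minute=0, second=10), 15),  # late event
]
strict_sums, strict_dropped = aggregate_with_delay(arrivals)
relaxed_sums, relaxed_dropped = aggregate_with_delay(arrivals, delay=timedelta(seconds=30))
```

Without a delay the late tripB is dropped. With a 30-second delay it is still counted in the 10:00 window, at the cost of the window result being final later.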

S-TOP-K: Getting the most common value for each streaming window

Use Case: the analysts want to understand which cars are booked most often every day or every hour

SELECT window_start,top_k(cid,3) AS popular_cars FROM tumble(bookings,1h) GROUP BY window_start

This will generate a report like this (the sample below shows daily windows, that is, a window size of 1d instead of 1h)

window_start | popular_cars
2022-01-12 00:00:00.000 | ['car1','car2','car3']
2022-01-13 00:00:00.000 | ['car2','car3','car4']
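
The per-window top-k idea can be sketched in plain Python with `collections.Counter`: bookings are bucketed by window start, and the k most frequent car IDs are kept per bucket. The helper name and the sample bookings are hypothetical, and this is an illustration of the concept rather than the `top_k` implementation in Timeplus.

```python
from collections import Counter, defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def top_k_per_window(bookings, k=3, size=timedelta(hours=1)):
    """Return the k most frequently booked car IDs for each tumbling window."""
    counts = defaultdict(Counter)
    for ts, cid in bookings:
        start = ts - ((ts - EPOCH) % size)
        counts[start][cid] += 1
    return {start: [cid for cid, _ in c.most_common(k)]
            for start, c in counts.items()}

day = datetime(2022, 1, 12)
# Hypothetical bookings, one per minute: c1 x4, c2 x3, c3 x2, c4 x1
bookings = [(day + timedelta(minutes=i), cid) for i, cid in enumerate(
    ["c1", "c2", "c1", "c3", "c2", "c1", "c4", "c3", "c2", "c1"])]
popular = top_k_per_window(bookings)
```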

S-MAX-K: Getting the maximum value for each streaming window

Use Case: the analysts want to understand which trips are longest every day

SELECT window_start,max_k(amount,3,bid,distance) AS longest_trips FROM tumble(trips,1d) GROUP BY window_start

This will generate a daily report like this

window_start | longest_trips
2022-01-12 00:00:00.000 | [(7.62,'b01',13.8),(2.45,'b02',2.37),(12.66,'b03',22.6)]

To get the booking id for the 2nd longest trip, you can select ..,longest_trips[2].2 AS bookingId

S-MIN-K: Getting the minimal value for each streaming window

Use Case: the analysts want to understand which trips are shortest every day

SELECT window_start,min_k(amount,3,bid,distance) AS shortest_trips FROM tumble(trips,1d) GROUP BY window_start

This will generate a daily report like this

window_start | shortest_trips
2022-01-12 00:00:00.000 | [(2.56,'b06',3.10),(7.68,'b07',10.8),(10.24,'b08',15.36)]

S-OVER-TIME: Getting the difference/gaps for results in each time window

Use Case: with Timeplus, the analysts can easily compare the current minute data with last minute data.

For example, the user wants to understand how many cars are being used in each minute and how that differs from the previous minute

SELECT window_start,count(*) AS num_of_trips,
lag(num_of_trips) AS last_min_trips,num_of_trips-last_min_trips AS gap
FROM tumble(trips,1m) GROUP BY window_start

Result

window_start | num_of_trips | last_min_trips | gap
2022-01-12 10:00:00.000 | 88 | 0 | 88
2022-01-12 10:01:00.000 | 80 | 88 | -8
2022-01-12 10:02:00.000 | 90 | 80 | 10
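
The lag comparison shown in the result above can be reproduced with a small plain-Python sketch. The helper `with_lag` is hypothetical. It pairs each window's count with the count from `offset` windows earlier and falls back to a default of 0 when no earlier window exists, which is an assumption chosen to match the first row of the table.

```python
def with_lag(counts, offset=1, default=0):
    """Pair each (window_start, count) with the count from `offset` windows back."""
    rows = []
    for i, (window_start, n) in enumerate(counts):
        prev = counts[i - offset][1] if i >= offset else default
        rows.append((window_start, n, prev, n - prev))
    return rows

# Per-minute trip counts taken from the result table
per_minute = [("10:00", 88), ("10:01", 80), ("10:02", 90)]
compared = with_lag(per_minute)
```

Passing a larger `offset` (for example 60 with per-second counts) compares each window with the one from a minute earlier.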

This is a very powerful and useful capability. Besides comparing against the last aggregation result, the analysts can also compare against data further in the past, for example this second against the same second in the last minute or the last hour.

The following query counts the car sensor events in each second and compares the count with that of the same second in the last minute

SELECT window_start,count(*) AS num_of_events,
lag(window_start,60) AS last_min,
lag(num_of_events,60) AS last_min_events,
num_of_events-last_min_events AS gap,
concat(to_string(to_decimal(gap*100/num_of_events,2)),'%') AS change
FROM tumble(car_live_data,1s) GROUP BY window_start

Once the query starts running, only the newer results are available for the first minute. After that, we can get the result from 60 windows back and compare the difference.

Result

window_start | num_of_events | last_min | last_min_events | gap | change
2022-01-12 10:01:00.000 | 88 | 2022-01-12 10:00:00.000 | 83 | 5 |
2022-01-12 10:01:01.000 | 80 | 2022-01-12 10:00:01.000 | 87 | -7 | -8.75%
S-UNION-STREAMS: Merging multiple streams in same schema to a single stream

Use Case: some data streams share the same schema but are intentionally kept in different streams, such as one stream per city or per country (for performance or regulatory reasons, for example). We would like to merge the data to understand the big picture.

For example, the car-sharing company starts its business in Vancouver, BC first, then expands to Victoria, BC. Per the local city governments' regulatory requirements, two systems are set up. The headquarters wants to show streaming analysis for both cities.

SELECT * FROM trips_vancouver
UNION
SELECT * FROM trips_victoria

Other streaming queries we can run for this demo set

Get number of in-use cars

Each car reports its status to the car_live_data stream, including the in_use bool flag. The same car may report in_use=false 2 minutes ago and then in_use=true 1 minute ago; in that case the car should be considered in use. We should not run a global aggregation, since we only care about the current status, not the accumulated data (each running car should report data twice a second). A tumble window with a 1-second window size should be okay.

SELECT window_start, count(distinct cid) FROM tumble(car_live_data,1s) 
WHERE in_use GROUP BY window_start

Get the top 10 cars ordered by revenue

We probably want to understand which cars earn the company the most revenue and which cars are not earning enough. This can be done with the following query

SELECT cid,sum(amount) AS revenue from trips 
INNER JOIN bookings on trips.bid=bookings.bid
WHERE end_time > today() GROUP BY cid
ORDER BY revenue DESC LIMIT 10
settings query_mode='table'

The result is like this

cid | revenue
c87850 | 675.9
c30765 | 637.48
c72990 | 487.36
c00956 | 481.66
c21898 | 479.64
c96280 | 476.62
c59872 | 461.2
c51800 | 451.14
c20995 | 445.48
c04604 | 445.3

You can further enrich the data by looking up the car license plate from dim_car_info

WITH top10cars AS (
SELECT cid,sum(amount) AS revenue FROM trips
INNER JOIN bookings on trips.bid=bookings.bid
WHERE end_time > today() GROUP BY cid
ORDER BY revenue DESC LIMIT 10
)
SELECT cid,revenue,license_plate_no FROM top10cars
INNER JOIN dim_car_info on top10cars.cid=dim_car_info.cid
settings query_mode='table'

The result is

cid | revenue | license_plate_no
c30765 | 749.06 | OPIH21
c87850 | 701.14 | 384K9Z
c00956 | 522.66 | 3JHAYX
c68812 | 497.12 | LR3AF9
c51363 | 495.98 | B8UVY6
c72990 | 487.36 | W6IZOB
c91904 | 486.14 | LIHUZW
c21898 | 479.64 | 1KTBIJ
c20995 | 477.38 | 6GN7SP
c96280 | 476.62 | YSK84J

Data Transformation/filter/cleaning

T-MASK: Scrubbing sensitive fields

Use Case: scrub sensitive fields (such as PII or payment information) to hide them from downstream consumers

In this example, only the first and last 4 digits of the credit card numbers are shown during user activity analysis.

SELECT uid,replace_regex(credit_card,'(\\d{4})(\\d*)(\\d{4})','\\1***\\3') AS card
FROM dim_user_info

Result:

uid | card
u00001 | 3717***8910
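
The same masking pattern can be checked outside Timeplus with Python's standard `re` module. This sketch uses the same three capture groups (first 4 digits, middle digits, last 4 digits) and replaces the middle group with `***`. The function name `mask_card` is hypothetical.

```python
import re

# Same pattern as in the query: keep the first and last 4 digits
CARD_PATTERN = re.compile(r"(\d{4})(\d*)(\d{4})")

def mask_card(card: str) -> str:
    """Replace the middle digits of a card number with ***."""
    return CARD_PATTERN.sub(r"\1***\3", card)

masked = mask_card("371712345678910")
```

Because `\d*` is greedy but must leave four digits for the last group, the middle group absorbs everything between the first and last four digits.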

T-DERIVE: Computing derived columns from raw data

Use Case: create new columns that combine information from multiple columns in the raw data, or convert data in certain columns to another format to make it ready for display.

SELECT uid, concat(first_name,' ',last_name) AS full_name,
year(today())-year(to_date(birthday)) AS age FROM dim_user_info

Result:

uid | full_name | age
u00001 | Foo Bar | 32
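
Note that subtracting years, as the query does, gives the age the user reaches during the current year, which can be one more than the age in completed years if the birthday has not happened yet. A small plain-Python sketch of both calculations, with hypothetical function names and an assumed "today" of 2022-01-12:

```python
from datetime import date

def year_diff_age(birthday: str, today: date) -> int:
    """Age as a plain year difference, mirroring year(today()) - year(birthday)."""
    return today.year - date.fromisoformat(birthday).year

def exact_age(birthday: str, today: date) -> int:
    """Age in completed years, one less if the birthday has not occurred yet this year."""
    born = date.fromisoformat(birthday)
    not_yet = (today.month, today.day) < (born.month, born.day)
    return today.year - born.year - not_yet

today = date(2022, 1, 12)          # assumed query date
approx = year_diff_age("1990-01-15", today)
exact = exact_age("1990-01-15", today)
```

For the sample user, the year difference is 32, while the age in completed years on that assumed date is 31, since the birthday falls three days later.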

T-LOOKUP: Converting identifiers to more readable information

Use Case: While checking the car IoT data, we want to convert the car ID to its license plate number.

SELECT time, cid, c.license_plate_no AS license,gas_percent,speed_kmh FROM car_live_data
INNER JOIN dim_car_info AS c
ON car_live_data.cid=c.cid

Result:

time | cid | license | gas_percent | speed_kmh
2022-01-12 23:00:58.476 | c00001 | KM235L | 55 | 50
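
Conceptually, this lookup is an inner join between a fast-moving stream and a small, mostly static dimension table. A minimal plain-Python sketch of that idea, using a dictionary as the lookup table (the function name `enrich` and the sample rows are hypothetical, and rows without a matching car are dropped, as an inner join would do):

```python
def enrich(live_rows, car_info):
    """Attach the license plate to each sensor row; skip rows with unknown cars."""
    plates = {car["cid"]: car["license_plate_no"] for car in car_info}
    return [dict(row, license=plates[row["cid"]])
            for row in live_rows if row["cid"] in plates]

car_info = [{"cid": "c00001", "license_plate_no": "KM235L"}]
live_rows = [
    {"cid": "c00001", "gas_percent": 55, "speed_kmh": 50},
    {"cid": "c99999", "gas_percent": 70, "speed_kmh": 20},  # not in car_info
]
enriched = enrich(live_rows, car_info)
```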