可变流
This type of stream is only available in Timeplus Enterprise, with high performance query and UPSERT (UPDATE or INSERT).
As the name implies, the data in the stream is mutable. Value with the same primary key(s) will be overwritten.
The primary use case of mutable streams is serving as the lookup/dimensional data in Streaming JOIN, supporting millions or even billions of unique keys. You can also use mutable streams as the "fact table" to efficiently do range queries or filtering for denormalized data model, a.k.a. OBT (One Big Table).
Learn more about why we introduced Mutable Streams by checking this blog.
语法
CREATE MUTABLE STREAM [IF NOT EXISTS] stream_name (
<col1> <col_type>,
<col2> <col_type>,
<col3> <col_type>,
<col4> <col_type>
INDEX <index1> (col3)
FAMILY <family1> (col3,col4)
)
PRIMARY KEY (col1, col2)
SETTINGS
logstore_retention_bytes=..,
logstore_retention_ms=..,
shards=..
示例
Create a mutable stream
Create the stream with the following SQL:
CREATE MUTABLE STREAM device_metrics
(
device_id string,
timestamp datetime64(3),
batch_id uint32,
region string,
city string,
lat float32,
lon float32,
battery float32,
humidity uint16,
temperature float32
)
PRIMARY KEY (device_id, timestamp, batch_id)
备注:
- The compound primary key is a combination of device_id, timestamp and the batch_id. Data with exactly the same value for those 3 columns will be overridden.
- Searching data with any column in the primary key is very fast.
- By default there is only 1 shard and no extra index or optimization.
Load millions of rows
You can use CREATE RANDOM STREAM and a Materialized View to generate data and send to the mutable stream. But since we are testing massive historical data with duplicated keys, we can also use INSERT INTO .. SELECT
to load data.
INSERT INTO device_metrics
SELECT
'device_' || to_string(floor(rand_uniform(0, 2400))) AS device_id,
now64(9) AS timestamp,
floor(rand_uniform(0, 50)) AS batch_id,
'region_'||to_string(rand()%5) AS region,
'city_'||to_string(rand()%10) AS city,
rand()%1000/10 AS lat,
rand()%1000/10 AS lon,
rand_uniform(0,100) AS battery,
floor(rand_uniform(0,80)) AS humidity,
rand_uniform(0,100) AS temperature,
now64() AS _tp_time
FROM numbers(50_000_000)
Depending on your hardware and server configuration, it may take a few seconds to add all data.
0 rows in set. Elapsed: 11.532 sec. Processed 50.00 million rows, 400.00 MB (4.34 million rows/s., 34.69 MB/s.)
查询
When you query the mutable stream, Timeplus will read all historical data without any duplicated primary key.
SELECT count() FROM table(device_metrics)
Sample output:
┌─count()─┐
│ 120000 │
└─────────┘
1 row in set. Elapsed: 0.092 sec.
You can filter data efficiently with any part of the primary key:
SELECT count() FROM table(device_metrics) WHERE batch_id=5