Enrichment Join
Enrichment Join enriches a live data stream with reference data from a static right-hand side dataset. It is designed for scenarios where the right-hand side data is unchanging or represents a point-in-time snapshot.
In this join type, events from the left-hand stream trigger the join. Each incoming event probes a static hash table that is pre-built from the right-hand side dataset — as illustrated in the diagram below.
Because the right-hand side data remains static and only left-hand events drive the join, only LEFT and INNER joins are supported.
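The build-then-probe flow described above can be modeled in a few lines of Python. This is a conceptual sketch, not Timeplus's implementation. A plain dict stands in for the static hash table, and the row shapes and function names (`build_hash_table`, `enrichment_join`) are hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple

Row = Dict[str, Any]


def build_hash_table(rhs_rows: Iterable[Row], key: str) -> Dict[Any, Row]:
    """Build phase: index the static right-hand side once, by join key."""
    table: Dict[Any, Row] = {}
    for row in rhs_rows:
        table[row[key]] = row  # assumes the key is unique on the right side
    return table


def enrichment_join(
    lhs_events: Iterable[Row],
    table: Dict[Any, Row],
    lhs_key: str,
    how: str = "inner",
) -> Iterator[Tuple[Row, Optional[Row]]]:
    """Probe phase: each left event triggers exactly one hash lookup."""
    for event in lhs_events:
        match = table.get(event[lhs_key])
        if match is not None:
            yield event, match  # matched: emitted by both INNER and LEFT
        elif how == "left":
            yield event, None  # LEFT keeps the unmatched event; right side is NULL
        # INNER silently drops unmatched events


products = [{"id": "p1", "name": "Widget"}, {"id": "p2", "name": "Gadget"}]
orders = [{"id": "o1", "product_id": "p1"}, {"id": "o2", "product_id": "p9"}]

table = build_hash_table(products, key="id")
inner = list(enrichment_join(orders, table, "product_id", how="inner"))
left = list(enrichment_join(orders, table, "product_id", how="left"))
print(len(inner), len(left))  # 1 2
```

Nothing in the probe loop is driven by the right-hand side, so there is no point at which an unmatched right-hand row could be emitted. This is consistent with only LEFT and INNER joins being supported.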
Syntax
SELECT
*
FROM
lhs_stream [LEFT | INNER] JOIN rhs_table
ON lhs_stream.col1 = rhs_table.col1 AND ...;
The right-hand side (rhs_table) can be one of the following:
- A Timeplus stream wrapped with the table() function
  - The table() function snapshots the stream and builds a static hash table for joining.
- An external table (e.g., PostgreSQL or MySQL)
  - Timeplus loads all necessary data from the external source and builds a static hash table in memory.
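For the external-table case, the key idea is that the source is read once up front and every later lookup is served from memory. The sketch below uses an in-memory SQLite database (Python's standard `sqlite3` module) as an assumed stand-in for PostgreSQL or MySQL. The table layout and the `load_hash_table` helper are hypothetical.

```python
import sqlite3
from typing import Dict, Tuple

# In-memory SQLite stands in for an external PostgreSQL or MySQL table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, country TEXT)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?)",
    [("p1", "Widget", "US"), ("p2", "Gadget", "DE")],
)
conn.commit()


def load_hash_table(conn: sqlite3.Connection) -> Dict[str, Tuple[str, str]]:
    """One full read of the external table, indexed in memory by id."""
    rows = conn.execute("SELECT id, name, country FROM products").fetchall()
    return {pid: (name, country) for pid, name, country in rows}


table = load_hash_table(conn)  # done once, before any event is processed

# A row added to the source afterwards is not in the static in-memory table.
conn.execute("INSERT INTO products VALUES ('p3', 'Gizmo', 'FR')")
conn.commit()

for product_id in ["p1", "p3"]:  # probes never query the external source
    print(product_id, table.get(product_id))
# p1 ('Widget', 'US')
# p3 None
```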
Example
Assume you have two streams, orders and products. The products stream is a mutable stream keyed by a unique product ID. You would like to enrich every order with the corresponding product details.
CREATE STREAM orders
(
id string,
product_id string,
price float64,
quantity uint32,
timestamp datetime64(3)
);
CREATE MUTABLE STREAM products
(
id string,
name string,
country string
)
PRIMARY KEY id;
Enrichment Join Query:
SELECT
*
FROM orders INNER JOIN table(products) AS products
ON orders.product_id = products.id;
Explanation:
- When the query starts, Timeplus loads a snapshot of the products stream and builds a static hash table.
- For every new order in the orders stream, Timeplus probes this hash table using the product_id.
- If a match is found, the joined result is emitted downstream.
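The three steps above can be sketched in Python using the orders and products shapes from this example. For illustration, the sketch assumes that a mutable stream keyed by `id` keeps only the latest row per key, so the snapshot holds one row per product. The `product_writes` list and the `snapshot` helper are hypothetical and only model the behavior described here.

```python
from typing import Dict, List, Tuple

# Hypothetical write history of the products mutable stream. Assumption: with
# PRIMARY KEY id, a later write for the same id replaces the earlier row.
product_writes: List[dict] = [
    {"id": "p1", "name": "Widget", "country": "US"},
    {"id": "p2", "name": "Gadget", "country": "DE"},
    {"id": "p1", "name": "Widget v2", "country": "US"},  # replaces the first p1
]


def snapshot(writes: List[dict]) -> Dict[str, dict]:
    """Model of table(products) at query start: the latest row per id."""
    state: Dict[str, dict] = {}
    for row in writes:
        state[row["id"]] = row
    return state


# Step 1: build the static hash table once, when the query starts.
products = snapshot(product_writes)

orders = [
    {"id": "o1", "product_id": "p1", "price": 9.5, "quantity": 2},
    {"id": "o2", "product_id": "p3", "price": 4.0, "quantity": 1},  # unknown product
]

joined: List[Tuple[str, str, str]] = []
for order in orders:  # Step 2: every new order probes the table by product_id
    product = products.get(order["product_id"])
    if product is not None:  # Step 3: INNER JOIN emits only matches
        joined.append((order["id"], product["name"], product["country"]))

print(joined)  # [('o1', 'Widget v2', 'US')]
```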
Concurrent Enrichment Join
Enrichment joins are generally very fast since the right-hand side hash table is static and Timeplus optimizes hash lookups heavily.
However, for high-throughput streams, you can improve performance further by enabling parallel (concurrent) hash joins.
Syntax:
SELECT
*
FROM
lhs_stream [LEFT | INNER] JOIN rhs_table
ON lhs_stream.col1 = rhs_table.col1 AND ...
SETTINGS
join_algorithm = 'parallel_hash', -- Enable concurrent hash join
max_threads = <threads>; -- Control concurrency
Example:
SELECT
*
FROM orders INNER JOIN table(products) AS products
ON orders.product_id = products.id
SETTINGS
join_algorithm = 'parallel_hash',
max_threads = 4;
Explanation:
- join_algorithm='parallel_hash' enables parallel hash join.
- max_threads=4 controls the number of concurrent threads.
- Internally:
  - Timeplus shuffles the left stream orders into four non-overlapping substreams by join key.
  - The right table (products) is partitioned into four subtables, each building a separate hash table.
  - Each substream joins its corresponding subtable in parallel, significantly improving join throughput.
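The partitioning scheme described above can be modeled as follows. This is a sketch under stated assumptions. `zlib.crc32` modulo the partition count is a hypothetical routing function (the section does not say which hash Timeplus uses), and Python threads stand in for Timeplus worker threads. Because of CPython's global interpreter lock, the sketch illustrates the data flow and the correctness of per-partition joins, not the speedup.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

N = 4  # number of partitions, standing in for max_threads = 4


def part(key: str) -> int:
    """Route a join key to a partition; both sides must use the same function."""
    return zlib.crc32(key.encode()) % N


products = [("p%d" % i, "name-%d" % i) for i in range(10)]
orders = [("o%d" % i, "p%d" % (i % 12)) for i in range(24)]  # p10, p11 never match

# Right side: split into N subtables, each with its own hash table.
subtables: List[Dict[str, str]] = [{} for _ in range(N)]
for pid, name in products:
    subtables[part(pid)][pid] = name

# Left side: shuffle orders into N non-overlapping substreams by join key.
substreams: List[List[Tuple[str, str]]] = [[] for _ in range(N)]
for order_id, pid in orders:
    substreams[part(pid)].append((order_id, pid))


def join_partition(i: int) -> List[Tuple[str, str]]:
    """Inner-join substream i against subtable i only."""
    table = subtables[i]
    return [(oid, table[pid]) for oid, pid in substreams[i] if pid in table]


with ThreadPoolExecutor(max_workers=N) as pool:
    parallel = [row for rows in pool.map(join_partition, range(N)) for row in rows]

# Same rows as a single, unpartitioned hash join (output order may differ).
single = dict(products)
serial = [(oid, single[pid]) for oid, pid in orders if pid in single]
print(sorted(parallel) == sorted(serial), len(parallel))  # True 20
```

What makes this safe is that both sides are routed by the same function of the join key, so every order can only match products in its own partition and no cross-partition lookups are needed.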
Optimize Memory Consumption
If the right-hand side dataset is very large (e.g., hundreds of millions of keys), building an in-memory hash table may consume significant memory.
To mitigate this, you can enable hybrid hash tables using the query setting default_hash_join='hybrid'.
Example:
SELECT
*
FROM orders INNER JOIN table(products) AS products
ON orders.product_id = products.id
SETTINGS
default_hash_join = 'hybrid', -- Enable hybrid hash table
max_hot_keys = 100000; -- Maximum number of hot keys in memory
Explanation:
- default_hash_join='hybrid' enables a hybrid in-memory + disk-based hash table.
- max_hot_keys=100000 limits the number of keys stored in memory. When this limit is reached, the least recently used keys are spilled to disk using an LRU (Least Recently Used) policy.
- This approach balances memory efficiency and performance: if most lookups hit the in-memory hot keys, join performance remains excellent.
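A small model of a hybrid hash table with LRU spilling is shown below. It is illustrative only. An `OrderedDict` tracks recency for the in-memory hot keys, a plain dict stands in for the on-disk tier, and promoting a key back into memory on a disk hit is an assumption, not documented Timeplus behavior. The class and method names are hypothetical.

```python
from collections import OrderedDict
from typing import Dict, Optional


class HybridHashTable:
    """Hot keys in memory (LRU order); cold keys in a stand-in 'disk' dict."""

    def __init__(self, max_hot_keys: int) -> None:
        self.max_hot_keys = max_hot_keys
        self.hot: "OrderedDict[str, str]" = OrderedDict()
        self.disk: Dict[str, str] = {}  # hypothetical stand-in for on-disk storage
        self.hot_hits = 0
        self.disk_hits = 0

    def _admit(self, key: str, value: str) -> None:
        self.hot[key] = value
        self.hot.move_to_end(key)  # mark as most recently used
        while len(self.hot) > self.max_hot_keys:
            cold_key, cold_value = self.hot.popitem(last=False)  # evict the LRU key
            self.disk[cold_key] = cold_value

    def insert(self, key: str, value: str) -> None:
        self.disk.pop(key, None)  # never keep a stale copy on disk
        self._admit(key, value)

    def get(self, key: str) -> Optional[str]:
        if key in self.hot:
            self.hot_hits += 1
            self.hot.move_to_end(key)
            return self.hot[key]
        if key in self.disk:
            self.disk_hits += 1
            value = self.disk.pop(key)
            self._admit(key, value)  # assumption: a disk hit makes the key hot again
            return value
        return None


table = HybridHashTable(max_hot_keys=2)
for pid, name in [("p1", "Widget"), ("p2", "Gadget"), ("p3", "Gizmo")]:
    table.insert(pid, name)  # p1 is spilled once p3 arrives

print(sorted(table.hot), sorted(table.disk))  # ['p2', 'p3'] ['p1']
for pid in ["p3", "p3", "p1", "p9"]:
    table.get(pid)
print(table.hot_hits, table.disk_hits)  # 2 1
```

Every key stays reachable whichever tier it lives in. The limit only decides how many lookups are served from memory, which is why a workload that mostly hits hot keys keeps near in-memory performance.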