system.stream_state_log
You can query the system.stream_state_log stream to check the state changes of database resources in Timeplus. This stream gives you full visibility of the state of the streams, materialized views, and other resources in Timeplus.
Schema
This system stream is provisioned by Timeplus and cannot be modified.
We first introduced this stream in Timeplus Enterprise 2.6. Based on user feedback and performance optimization, we have updated the schema in Timeplus Enterprise 2.7. If you upgrade from 2.6 to 2.7, the system will automatically recreate the stream with the new schema. The previous state log data will be dropped.
Here is the schema definition with comments:
CREATE STREAM system.stream_state_log
(
`node_id` uint64,
`database` low_cardinality(string),
`name` string, -- name of the stream
`uuid` uuid, -- unique identifier of the stream
`state_name` low_cardinality(string), -- name of the state metric
`state_value` uint64, -- numeric value of the state metric
`state_string_value` string, -- string value of the state metric
`dimension` string, -- additional dimensions for the metric
`_tp_time` datetime64(3), -- time when the state was collected
`_tp_sn` int64, -- sequence number of the data
INDEX _tp_time_index _tp_time TYPE minmax GRANULARITY 32,
INDEX _tp_sn_index _tp_sn TYPE minmax GRANULARITY 32
)
PRIMARY KEY (database, name, state_name, dimension, node_id)
ORDER BY (database, name, state_name, dimension, node_id)
TTL to_datetime(_tp_time) + INTERVAL 2 MONTH
SETTINGS mode = 'versioned_kv', logstore_codec = 'zstd', index_granularity = 8192
By default, Timeplus collects the states every 5 seconds and adds the data points to this stream.
Categories of State
Different types of resources in Timeplus have different states. Here are the categories of the state:
Stream Storage
You can get the current size of the stream on disk for both streaming storage and historical storage. For example, the following query will get all stream storage states:
SELECT
database,
name AS stream_name,
node_id,
state_name,
dimension,
format_readable_size(latest(state_value)) AS size,
latest(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE state_name = 'disk_size'
AND _tp_time > now() - 5m
GROUP BY database, name, node_id, state_name, dimension
ORDER BY database, name, node_id, dimension
SETTINGS
max_threads = 1, force_backfill_in_order = true;
disk_size
The disk size metric, with the following dimensions:
log_store: The size of the logstore on disk.
historical_store: The size of the historical store on disk.
Stream Sequence Number
Sequence numbers track the progress of a stream. For example, the following query compares the applied and committed sequence numbers of the streams:
SELECT
database,
name AS stream_name,
node_id,
state_name,
dimension AS shard,
latest(state_value) AS sequence_number,
latest(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE state_name IN ('applied_sn', 'committed_sn')
AND _tp_time > now() - 5m
GROUP BY database, name, node_id, state_name, dimension
ORDER BY database, name, node_id, state_name, dimension
SETTINGS
max_threads = 1, force_backfill_in_order = true;
applied_sn
The sequence number of the stream that has been applied. The shard number is stored in the dimension field.
committed_sn
The sequence number of the stream that has been committed. The shard number is stored in the dimension field.
start_sn
The starting sequence number of the stream or materialized view source. The source information is stored in the dimension field.
end_sn
The ending sequence number of the stream or materialized view source. The source information is stored in the dimension field.
processed_sn
The last processed sequence number for the stream or materialized view source. The source information is stored in the dimension field.
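As a rough sketch of how these metrics can be combined, the following query estimates the backlog of each source by subtracting processed_sn from end_sn. It assumes that both metrics are reported with the same database, name, node_id, and dimension values, which is not confirmed here, so treat the result as indicative only. The column aliases (latest_end_sn, latest_processed_sn, backlog) are illustrative names.

```sql
-- Sketch: estimate the backlog per source as end_sn - processed_sn.
-- Assumption: end_sn and processed_sn share the same database, name,
-- node_id, and dimension values for a given source.
WITH latest_sn AS (
    SELECT
        database,
        name,
        node_id,
        dimension,
        state_name,
        latest(state_value) AS sn
    FROM table(system.stream_state_log)
    WHERE state_name IN ('end_sn', 'processed_sn')
        AND _tp_time > now() - 5m
    GROUP BY database, name, node_id, dimension, state_name
)
SELECT
    database,
    name,
    node_id,
    dimension AS source,
    max(if(state_name = 'end_sn', sn, 0)) AS latest_end_sn,
    max(if(state_name = 'processed_sn', sn, 0)) AS latest_processed_sn,
    -- Cast to a signed type so the subtraction cannot wrap around.
    cast(latest_end_sn AS int64) - cast(latest_processed_sn AS int64) AS backlog
FROM latest_sn
GROUP BY database, name, node_id, dimension
ORDER BY database, name, node_id, dimension
SETTINGS
    max_threads = 1, force_backfill_in_order = true;
```

A backlog that keeps growing between collections would suggest that the consumer is falling behind its source. A row where only one of the two metrics was reported in the time window shows 0 for the missing one, so check both columns before reading the difference.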
Materialized View
checkpoint_storage_size
The size of the checkpoint storage on disk. The dimension is set to materialized_view.
status
The status of the materialized view. Read the state_string_value column for the status. If the materialized view is running normally, the status is ExecutingPipeline. The dimension is set to materialized_view.
last_error_message
The last error message of the materialized view. Read the state_string_value column for the error message. The timestamp of the error is stored in the state_value field. The dimension is set to materialized_view.
recover_times
The number of times the materialized view has been recovered. The dimension is set to materialized_view.
memory_usage
The memory usage of the materialized view. The dimension is set to materialized_view.
processed_record_ts
The timestamp of the last processed record for the materialized view source. The source information is stored in the dimension field.
ckpt_sn
The last checkpoint sequence number for the materialized view source. The source information is stored in the dimension field.
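The status and error metrics above can be read together. The following is a minimal sketch, modeled on the other queries on this page, that returns the latest status and the last error message of each materialized view. The aliases mv_name and value are illustrative.

```sql
-- Sketch: latest status and last error message per materialized view.
-- Both metrics keep their text in state_string_value.
SELECT
    database,
    name AS mv_name,
    node_id,
    state_name,
    latest(state_string_value) AS value,
    latest(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE state_name IN ('status', 'last_error_message')
    AND dimension = 'materialized_view'
    AND _tp_time > now() - 5m
GROUP BY database, name, node_id, state_name
ORDER BY database, name, node_id, state_name
SETTINGS
    max_threads = 1, force_backfill_in_order = true;
```

Rows where state_name is status and the value is not ExecutingPipeline are the ones to look at first.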
Ingestion Performance
ingest
The number of dropped messages during ingestion. The dimension is set to dropped.
ingest_latency
The number of messages that are ingested with different latency ranges, indicated by the following dimensions:
lt_500ms: Within 500ms
500ms_1s: Between 500ms and 1s
1_3s: Between 1s and 3s
3_6s: Between 3s and 6s
gt_6s: Over 6s
Example query:
SELECT
database,
name AS stream_name,
node_id,
state_name,
dimension AS latency_range,
latest(state_value) AS message_count,
latest(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE state_name = 'ingest_latency'
AND _tp_time > now() - 5m
GROUP BY database, name, node_id, state_name, dimension
ORDER BY database, name, node_id, dimension
SETTINGS
max_threads = 1, force_backfill_in_order = true;
External Stream
read_failed
The number of failed read operations from the external stream. The dimension is set to external_stream.
written_failed
The number of failed write operations to the external stream. The dimension is set to external_stream.
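A query for these counters can follow the same pattern as the other examples on this page. The sketch below returns the latest read and write failure counts per external stream. The aliases external_stream_name and failed_count are illustrative.

```sql
-- Sketch: latest failed read/write counts per external stream.
SELECT
    database,
    name AS external_stream_name,
    node_id,
    state_name,
    latest(state_value) AS failed_count,
    latest(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE state_name IN ('read_failed', 'written_failed')
    AND dimension = 'external_stream'
    AND _tp_time > now() - 5m
GROUP BY database, name, node_id, state_name
ORDER BY database, name, node_id, state_name
SETTINGS
    max_threads = 1, force_backfill_in_order = true;
```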
Dictionary
Dictionary metrics all use the dimension value dict:
bytes_allocated
The number of bytes allocated in the memory for the dictionary.
hierarchical_index_bytes_allocated
The number of bytes allocated in the memory for the hierarchical index.
query_count
The number of queries executed with the dictionary.
hit_rate_pct
The hit rate percentage of the dictionary.
found_rate_pct
The found rate percentage of the dictionary.
element_count
The number of elements in the dictionary.
load_factor_pct
The load factor percentage of the dictionary.
loading_start_time
The start time of the loading process as milliseconds since epoch.
last_successful_update_time
The last successful update time of the dictionary as milliseconds since epoch.
loading_duration_ms
The duration of the loading process in milliseconds.
last_exception
The last exception message of the dictionary. Read the state_string_value column for the exception message.
Example query for dictionary stats:
SELECT
database,
name AS dict_name,
node_id,
state_name,
latest(state_value) AS value,
latest(state_string_value) AS string_value,
latest(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE dimension = 'dict'
AND _tp_time > now() - 5m
GROUP BY database, name, node_id, state_name
ORDER BY database, name, state_name
SETTINGS
max_threads = 1, force_backfill_in_order = true;
Replication
quorum_replication_status
The status of the quorum replication. Read the state_string_value column for the status. The shard ID is now stored in the dimension field.
For example, the following query will get the quorum replication status:
SELECT
database,
name AS stream_name,
node_id AS reporting_node,
dimension AS shard,
latest(state_value) AS quorum_commit_sn,
latest(state_string_value) AS quorum_status,
latest(_tp_time) AS last_update
FROM
table(system.stream_state_log)
WHERE
state_name = 'quorum_replication_status'
AND _tp_time > (now() - INTERVAL 5 MINUTE)
GROUP BY database, name, node_id, dimension
ORDER BY database, name, node_id, dimension
SETTINGS
max_threads = 1, force_backfill_in_order = true;
A sample message of the quorum replication status is:
{
"shard": 0,
"shard_replication_statuses": [
{
"append_message_flow_paused": false,
"inflight_messages": 0,
"is_learner": false,
"next_sn": 0,
"node": 1,
"pending_snapshot_sn": 0,
"recent_active": false,
"replicated_sn": 613219,
"state": "Replicate"
}
]
}
You can further extract the fields from the JSON message. For example:
WITH
latest_status AS (
SELECT
database,
name,
node_id,
dimension AS shard,
arg_max(state_string_value, _tp_time) AS status_json,
max(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE state_name = 'quorum_replication_status'
AND _tp_time > (now() - INTERVAL 5 MINUTE)
GROUP BY database, name, node_id, dimension
)
SELECT
database,
name AS stream_name,
node_id AS reporting_node,
shard,
json_extract_array_raw(status_json, 'shard_replication_statuses') AS statuses,
array_map(x -> json_extract_int(x, 'node'), statuses) AS member_nodes,
array_map(x -> json_extract_string(x, 'state'), statuses) AS member_states,
array_map(x -> json_extract_int(x, 'replicated_sn'), statuses) AS replicated_sns,
array_map(x -> json_extract_int(x, 'next_sn'), statuses) AS next_sns,
last_update
FROM latest_status
ORDER BY database, name, node_id, shard
SETTINGS
max_threads = 1, force_backfill_in_order = true;