system.stream_state_log
You can query the system.stream_state_log
stream to check the state changes of database resources in Timeplus. This stream gives you full visibility of the state of the streams, materialized views, and other resources in Timeplus.
Schema
This system stream is provisioned by Timeplus and cannot be modified. Here is the schema definition with comments:
CREATE STREAM system.stream_state_log
(
`node_id` uint64,
`database` string,
`name` string, -- name of the stream
`uuid` uuid, -- unique identifier of the stream
`state_name` string, -- name of the state metric
`state_value` uint64, -- numeric value of the state metric
`state_string_value` string, -- string value of the state metric
`_tp_time` datetime64(3), -- time of the state is collected
`_tp_sn` int64, -- sequence number of the data
INDEX _tp_time_index _tp_time TYPE minmax GRANULARITY 32,
INDEX _tp_sn_index _tp_sn TYPE minmax GRANULARITY 32
)
PARTITION BY to_YYYYMMDD(_tp_time)
ORDER BY (to_hour(_tp_time), database, name)
TTL to_datetime(_tp_time) + INTERVAL 1 YEAR -- keep the historical data for 1 year by default
SETTINGS logstore_retention_ms = 31536000000, index_granularity = 8192 -- keep the streaming data for 1 year by default
By default, every 5 seconds Timeplus collects the states and add the data points to this stream.
Categories of State
Different types of resources in Timeplus have different states. Here are the categories of the state:
Stream Storage
You can get the current size of the stream on disk for both streaming storage and historical storage. For example, the following query will get all stream storage states:
SELECT
database,
name AS stream_name,
node_id,
state_name,
format_readable_size(latest(state_value)) AS size,
latest(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE state_name IN ('stream_logstore_disk_size', 'stream_historical_store_disk_size')
AND _tp_time > now() - 5m
GROUP BY database, name, node_id, state_name
ORDER BY database, name, node_id
SETTINGS
max_threads = 1, force_backfill_in_order = true;
stream_logstore_disk_size
The size of the logstore on disk.
stream_historical_store_disk_size
The size of the historical store on disk.
Stream Sequence Number
The sequence number of the stream. It's used to track the progress of the stream. For example, the following query will compare the sequence number of the streams:
SELECT
database,
name AS stream_name,
node_id,
state_name,
latest(state_value) AS sequence_number,
latest(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE (state_name LIKE 'applied_sn_%' OR state_name LIKE 'committed_sn_%')
AND _tp_time > now() - 5m
GROUP BY database, name, node_id, state_name
ORDER BY database, name, node_id, state_name
SETTINGS
max_threads = 1, force_backfill_in_order = true;
applied_sn_
applied_sn_{shard}
. The shard number starts from 0. So for single node, it's applied_sn_0
.
The sequence number of the stream that has been applied.
committed_sn_
committed_sn_{shard}
. The shard number starts from 0. So for single node, it's committed_sn_0
.
The sequence number of the stream that has been committed.
start_sn_
start_sn_{source}[description]
. For example start_sn_StreamingStoreSource[stream=default.my_stream,shard=0]
The starting sequence number of the stream or materialized view source.
end_sn_
end_sn_{source}[description]
. For example end_sn_StreamingStoreSource[stream=default.my_stream,shard=0]
The ending sequence number of the stream or materialized view source.
processed_sn_
processed_sn_{source}[description]
. For example processed_sn_StreamingStoreSource[stream=default.my_stream,shard=0]
Last processed sequence number for the stream or materialized view source.
Materialized View
mv_checkpoint_storage_size
The size of the checkpoint storage on disk.
mv_status
The status of the materialized view. Read the state_string_value
column for the status. If the materialized view is running healthy, the status will be ExecutingPipeline
.
mv_last_error_message
The last error message of the materialized view. Read the state_string_value
column for the error message.
mv_recover_times
The number of times the materialized view has been recovered.
mv_memory_usage
The memory usage of the materialized view.
processed_record_ts_
processed_record_ts_{source}[description]
.
The timestamp of the last processed record for the materialized view source.
ckpt_sn_
ckpt_sn_{source}[description]
.
Last checkpoint sequence number for the materialized view source.
Ingestion Performance
ingest_dropped
The number of dropped messages during ingestion.
ingest_lt_500ms
The number of messages that are ingested within 500ms.
ingest_500ms_1s
The number of messages that are ingested between 500ms and 1s.
ingest_1_3s
The number of messages that are ingested between 1s and 3s.
ingest_3_6s
The number of messages that are ingested between 3s and 6s.
ingest_gt_6s
The number of messages that are ingested over 6s.
External Stream
external_stream_read_failed
The number of failed read operations from the external stream.
external_stream_written_failed
The number of failed write operations to the external stream.
Dictionary
bytes_allocated
The number of bytes allocated in the memory for the dictionary.
hierarchical_index_bytes_allocated
The number of bytes allocated in the memory for the hierarchical index.
query_count
The number of queries executed with the dictionary.
hit_rate_pct
The hit rate percentage of the dictionary.
found_rate_pct
The found rate percentage of the dictionary.
element_count
The number of elements in the dictionary.
load_factor_pct
The load factor percentage of the dictionary.
loading_start_time
The start time of the loading process.
last_successful_update_time
The last successful update time of the dictionary.
loading_duration_ms
The duration of the loading process.
last_exception
The last exception message of the dictionary. Read the state_string_value
column for the exception message.
Replication
quorum_replication_status
The status of the quorum replication. Read the state_string_value
column for the status.
For example, the following query will get the quorum replication status:
SELECT
database, name AS stream_name, node_id AS reporting_node, latest(state_string_value) AS quorum_status, latest(_tp_time) AS last_update
FROM
table(system.stream_state_log)
WHERE
(state_name = 'quorum_replication_status') AND (_tp_time > (now() - INTERVAL 5 MINUTE))
GROUP BY database, name, node_id
ORDER BY database, name, node_id
SETTINGS
max_threads = 1, force_backfill_in_order = true;
A sample message of the quorum replication status is:
{
"shard": 0,
"shard_replication_statuses": [
{
"append_message_flow_paused": false,
"inflight_messages": 0,
"is_learner": false,
"next_sn": 0,
"node": 1,
"pending_snapshot_sn": 0,
"recent_active": false,
"replicated_sn": 613219,
"state": "Replicate"
}
]
}
You can further extract the fields from the JSON message. For example:
WITH
latest_status AS (
SELECT
database,
name,
node_id,
arg_max(state_string_value, _tp_time) AS status_json,
max(_tp_time) AS last_update
FROM table(system.stream_state_log)
WHERE state_name = 'quorum_replication_status'
AND _tp_time > (now() - INTERVAL 5 MINUTE)
GROUP BY database, name, node_id
)
SELECT
database,
name AS stream_name,
node_id AS reporting_node,
status_json:shard AS shard,
json_extract_array_raw(status_json, 'shard_replication_statuses') as statuses,
array_map(x -> json_extract_int(x, 'node'), statuses) AS member_nodes,
array_map(x -> json_extract_string(x, 'state'), statuses) AS member_states,
array_map(x -> json_extract_int(x, 'replicated_sn'), statuses) AS replicated_sns,
array_map(x -> json_extract_int(x, 'next_sn'), statuses) AS next_sns,
last_update
FROM latest_status
ORDER BY database, name, node_id
SETTINGS
max_threads = 1, force_backfill_in_order = true;