
Sink

With Timeplus Console, you can easily explore and analyze streaming data using an intuitive UI, standard SQL, and streaming charts. But it doesn't stop there: Timeplus can also send real-time insights to other systems, either to notify individuals or to power downstream applications.

Notify others via Email or Slack

After you start running a streaming query, you can click the icon to send real-time results to other systems.

Slack

You need to create a Slack incoming webhook so that Timeplus can send a Slack message to the specified channel for each result. Please follow the Slack documentation for instructions.

Once you've got the Slack webhook URL, you can specify it in the dialog and set a message body. You can refer to a column value via the {{.column}} expression. For instance, assume the output of the query is:

| time | number | note |
| --- | --- | --- |
| 2022-01-23 10:00:00.123 | 50 | foo |
| 2022-01-23 10:05:00.123 | 95 | Bar |

You can set the message body to be The sensor data at {{.time}} is {{.number}}, with note: {{.note}}
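For the first row above, the rendered Slack message would read: The sensor data at 2022-01-23 10:00:00.123 is 50, with note: foo.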

Email

You can configure Timeplus to send an email for each result by specifying the email server, user name, password, etc. Similar to the Slack action, you can refer to the value for each column using the {{.column}} expression.

Send Data to Kafka

You can leverage Timeplus for various kinds of streaming analysis, such as:

  • Downsample the data from IoT devices and get min/max/avg values every 5 seconds
  • Identify outliers based on past patterns
  • Transform the data by removing sensitive information, removing duplicates, or applying lookups against dimension tables

The transformed data or outlier events can be sent to a Kafka topic for other systems to further process.

To send data to Kafka, submit a streaming query, then click the icon to send streaming results to Kafka. The following parameters are required:

  • Kafka broker(s) URL
  • Topic name: either an existing topic, or a new topic name for Timeplus to create
  • Authentication

Please refer to the Kafka source for details of the parameters. You can send data to Confluent Cloud, Confluent Platform, or a self-managed Apache Kafka cluster. The events will be encoded as JSON documents.

Send Data to Snowflake

You can apply streaming analysis in Timeplus, then send the results to Snowflake. There are a few different ways to make it happen:

  1. You can send the streaming results to Confluent Cloud or Kafka, then leverage the Snowflake sink connector in Confluent Cloud to move the data into Snowflake. This approach achieves lower latency. Please note that the Confluent Cloud Kafka cluster needs to reside in the same cloud vendor and region as Snowflake, for example, both in us-west-1 on AWS. By default, the table in Snowflake will be created with the same name as the Kafka topic, and the JSON document is saved in a VARIANT column named RECORD_CONTENT.

For example, the query to downsample the data in Timeplus is

select window_end as time, cid, avg(speed_kmh) as speed_kmh, max(total_km) as total_km,
       avg(gas_percent) as gas_percent, min(locked) as locked, min(in_use) as in_use
from tumble(car_live_data, 2s)
group by cid, window_end

Then create a Kafka sink to send such data to the topic: snowflake.

After setting up the sink connector in Confluent Cloud, a Snowflake table will be created with the specified database and schema in your Snowflake environment. You can then create a view to flatten the JSON document, such as:

create view downsampled as
select RECORD_CONTENT:time::timestamp_tz as time,
       RECORD_CONTENT:cid as cid,
       RECORD_CONTENT:gas_percent as gas_percent,
       RECORD_CONTENT:in_use as in_use,
       RECORD_CONTENT:locked as locked,
       RECORD_CONTENT:speed_kmh as speed_kmh,
       RECORD_CONTENT:total_km as total_km
from snowflake
  2. You can also use other data integration tools to move the data. For example, use Airbyte to load the latest data from a Timeplus table, then move it to Snowflake or other destinations.
Info: The Timeplus source plugin for Airbyte is at an early stage. Please contact us to arrange the integration.

Trigger Actions via webhook

You can also add automations to trigger other systems to take action when Timeplus finds real-time insights. Simply choose Webhook as the action type and optionally set a message body (by default, the entire row is encoded as a JSON document and sent to the webhook). You can use this approach to perform rule-based automation without human interaction, such as swapping out overheated equipment, scaling the server farm up or down, or reminding users on Slack. Please check this blog for real-world examples.
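To see what a webhook target receives, here is a minimal sketch of the receiving side, assuming the sink POSTs one JSON document per result row; this standalone script is illustrative only and not part of Timeplus:

# Minimal webhook receiver for local testing (assumption: one JSON document per result row).
from http.server import BaseHTTPRequestHandler, HTTPServer
import json

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            row = json.loads(body)   # the sink encodes each result row as JSON by default
        except json.JSONDecodeError:
            row = body.decode("utf-8", errors="replace")
        print("received:", row)      # replace with your own automation logic
        self.send_response(200)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8000), WebhookHandler).serve_forever()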

Sink API

If you need to call an API to create a sink, here are the references.

kafka

refer to https://kafka.apache.org/

| Property | Required | Description | Default |
| --- | --- | --- | --- |
| brokers | yes | Specifies the list of broker addresses as a comma-separated string, such as kafka1:9092,kafka2:9092,kafka3:9092 | |
| topic | yes | Specifies the Kafka topic to send data to | |
| batch_count | no | Specifies the number of events in each batch | 1000 |
| data_type | no | Specifies the data type to use for creating the stream. Supports json | |
| sasl | no | Specifies the Simple Authentication and Security Layer (SASL) mechanism for authentication. Supports none, plain, scram-sha-256, scram-sha-512 | none |
| username | no | Specifies the username for authentication | |
| password | no | Specifies the password for authentication | |
| tls.disable | no | If set to true, disables TLS encryption | false |
| tls.skip_verify_server | no | If set to true, skips server certificate verification when using TLS | false |
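As a minimal sketch (not an official example), the kafka sink properties above could be assembled like this; every value is a placeholder you would replace with your own settings:

# Illustrative kafka sink properties; keys come from the table above, values are placeholders.
kafka_sink = {
    "brokers": "kafka1:9092,kafka2:9092,kafka3:9092",  # comma-separated broker addresses
    "topic": "downsampled-events",                      # existing topic, or a new one for Timeplus to create
    "data_type": "json",                                # json is the supported data type
    "batch_count": 1000,                                # events per batch (default 1000)
    "sasl": "plain",                                    # none, plain, scram-sha-256 or scram-sha-512
    "username": "my-user",
    "password": "my-secret",
    "tls.disable": False,
    "tls.skip_verify_server": False,
}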

http

| Property | Required | Description | Default |
| --- | --- | --- | --- |
| url | yes | Specifies the target HTTP URL | |
| content_type | no | Specifies the content type | |
| http_method | no | Specifies the HTTP method to use | POST |
| payload_field | no | The payload of the HTTP request | |
| http_header | no | HTTP header object | {} |
| oauth2 | no | Specifies the OAuth2 configuration. Refer to oauth2 below | |
| paralism | no | Specifies the parallelism number | 1 |
| retries | no | Specifies the number of retries | 0 |
| retry_interval | no | Specifies the interval between retries | 10s |
| timeout | no | HTTP timeout interval | 10s |

oauth2

| Property | Required | Description | Default |
| --- | --- | --- | --- |
| enabled | no | Whether to enable OAuth2 | false |
| client_key | no | Client key | |
| client_secret | no | Client secret | |
| token_url | no | Token URL | |
| scopes | no | Scopes, a list of strings | |
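As a minimal sketch (not an official example), an http sink with OAuth2 enabled could combine the two tables above; all values are placeholders:

# Illustrative http sink properties; keys come from the http and oauth2 tables above, values are placeholders.
http_sink = {
    "url": "https://example.com/ingest",        # target HTTP URL
    "content_type": "application/json",
    "http_method": "POST",                       # default POST
    "http_header": {"X-Source": "timeplus"},     # extra headers, default {}
    "paralism": 1,                                # parallelism, default 1 (property name as listed above)
    "retries": 3,                                 # default 0
    "retry_interval": "10s",
    "timeout": "10s",
    "oauth2": {                                   # see the oauth2 table above
        "enabled": True,
        "client_key": "my-client-id",
        "client_secret": "my-client-secret",
        "token_url": "https://auth.example.com/oauth2/token",
        "scopes": ["ingest:write"],
    },
}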

slack

refer to https://slack.com/

| Property | Required | Description | Default |
| --- | --- | --- | --- |
| url | yes | The webhook URL of the Slack channel, which is treated as a secret | |
| template | no | The template used to send query results to Slack. Use {{ .field_name }} to reference a field of the query result. If empty, the event is encoded as JSON | |
| header | no | The HTTP header | {} |
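As a minimal sketch (not an official example), a slack sink could reuse the message template from the earlier example; the webhook URL below is a placeholder and should be kept secret:

# Illustrative slack sink properties; keys come from the table above, values are placeholders.
slack_sink = {
    "url": "https://hooks.slack.com/services/T000/B000/XXXX",  # Slack incoming webhook URL (secret)
    "template": "The sensor data at {{ .time }} is {{ .number }}, with note: {{ .note }}",
    "header": {},                                               # extra HTTP headers, default {}
}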

timeplus

Send query results to another Timeplus stream.

| Property | Required | Description | Default |
| --- | --- | --- | --- |
| stream_name | yes | The name of the target stream | |

clickhouse

refer to https://clickhouse.com/

| Property | Required | Description | Default |
| --- | --- | --- | --- |
| table_name | yes | Specifies the name of the target ClickHouse table | |
| dsn | yes | Specifies the ClickHouse Data Source Name (DSN). When specified, hosts, username, password, and database will be ignored | |
| hosts | yes* | Specifies the list of ClickHouse server hosts | |
| username | yes* | Specifies the username for authentication | |
| password | yes* | Specifies the password for authentication | |
| database | yes* | Specifies the ClickHouse database to use | |
| engine | yes* | Specifies the ClickHouse table engine to use | |
| suffix | yes* | Specifies a suffix to be added to the create table script | |
| init_sql | yes | Specifies initial SQL to create the table. When specified, it ignores engine and suffix | |
| batch_count | no | Specifies the batch count for data ingestion | 128 |
| batch_duration | no | Specifies the batch duration for data ingestion | 100ms |
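As a minimal sketch (not an official example), a clickhouse sink that lets Timeplus create the table could look like this; the host format and all values are assumptions for illustration:

# Illustrative clickhouse sink properties; keys come from the table above, values are placeholders.
clickhouse_sink = {
    "table_name": "downsampled",
    "hosts": "clickhouse1:9000",       # server host(s); exact format assumed here
    "username": "default",
    "password": "my-secret",
    "database": "analytics",
    "engine": "MergeTree",             # table engine used when the sink creates the table
    "batch_count": 128,                # default 128
    "batch_duration": "100ms",         # default 100ms
}

Alternatively, set dsn (and drop hosts, username, password, and database) or init_sql (and drop engine and suffix), as described in the table above.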

pulsar

refer to https://pulsar.apache.org/

| Property | Required | Description | Default |
| --- | --- | --- | --- |
| topic | yes | Specifies the Pulsar topic to connect to | |
| broker_url | yes | Specifies the URL of the broker to connect to | |
| auth_type | yes | Specifies the authentication type to use. Supports empty (no authentication), oauth2, token | |
| auth_params | no | Specifies authentication parameters as key-value pairs | {} |
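As a minimal sketch (not an official example), a token-authenticated pulsar sink could look like this; the topic naming and the key name inside auth_params are assumptions for illustration:

# Illustrative pulsar sink properties; keys come from the table above, values are placeholders.
pulsar_sink = {
    "topic": "persistent://public/default/downsampled",  # assumed tenant/namespace/topic layout
    "broker_url": "pulsar://pulsar-broker:6650",
    "auth_type": "token",                                 # supported: empty (no auth), oauth2, token
    "auth_params": {"token": "my-jwt-token"},             # key name assumed; key-value pairs per the table
}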