跳转至主要内容

Timeplus Proton

Install

Proton 可以通过以下方式作为单个二进制文件安装在 Linux 或 Mac 上:

curl https://install.timeplus.com/oss | sh

对于 Mac 用户,你还可以使用 Homebrew 来管理安装/升级/卸载:

brew tap tap timeplus-io/timeplus
brew inst

你也可以在 Docker、Docker Compose 或 Kubernetes 中安装 Proton。

docker run -d --pull always -p 8123:8123 -p 8463:8463 --name proton d.timeplus.com/timeplus-io/proton:latest

Please check Server Ports to determine which ports to expose, so that other tools can connect to Timeplus, such as DBeaver.

Docker Compose 堆栈 演示了如何使用外部流在 Kafka/Redpanda 中读取/写入数据。

You can also try Proton in the fully-managed Timeplus Cloud.

如何读/写 Kafka 或 Redpanda

You use External Stream to read from Kafka topics or write data to the topics. We verified the integration with Apache Kafka, Confluent Cloud, Confluent Platform, Redpanda, WarpStream and many more.

CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name
(<col_name1> <col_type>)
SETTINGS type='kafka', brokers='ip:9092',topic='..',security_protocol='..',
username='..',password='..',sasl_mechanism='..'

如何从 PostgreSQL/MySQL/ClickHouse 加载数据

对于 PostgreSQL、MySQL 或其他 OLTP 数据库,您可以应用 CDC(变更数据捕获)技术,通过 Debezium 和 Kafka/Redpanda 将实时更改加载到 Proton 中。 Proton仓库的 cdc 文件夹 中的示例配置。 这篇博客 展示了 Timeplus Cloud 用户界面,但也可以应用于 Proton。

If you have data in local ClickHouse or ClickHouse Cloud, you can also use External Table to read data.

如何读/写 ClickHouse

You use External Table to read from ClickHouse tables or write data to the ClickHouse tables. 我们验证了与自托管的ClickHouse、ClickHouse Cloud、适用于ClickHouse的Aiven等的集成。

如何处理 UPSERT 或 DELETE

默认情况下,Timeplus 中的流仅限追加。 但是你可以使用 “versioned_kv” 或 “changelog_kv” 模式创建流来支持数据变更或删除。 The Versioned Stream supports UPSERT (Update or Insert) and Changelog Stream supports UPSERT and DELETE.

你可以使用像 Debezium 这样的工具向 Timeplus 发送 CDC 消息,也可以直接使用 INSERT SQL来添加数据。 具有相同主键的值将被覆盖。 欲了解更多详情,请观看此视频:

如何使用 JSON

Proton 支持强大且易于使用的 JSON 处理。 你可以将整个 JSON 文档保存为 “字符串” 类型的 “原始” 列。 然后使用 JSON 路径作为快捷方式以字符串形式访问这些值。 例如 “raw: a.b.c”。 如果你的数据是整数/浮点数/布尔值或其他类型,你也可以使用 :: 来转换它们。 例如 raw: a.b.c:: int。 如果你想读取 Kafka 主题中的 JSON 文档,你可以选择将每个 JSON 作为 “原始” 字符串读取,也可以将每个顶级键/值对作为列读取。 Please check the doc for details.

如何加载 CSV 文件

如果你只需要加载一个 CSV 文件,你可以创建一个流然后使用 INSERT INTO.. 选择... FROM 文件 (..) 语法。 例如,如果 CSV 文件中有 3 个字段:时间戳、价格、交易量,则可以通过以下方式创建流

CREATE STREAM stream
(
`timestamp` datetime64(3),
`price` float64,
`volume` float64
)
SETTINGS event_time_column = 'timestamp';

Please note there will be the 4th column in the stream, which is _tp_time as the Event Time.

要导入 CSV 内容,请使用 文件 表函数来设置文件路径、标题和数据类型。

INSERT INTO stream (timestamp,price,volume)
SELECT timestamp,price,volume
FROM file('data/my.csv', 'CSV', 'timestamp datetime64(3), price float64, volume float64')
SETTINGS max_insert_threads=8;
注意

请注意:

  1. 你需要指定列名。 否则,SELECT *将获得 3 列,而数据流中有 4 列。
  2. 出于安全原因,Proton 只读取 proton-data/user_files文件夹下的文件。 如果你在 Linux 服务器上通过 proton install命令安装Proton,则该文件夹将是 /var/lib/proton/user_files。 如果你不安装Proton并直接通过 “Proton服务器启动” 运行Proton二进制文件,则该文件夹将是 “proton-data/user_files”
  3. 我们建议使用 max_insert_threads=8 来使用多线程来最大限度地提高摄取性能。 如果你的文件系统的 IOPS 很高,你可以使用 SETTINGS shards=3 创建数据流,并在 INSERT语句中设置更高的 max_insert_threads 值。

如果您需要将多个 CSV 文件导入到单个数据流,则可以执行类似的操作。 你甚至可以再添加一列来跟踪文件路径。

CREATE STREAM kraken_all
(
`path` string,
`timestamp` datetime64(3),
`price` float64,
`volume` float64,
`_tp_time` datetime64(3, 'UTC') DEFAULT timestamp CODEC(DoubleDelta, LZ4),
INDEX _tp_time_index _tp_time TYPE minmax GRANULARITY 2
)
ENGINE = Stream(1, 1, rand())
PARTITION BY to_YYYYMM(_tp_time)
ORDER BY to_start_of_hour(_tp_time)
SETTINGS event_time_column = 'timestamp', index_granularity = 8192;

INSERT INTO kraken_all (path,timestamp,price,volume)
SELECT _path,timestamp,price,volume
FROM file('data/*.csv', 'CSV', 'timestamp datetime64(3), price float64, volume float64')
SETTINGS max_insert_threads=8;

How to visualize Timeplus query results with Grafana or Metabase

The official Grafana plugin for Timeplus is available here. 源代码位于 https://github.com/timeplus-io/proton-grafana-source。 你可以使用该插件运行 SQL 流,并在 Grafana 中构建实时图表,而无需刷新仪表板。 查看 此处 获取示例设置。

我们还为 Metabase 提供了一个插件:https://github.com/timeplus-io/metabase-proton-driver 它基于 Proton JDBC 驱动程序。

如何以编程方式访问 Timeplus Proton

SQL 是使用 Proton 的主要接口。 The Ingest REST API allows you to push realtime data to Proton with any language.

以下驱动程序可用: