通过采集 REST API 将数据推送到 Timeplus
作为通用解决方案,你可以使用任何首选语言调用摄取 REST API 将数据推送到 Timeplus。 借助Ingest API的最新增强,在许多情况下,您可以通过webhook配置其他系统来将数据直接推送到Timeplus,而无需编写代码。
有关详细的 API 文档,请查看 https://docs.timeplus.com/rest。
在 Timeplus 中创建流
首先,你需要使用网页界面或通过 REST API 在 Timeplus 中创建流。 应设置具有正确名称和类型的列。 在下一节中,我们假设流名称为 foo
。
在 HTTP 标头中设置 API 密钥
请为工作空间生成 API 密钥并在 HTTP 标头中设置 API 密钥,名称为:X-Api-Key
If you would like to leverage a 3rd party system/tool to push data to Timeplus but it doesn't allow custom HTTP Header, then you can use the standard Authorization
Header, with value ApiKey $KEY
.
For Timeplus Enterprise self-hosted deployments, you need to set HTTP Authorization header to be Basic [Base64 encoded user:password]
.
将数据推送到 Timeplus
端点
The endpoint for real-time data ingestion is https://us-west-2.timeplus.cloud/{workspace-id}/api/v1beta2/streams/{name}/ingest
确保你使用的是 “工作空间 ID”,而不是 “工作空间名称”。 Workspace-id 是一个包含8个字符的随机字符串。 You can get it from the browser address bar: https://us-west-2.timeplus.cloud/<workspace-id>/console
. Workspace-name 是您在创建工作区时设置的名称。 目前,此名称是只读的,但我们将在将来将其设为可编辑。
You need to send POST
request to this endpoint, e.g. https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest
选项
根据您的用例,有许多选项可以通过REST API将数据推送到Timeplus。 你可以在 HTTP 标头中设置不同的 “内容类型”,然后在 URL 中添加 “格式” 查询参数。
以下是向Timeplus推送数据的不同用例列表:
用例 | POST 正文示例 | 内容类型 | 网址 | 目标流中的列 |
---|---|---|---|---|
1. 推送 JSON 对象。 每个 JSON 都是一个事件。 | "key1": "value11", "key2": "value12", ... "key1": "value21", "key2": "value22", ... | 多列,例如 key1、key2 | ||
2. 推送单个 JSON 或长文本。 单项活动。 | "key1": "value11", "key2": "value12", ... | 单列,命名为 “raw” | ||
3. 推送一批事件。 每行都是一个事件。 | event1 event2 | 单列,命名为 “raw” | ||
4. 推送包含多个事件的特殊 JSON,无需重复列名 | "columns": ["key1","key2"], "data": [ ["value11","value12"], ["value21","value22"], ] | 多列,例如 key1、key2 |
1. 直接推送 JSON 对象
索取样品
- Node.js
- curl
- Python
- Java
const https = require("https");
const options = {
hostname: "us-west-2.timeplus.cloud",
path: "/ws123456/api/v1beta2/streams/foo/ingest?format=streaming",
method: "POST",
headers: {
"Content-Type": "application/x-ndjson",
"X-Api-Key": "<your_api_key>",
},
};
const data = `
{"key1": "value11", "key2": "value12"}
{"key1": "value21", "key2": "value22"}
`;
const request = https.request(options, (resp) => {});
request.on("error", (error) => {
console.error(error);
});
request.write(data);
request.end();
curl -s -X POST -H "X-Api-Key: your_api_key" \
-H "Content-Type: application/x-ndjson" \
https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=streaming \
-d '
{"key1": "value11", "key2": "value12"}
{"key1": "value21", "key2": "value22"}
'
import requests
url = "https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=streaming"
headers = {
"X-Api-Key": "your_api_key",
"Content-Type": "application/x-ndjson"
}
data = '''
{"key1": "value11", "key2": "value12"}
{"key1": "value21", "key2": "value22"}
'''
response = requests.post(url, headers=headers, data=data)
print(response.status_code)
print(response.text)
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import java.io.IOException;
public class Example {
public static void main(String[] args) throws IOException {
OkHttpClient client = new OkHttpClient();
String url = "https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=streaming";
MediaType mediaType = MediaType.parse("application/x-ndjson");
String data = """
{"key1": "value11", "key2": "value12"}
{"key1": "value21", "key2": "value22"}
""";
RequestBody body = RequestBody.create(mediaType, data);
Request request = new Request.Builder()
.url(url)
.header("X-Api-Key", "your_api_key")
.header("Content-Type", "application/x-ndjson")
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
System.out.println(response.code());
System.out.println(response.body().string());
}
}
}
您可以将换行符分隔的 JSON (http://ndjson.org/) 推送到终端节点。 确保将 HTTP 标头设置为以下内容之一:
application/vnd.timeplus+json; format=streaming
如果你想利用第三方系统/工具将数据推送到 Timeplus 但它不允许自定义内容类型,那么你可以使用标准的 “application/json”,然后向 `/api/v1beta2/streams/$STREAM_NAME/ingest?format=streaming
。 对于 API 身份验证,除了自定义 HTTP 标头 X-api-key: the_key
之外,我们现在还支持 Authorization:apiKey THE_KEY
了解更多 Ingest API
请求正文只是一个 JSON 对象流。 例如
{“key1":“value11",“key2":“value12",...}
{“key1":“value21",“key2":“value22",...}
...
每个对象不必在一行中。 例如:
{
“key1":“value11",
“key2":“value12",...
}
{
“key1":“value21",
“key2":“value22",...
}
...
它们也不必用换行符分隔:
{“key1":“valueA”,...}{“key1":“valueB”,...}{“key1":“valueC”,...,
}...
只需确保在请求正文中使用正确的值指定目标流中的所有列即可。
2. 将单个 JSON 或字符串推送到单列流
索取样品
- Node.js
- curl
- Python
- Java
const https = require("https");
const options = {
hostname: "us-west-2.timeplus.cloud",
path: "/ws123456/api/v1beta2/streams/foo/ingest?format=raw",
method: "POST",
headers: {
"Content-Type": "text/plain",
"X-Api-Key": "<your_api_key>",
},
};
const data = `{"key1": "value11", "key2": "value12"}`;
const request = https.request(options, (resp) => {});
request.on("error", (error) => {
console.error(error);
});
request.write(data);
request.end();
curl -s -X POST -H "X-Api-Key: your_api_key" \
-H "Content-Type: text/plain" \
https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=raw \
-d '
{"key1": "value11", "key2": "value12"}
'
import requests
url = "https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=raw"
headers = {
"X-Api-Key": "your_api_key",
"Content-Type": "text/plain"
}
data = '''
{"key1": "value11", "key2": "value12"}
'''
response = requests.post(url, headers=headers, data=data)
print(response.status_code)
print(response.text)
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import java.io.IOException;
public class Example {
public static void main(String[] args) throws IOException {
OkHttpClient client = new OkHttpClient();
String url = "https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=raw";
MediaType mediaType = MediaType.parse("text/plain");
String data = """
{"key1": "value11", "key2": "value12"}
""";
RequestBody body = RequestBody.create(mediaType, data);
Request request = new Request.Builder()
.url(url)
.header("X-Api-Key", "your_api_key")
.header("Content-Type", "text/plain")
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
System.out.println(response.code());
System.out.println(response.body().string());
}
}
}
在Timeplus中使用一个名为 “raw” 的 “字符串” 列创建流是一种常见的做法。 您可以将 JSON 对象放入此列,然后提取值(例如 “select raw: key1”),或者将原始日志消息放入此列。
当你将 Content-Type 标头设置为 “text/plain”,并将 format=raw 添加到摄取端点时,POST 请求中的整个正文将放在 “原始” 列中。
3. 将多个 JSON 或文本推送到单列流。 每行都是一个事件
索取样品
- Node.js
- curl
- Python
- Java
const https = require("https");
const options = {
hostname: "us-west-2.timeplus.cloud",
path: "/ws123456/api/v1beta2/streams/foo/ingest?format=lines",
method: "POST",
headers: {
"Content-Type": "text/plain",
"X-Api-Key": "<your_api_key>",
},
};
const data = `{"key1": "value11", "key2": "value12"}
{"key1": "value21", "key2": "value22"}
`;
const request = https.request(options, (resp) => {});
request.on("error", (error) => {
console.error(error);
});
request.write(data);
request.end();
curl -s -X POST -H "X-Api-Key: your_api_key" \
-H "Content-Type: text/plain" \
https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=lines \
-d '{"key1": "value11", "key2": "value12"}
{"key1": "value21", "key2": "value22"}
'
import requests
url = "https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=lines"
headers = {
"X-Api-Key": "your_api_key",
"Content-Type": "text/plain"
}
data = '''{"key1": "value11", "key2": "value12"}
{"key1": "value21", "key2": "value22"}
'''
response = requests.post(url, headers=headers, data=data)
print(response.status_code)
print(response.text)
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import java.io.IOException;
public class Example {
public static void main(String[] args) throws IOException {
OkHttpClient client = new OkHttpClient();
String url = "https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest?format=lines";
MediaType mediaType = MediaType.parse("text/plain");
String data = """
{"key1": "value11", "key2": "value12"}
{"key1": "value21", "key2": "value22"}
""";
RequestBody body = RequestBody.create(mediaType, data);
Request request = new Request.Builder()
.url(url)
.header("X-Api-Key", "your_api_key")
.header("Content-Type", "text/plain")
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
System.out.println(response.code());
System.out.println(response.body().string());
}
}
}
当你将 Content-Type 标头设置为 “text/plain”,并将 “format=lines” 添加到摄取端点时,帖子正文中的每一行都将放在 “原始” 列中。
4. 在不重复列的情况下批量推送多个事件
索取样品
- Node.js
- curl
- Python
- Java
const https = require("https");
const options = {
hostname: "us-west-2.timeplus.cloud",
path: "/ws123456/api/v1beta2/streams/foo/ingest",
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Api-Key": "<your_api_key>",
},
};
const data = `
{
"columns": ["key1","key2"],
"data": [
["value11","value12"],
["value21","value22"],
]
}
`;
const request = https.request(options, (resp) => {});
request.on("error", (error) => {
console.error(error);
});
request.write(data);
request.end();
curl -s -X POST -H "X-Api-Key: your_api_key" \
-H "Content-Type: application/json" \
https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest \
-d '
{
"columns": ["key1","key2"],
"data": [
["value11","value12"],
["value21","value22"],
]
}
'
import requests
url = "https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest"
headers = {
"X-Api-Key": "your_api_key",
"Content-Type": "application/json"
}
data = '''
{
"columns": ["key1","key2"],
"data": [
["value11","value12"],
["value21","value22"],
]
}
'''
response = requests.post(url, headers=headers, data=data)
print(response.status_code)
print(response.text)
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import java.io.IOException;
public class Example {
public static void main(String[] args) throws IOException {
OkHttpClient client = new OkHttpClient();
String url = "https://us-west-2.timeplus.cloud/ws123456/api/v1beta2/streams/foo/ingest";
MediaType mediaType = MediaType.parse("text/plain");
String data = """
{
"columns": ["key1","key2"],
"data": [
["value11","value12"],
["value21","value22"],
]
}
""";
RequestBody body = RequestBody.create(mediaType, data);
Request request = new Request.Builder()
.url(url)
.header("X-Api-Key", "your_api_key")
.header("Content-Type", "application/json")
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
System.out.println(response.code());
System.out.println(response.body().string());
}
}
}
The above method should work very well for most system integration. 但是,将在请求的正文中反复提及这些列的名称。
我们还提供了一种性能更高的解决方案,只列出一次列名。
Same endpoint URL: https://us-west-2.timeplus.cloud/{workspace-id}/api/v1beta2/streams/{name}/ingest
但是你需要将 HTTP 标头设置为以下其中之一:
application/vnd.timeplus+json
application/vnd.timeplus+json; format=compact
请求正文采用以下格式:
备注:
columns
是一个字符串数组 ,其中包含列名数据
是一个数组数组。 每个嵌套数组代表一行数据。 值顺序必须与 “列” 中完全相同的顺序匹配。
例如:
您也可以使用我们的软件开发工具包来提取数据,而无需处理 REST API 的低级细节。