1.10.1. 时序库统一操作 API

1.10.1.1. 概述

时序库统一操作 API 提供了一套 跨时序数据库的统一访问与写入抽象, 用于屏蔽不同后端在连接方式、写入模型与查询结果处理上的差异。

当前支持的后端包括:

  • InfluxDB 1.x(jdbc:influxdb:

  • InfluxDB 3.x(jdbc:influxdb3:

支持使用统一 SQL 转换查询语句,前缀需要使用 jdbc:unisql: + 特定数据库访问 URL 前缀。

该 API 的目标是提供一套 稳定、统一、面向时序语义的操作接口

1.10.1.2. 快速开始

本节通过一个完整示例,展示典型使用流程:

  • 创建数据源

  • 获取客户端(类似 JDBC Connection)

  • 构建 measurement 并写入数据

  • 查询并解析结果(以 InfluxDB 1.x 为例)

1.10.1.2.1. Maven 依赖

使用时序库统一操作 API 之前,需要在工程中引入以下依赖。

1.10.1.2.1.1. 基础依赖

统一操作 API 本身由 com.hundsun.lightdb:tsdb 提供,同时依赖统一 SQL:

<dependency>
    <groupId>com.hundsun.lightdb</groupId>
    <artifactId>tsdb</artifactId>
    <version>${unisql.version}</version>
</dependency>
<dependency>
    <groupId>com.hundsun.lightdb</groupId>
    <artifactId>sql-convert-runtime</artifactId>
    <version>${unisql.version}</version>
</dependency>

1.10.1.2.1.2. 原生时序库依赖(必需)

统一操作 API 不会内置或传递引入具体时序库的原生客户端, 使用方必须根据实际后端 显式引入对应的原生 API 依赖, 否则在运行期将无法正常使用。

1.10.1.2.1.2.1. InfluxDB 1.x

当使用 jdbc:influxdb: URL 时,必须引入 InfluxDB 1.x 原生客户端:

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.25</version>
</dependency>
1.10.1.2.1.2.2. InfluxDB 3.x

当使用 jdbc:influxdb3: URL 时,必须引入 InfluxDB 3.x 原生客户端:

<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb3-java</artifactId>
    <version>1.4.0</version>
</dependency>

1.10.1.2.2. 创建数据源

TimeSeriesDataSource 采用 接口抽象 的方式定义, 具体实现可以支持:

  • 连接池化

  • 统一配置管理

  • 与 JDBC DataSource 类似的生命周期管理

在使用统一 SQL 时,通过 jdbc:unisql: 前缀启用, 并使用 targetDialect 指定目标时序库类型。

TimeSeriesDataSource dataSource = new TimeSeriesManagerDataSource();
dataSource.setUrl(
    "jdbc:unisql:influxdb:127.0.0.1:8086?targetDialect=influxdb1"
);
dataSource.setDatabase("example_db");
dataSource.setUsername("admin");
dataSource.setPassword("admin");

1.10.1.2.3. 获取客户端

获取 TimeSeriesClient 的方式与 JDBC 获取 Connection 类似, 使用完成后必须关闭,以释放底层资源。

try (TimeSeriesClient client = dataSource.getClient()) {
    // 与 JDBC Connection 类似,使用完成后需要关闭
}

1.10.1.2.4. 构建 measurement 并写入时序数据

measurement 不是简单的表名,而是 包含 tag / field 元信息的增强描述字符串, 用于统一不同后端的写入语义。

String measurement =
    "insert into /* tags: [region], fields: [temperature, humidity] */ " +
    "weather (region, temperature, humidity)";

通过 measurement 创建 PointBuilder,并写入一条时序数据:

try (TimeSeriesClient client = dataSource.getClient()) {

    PointBuilder point = client.measurement(measurement)
        .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
        .tag("region", "east")
        .addField("temperature", 23.5)
        .addField("humidity", 60);

    client.write(point);
}

说明:

  • measurement 描述字符串仅用于 写入阶段

  • 注释部分用于显式声明 tag / field

  • tag / field 名称必须与 PointBuilder 中使用的一致

  • 即使启用了统一 SQL,写入流程仍保持原生时序库语义

1.10.1.2.5. 查询与结果处理(InfluxDB 1.x 示例)

统一 SQL 主要作用于 查询阶段

调用 query(String) 时,框架会将统一 SQL 转换为目标时序库的原生查询语句。

查询接口统一返回 Object, 但不同后端返回的具体类型不同。

InfluxDB 1.x 返回 QueryResult,示例如下:

try (TimeSeriesClient client = dataSource.getClient()) {

    // 使用统一 SQL 风格的查询语句
    Object result = client.query(
        "SELECT * FROM weather WHERE region = 'east'"
    );

    if (!(result instanceof QueryResult)) {
        throw new IllegalStateException("Unexpected result type");
    }

    QueryResult queryResult = (QueryResult) result;

    QueryResult.Result firstResult = queryResult.getResults().get(0);
    QueryResult.Series series = firstResult.getSeries().get(0);

    List<String> columns = series.getColumns();
    List<List<Object>> values = series.getValues();

    for (List<Object> row : values) {
        int tempIndex = columns.indexOf("temperature");
        int humIndex = columns.indexOf("humidity");

        Double temperature = ((Number) row.get(tempIndex)).doubleValue();
        Integer humidity = ((Number) row.get(humIndex)).intValue();

        // 业务处理
    }
}

1.10.1.2.6. 完整案例

下面给出一个 完整、可运行的示例,演示:

  • 使用统一 SQL 创建 influxdb1 数据源

  • 插入时序数据

  • 使用统一 SQL 查询并解析结果(InfluxDB 1.x)

该示例适合作为使用方的最小参考实现。

TimeSeriesDataSource dataSource = new TimeSeriesManagerDataSource();
dataSource.setUrl("jdbc:unisql:influxdb:127.0.0.1:8086?targetDialect=influxdb1");
dataSource.setDatabase("example_db");
dataSource.setUsername("admin");
dataSource.setPassword("admin");

try (TimeSeriesClient client = dataSource.getClient()) {
    // =========================
    // 1. 插入时序数据
    // =========================

    String measurement =
        "insert into /* tags: [region], fields: [temperature, humidity] */ " +
        "weather (region, temperature, humidity)";

    PointBuilder point = client.measurement(measurement)
        .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
        .tag("region", "east")
        .addField("temperature", 25.3)
        .addField("humidity", 58);

    client.write(point);

    // =========================
    // 2. 使用统一 SQL 查询数据
    // =========================

    Object result = client.query("SELECT * FROM weather WHERE region = 'east'");

    // =========================
    // 3. 解析查询结果(InfluxDB 1.x)
    // =========================

    if (!(result instanceof QueryResult)) {
        throw new IllegalStateException("Unexpected query result type");
    }

    QueryResult queryResult = (QueryResult) result;

    if (queryResult.getResults().isEmpty()) {
        return;
    }

    QueryResult.Result firstResult = queryResult.getResults().get(0);
    if (firstResult.getSeries() == null || firstResult.getSeries().isEmpty()) {
        return;
    }

    QueryResult.Series series = firstResult.getSeries().get(0);

    List<String> columns = series.getColumns();
    List<List<Object>> values = series.getValues();

    int timeIndex = columns.indexOf("time");
    int tempIndex = columns.indexOf("temperature");
    int humIndex  = columns.indexOf("humidity");

    for (List<Object> row : values) {
        String time = (String) row.get(timeIndex);
        Double temperature = ((Number) row.get(tempIndex)).doubleValue();
        Integer humidity = ((Number) row.get(humIndex)).intValue();

        // 业务处理
    }
}

1.10.1.3. 统一 SQL(Unisql)使用说明

统一 SQL(Unisql)用于在 查询阶段 屏蔽不同后端时序库在 SQL 语法上的差异, 使应用可以使用一套统一的查询语句, 由框架在运行时自动转换为目标时序库可执行的原生语句。

统一 SQL 仅作用于查询(query)阶段, 写入流程与 measurement 机制不受影响。

1.10.1.3.1. 启用方式

统一 SQL 通过 特殊的 JDBC URL 前缀 启用。

使用 jdbc:unisql: 作为 URL 前缀, 并通过 targetDialect 参数指定目标时序库类型。

1.10.1.3.1.1. InfluxDB 1.x 示例

TimeSeriesDataSource dataSource = new TimeSeriesManagerDataSource();
dataSource.setUrl(
    "jdbc:unisql:influxdb:127.0.0.1:8086?targetDialect=influxdb1"
);
dataSource.setDatabase("example_db");

try (TimeSeriesClient client = dataSource.getClient()) {
    Object result = client.query(
        "SELECT * FROM weather WHERE region = 'east'"
    );
}

1.10.1.3.1.2. InfluxDB 3.x 示例

TimeSeriesDataSource dataSource = new TimeSeriesManagerDataSource();
dataSource.setUrl(
    "jdbc:unisql:influxdb3:127.0.0.1:8086?targetDialect=influxdb3"
);
dataSource.setDatabase("example_db");

try (TimeSeriesClient client = dataSource.getClient()) {
    Object result = client.query(
        "SELECT * FROM weather WHERE region = 'east'"
    );
}

1.10.1.3.2. URL 说明

  • jdbc:unisql: 启用统一 SQL 功能

  • targetDialect 指定目标时序库方言,用于 SQL 转换 当前常用取值包括:

    • influxdb1

    • influxdb3

  • targetDialect 外,其余 URL 参数与原生时序库保持一致

1.10.1.3.3. 使用限制与注意事项

  • 统一 SQL 仅对 ``query(String)`` 生效

  • 写入相关 API(measurement / write)不会进行 SQL 转换

  • 返回结果类型仍由底层时序库决定:

    • InfluxDB 1.x:QueryResult

    • InfluxDB 3.x:Stream<Object[]>

  • 并非所有原生特性都具备完全等价的统一 SQL 表达形式

  • 部分不常用或强依赖后端语义的 API 可能存在兼容性限制,例如:

    • 运行期动态切换 database

    • 运行期设置 retentionPolicy

建议在使用统一 SQL 时,将这类配置固定在数据源或初始化阶段完成。

1.10.1.4. 时序库差异说明

1.10.1.4.1. 连接 URL

不同后端通过 URL scheme 区分:

  • InfluxDB 1.x:jdbc:influxdb:host:port

  • InfluxDB 3.x:jdbc:influxdb3:host:port

调用方通过配置 URL 即可切换后端实现。

1.10.1.4.2. 查询结果类型

query() 方法统一返回 Object,但实际类型不同:

  • InfluxDB 1.x:QueryResult

  • InfluxDB 3.x:Stream<Object[]>

调用方应根据运行时后端进行类型判断与解析。

1.10.1.4.3. API 兼容性说明

由于后端能力差异,部分 API 在不同后端的支持程度不同:

  • InfluxDB 1.x - 支持在运行时设置 database - 支持 retentionPolicy

  • InfluxDB 3.x - database 通常在 URL 或配置阶段确定 - 不支持在运行时切换 retentionPolicy

建议:

  • 对于 InfluxDB 3.x,尽量在 DataSource 层完成上下文配置

  • 避免在运行时频繁切换 database / retentionPolicy

1.10.1.5. 接口说明

1.10.1.5.1. TimeSeriesDataSource

方法

说明

getClient()

获取时序客户端实例(需显式关闭)

setUrl(String)

设置连接 URL,用于区分不同后端

setDatabase(String)

设置默认数据库

setUsername(String)

设置用户名

setPassword(String)

设置密码

1.10.1.5.2. TimeSeriesClient

方法

说明

query(String)

执行查询,返回 Object

measurement(String)

创建 PointBuilder

write(PointBuilder)

写入单条时序数据

write(List<PointBuilder>)

批量写入时序数据

setDatabase(String)

设置当前数据库(部分后端不支持)

close()

关闭客户端并释放资源

1.10.1.5.3. PointBuilder

方法

说明

time(long, TimeUnit)

设置时间戳

tag(…)

添加 tag(按类型区分)

addField(…)

添加 field(按类型区分)

1.10.1.6. measurement 使用约束

  • measurement 必须包含 tag / field 声明

  • 声明内容必须与 PointBuilder 中实际使用一致

  • measurement 仅用于写入阶段,不参与查询

  • 查询语句始终使用原生时序数据库语法

measurement 描述字符串是统一写入模型的核心, 建议集中定义并复用。