1.10.1. 时序库统一操作 API

1.10.1.1. 概述

时序库统一操作 API 提供了一套 跨时序数据库的统一访问与写入抽象, 用于屏蔽不同后端在连接方式、写入模型、SQL 使用方式以及查询结果处理上的差异。

当前支持的后端包括:

  • InfluxDB 1.x(jdbc:influxdb:

  • ClickHouse(jdbc:clickhouse:

同时支持通过统一 SQL(Unisql)机制, 在查询与写入阶段使用一套统一的 SQL 语法, 由框架在运行时自动适配并下发至目标时序数据库。

该 API 的目标是提供一套 稳定、统一、接近 JDBC 使用体验的时序操作接口, 重点解决跨时序库使用成本高、SQL 与结果处理不一致的问题。

1.10.1.2. 快速开始

本节通过一个完整示例,展示典型使用流程:

  • 创建数据源

  • 获取客户端(类似 JDBC Connection)

  • 使用预编译 SQL 写入数据(支持绑定变量)

  • 查询并处理统一结果集

1.10.1.2.1. Maven 依赖

1.10.1.2.1.1. 基础依赖

统一操作 API 由 com.hundsun.lightdb:tsdb 提供, 并依赖统一 SQL 转换运行时组件:

<dependency>
    <groupId>com.hundsun.lightdb</groupId>
    <artifactId>tsdb</artifactId>
    <version>${unisql.version}</version>
</dependency>
<dependency>
    <groupId>com.hundsun.lightdb</groupId>
    <artifactId>sql-convert-runtime</artifactId>
    <version>${unisql.version}</version>
</dependency>

1.10.1.2.1.2. 原生时序库依赖(必需)

统一操作 API 不会内置具体时序库的原生客户端。 使用方必须根据实际后端 显式引入对应的原生 API 依赖, 否则在运行期将无法正常使用。

1.10.1.2.1.2.1. InfluxDB 1.x
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.25</version>
</dependency>
1.10.1.2.1.2.2. ClickHouse
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.9.6</version>
</dependency>

1.10.1.2.2. 创建数据源

TimeSeriesDataSource 采用 接口抽象 的方式定义, 具体实现可以支持:

  • 连接池化

  • 统一配置管理

  • 与 JDBC DataSource 类似的生命周期管理

当需要启用统一 SQL 时, 通过 jdbc:unisql: 前缀并结合 targetDialect 参数指定目标时序库。

TimeSeriesDataSource dataSource = new TimeSeriesManagerDataSource();
dataSource.setUrl(
    "jdbc:unisql:influxdb:127.0.0.1:8086?targetDialect=influxdb1"
);
dataSource.setDatabase("example_db");
dataSource.setUsername("admin");
dataSource.setPassword("admin");

1.10.1.2.3. 获取客户端

获取 TimeSeriesClient 的方式与 JDBC 获取 Connection 类似, 使用完成后必须关闭,以释放底层资源。

try (TimeSeriesClient client = dataSource.getClient()) {
    // 与 JDBC Connection 类似,使用完成后需要关闭
}

1.10.1.2.4. 使用预编译 SQL 写入时序数据

时序数据的写入通过 标准 SQL + 预编译语句 完成。

SQL 中可以使用 ? 占位符, 通过 TimeSeriesPreparedStatement 进行参数绑定。

示例:写入一条时序数据。

String insertSql =
    "INSERT /*+ TAGS(region) FIELDS(temperature, humidity) TIMESTAMP(time) */ INTO weather(time, region, temperature, humidity) " +
    "VALUES (?, ?, ?, ?)";

try (TimeSeriesClient client = dataSource.getClient();
     TimeSeriesPreparedStatement ps = client.prepareStatement(insertSql)) {

    ps.setTimestamp(1, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    ps.setString(2, "east");
    ps.setDouble(3, 23.5);
    ps.setInt(4, 60);

    ps.executeUpdate();
}

说明:

  • SQL 使用统一形式描述写入语义

  • 时间戳通过 setTimestamp 显式指定时间单位

  • 参数索引从 1 开始,行为与 JDBC PreparedStatement 一致

  • INSERT 语句必须有配套的 HINT ,用于指定列类型 INSERT SQL 规则

1.10.1.2.5. 时间戳处理方式

时序库统一操作 API 中的时间戳处理遵循以下规则:

统一 UTC 时间戳

  • 所有时间戳参数(setTimestamp)和时间戳返回值(getTimestampNanos)均使用 UTC 时区 的时间戳

  • 时间戳以纳秒(nanoseconds)为单位,从 1970-01-01T00:00:00Z 开始计算

  • 不同后端的时区差异由框架内部自动转换,用户无需关心

写入时的时间戳

写入时使用 setTimestamp(int parameterIndex, long timestamp, TimeUnit unit) 方法:

// 使用毫秒时间戳写入
ps.setTimestamp(1, System.currentTimeMillis(), TimeUnit.MILLISECONDS);

说明:

  • parameterIndex:参数索引,从 1 开始

  • timestamp:时间戳值

  • unit:时间单位(如 TimeUnit.MILLISECONDS、TimeUnit.SECONDS)

  • 框架会根据传入的 timeUnit 自动转换为纳秒存储

查询时的时间戳

查询时使用 getTimestampNanos(String columnName) 方法:

try (TimeSeriesResultSet rs = ps.executeQuery()) {
    while (rs.next()) {
        long timestampNanos = rs.getTimestampNanos("time");

        // 根据需要转换为毫秒或其他单位
        long timestampMillis = timestampNanos / 1_000_000;

        // 转换为 LocalDateTime(UTC 时区)
        Instant instant = Instant.ofEpochSecond(
            timestampNanos / 1_000_000_000,
            timestampNanos % 1_000_000_000
        );
        LocalDateTime utcDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
    }
}

说明:

  • 返回值为纳秒级 UTC 时间戳

  • 用户可根据需要进行单位转换或格式化

  • 不同后端返回的时间戳格式由框架统一处理

时区处理建议

  • 建议始终使用 UTC 时区进行时间戳的存储和查询

  • 如需显示其他时区时间,可在应用层进行转换

  • 避免在不同时区之间进行隐式转换,以免引起混淆

示例:完整的时区转换

// 写入:将本地时间转换为 UTC 时间戳后写入
LocalDateTime localTime = LocalDateTime.now();
Instant utcInstant = localTime.atZone(ZoneId.systemDefault()).toInstant();
long timestampNanos = utcInstant.getEpochSecond() * 1_000_000_000 + utcInstant.getNano();

ps.setTimestamp(1, timestampNanos, TimeUnit.NANOSECONDS);

// 查询:读取 UTC 时间戳后转换为本地时区显示
long readTimestampNanos = rs.getTimestampNanos("time");
Instant readInstant = Instant.ofEpochSecond(
    readTimestampNanos / 1_000_000_000,
    readTimestampNanos % 1_000_000_000
);
LocalDateTime localDisplayTime = LocalDateTime.ofInstant(readInstant, ZoneId.systemDefault());

1.10.1.2.6. 批量操作支持

时序数据库预编译语句支持批量插入操作,仅限 INSERT 语句使用。 通过 addBatch() executeBatch()clearBatch() 方法实现批量操作。

示例:批量写入多条时序数据。

String insertSql =
    "INSERT /*+ TAGS(region) FIELDS(temperature, humidity) TIMESTAMP(time) */ INTO weather(time, region, temperature, humidity) "
    "VALUES (?, ?, ?, ?)";

try (TimeSeriesClient client = dataSource.getClient();
     TimeSeriesPreparedStatement ps = client.prepareStatement(insertSql)) {

    // 第一条记录
    ps.setTimestamp(1, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    ps.setString(2, "east");
    ps.setDouble(3, 23.5);
    ps.setInt(4, 60);
    ps.addBatch();  // 添加到批量缓存

    // 第二条记录
    ps.setTimestamp(1, System.currentTimeMillis() + 1000, TimeUnit.MILLISECONDS);
    ps.setString(2, "west");
    ps.setDouble(3, 24.1);
    ps.setInt(4, 55);
    ps.addBatch();  // 添加到批量缓存

    // 执行批量操作
    int[] results = ps.executeBatch();
}

说明:

  • 仅 INSERT 语句支持批量操作

  • 使用 addBatch() 将当前参数加入批量缓存

  • 使用 executeBatch() 执行所有批量语句

  • 使用 clearBatch() 可清空批量缓存

  • 返回结果为每条语句的影响行数数组

1.10.1.2.7. 查询与结果处理

查询同样通过 prepareStatement 执行, 查询结果以 统一结果集 ResultSet 返回。

示例如下:

String querySql =
    "SELECT time, temperature, humidity " +
    "FROM weather WHERE region = ?";

try (TimeSeriesClient client = dataSource.getClient();
     TimeSeriesPreparedStatement ps = client.prepareStatement(querySql)) {

    ps.setString(1, "east");

    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            Long time = rs.getTimestampNanos("time");
            Double temperature = rs.getDouble("temperature");
            Integer humidity = rs.getInt("humidity");

            // 业务处理
        }
    }
}

结果说明:

  • 返回标准的 ResultSet 接口,接近 JDBC 规范

  • 使用 next() 遍历结果集

  • 通过列名或列索引获取字段值

  • 使用完成后必须关闭 ResultSet

1.10.1.2.8. 完整案例

下面给出一个 完整、可运行的示例,演示:

  • 使用统一 SQL 创建 InfluxDB 1.x 数据源

  • 写入一条时序数据

  • 查询并解析统一结果集

TimeSeriesDataSource dataSource = new TimeSeriesManagerDataSource();
dataSource.setUrl("jdbc:unisql:influxdb:127.0.0.1:8086?targetDialect=influxdb1");
dataSource.setDatabase("example_db");
dataSource.setUsername("admin");
dataSource.setPassword("admin");

try (TimeSeriesClient client = dataSource.getClient()) {

    // =========================
    // 1. 写入时序数据
    // =========================

    String insertSql =
        "INSERT /*+ TAGS(region) FIELDS(temperature, humidity) TIMESTAMP(time) */ INTO weather(time, region, temperature, humidity) " +
        "VALUES (?, ?, ?, ?)";

    try (TimeSeriesPreparedStatement ps =
             client.prepareStatement(insertSql)) {

        ps.setTimestamp(1, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        ps.setString(2, "east");
        ps.setDouble(3, 25.3);
        ps.setInt(4, 58);

        ps.executeUpdate();
    }

    // =========================
    // 2. 查询时序数据
    // =========================

    String querySql =
        "SELECT time, temperature, humidity FROM weather WHERE region = ?";

    try (TimeSeriesPreparedStatement ps =
             client.prepareStatement(querySql)) {

        ps.setString(1, "east");

        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                Long time = rs.getTimestampNanos("time");
                Double temperature = rs.getDouble("temperature");
                Integer humidity = rs.getInt("humidity");

                // 业务处理
            }
        }
    }
}

1.10.1.3. 统一 SQL(Unisql)使用说明

统一 SQL(Unisql)用于在 SQL 层面 屏蔽不同后端时序库的语法差异, 使应用能够使用一套统一的 SQL 描述完成写入与查询操作。

1.10.1.3.1. 启用方式

统一 SQL 通过 JDBC URL 前缀 启用:

  • 使用 jdbc:unisql: 作为 URL 前缀

  • 通过 targetDialect 参数指定目标时序库类型

示例:

  • InfluxDB 1.x

    jdbc:unisql:influxdb:host:port?targetDialect=influxdb1

  • ClickHouse

    jdbc:unisql:clickhouse:host:port?targetDialect=clickhouse

1.10.1.3.2. 使用限制与注意事项

  • 统一 SQL 对 prepareStatementSELECT 语句生效,其余语句不转换

  • SQL 在执行前会根据 targetDialect 自动转换

  • 返回结果统一为 TimeSeriesResultSet

  • 并非所有底层特性都具备完全等价的统一表达形式

  • 运行期切换 database 或 retentionPolicy 的机制并不是所有后端都支持

建议将连接级别的上下文配置放置在 DataSource 初始化阶段完成。

1.10.1.4. 时序库差异说明

1.10.1.4.1. 连接 URL

  • InfluxDB 1.x:jdbc:influxdb:host:port

  • ClickHouse:jdbc:clickhouse:host:port

通过 URL scheme 区分后端实现。

1.10.1.4.2. API 兼容性说明

由于后端能力差异,部分 API 在不同后端的支持程度不同:

  • InfluxDB 1.x - 支持运行期切换 database - 支持 retentionPolicy

  • ClickHouse - 支持运行期切换 database - 不支持 retentionPolicy

建议根据目标后端选择合适的使用方式。

1.10.1.5. 接口说明

1.10.1.5.1. TimeSeriesDataSource

方法

说明

getClient()

获取时序客户端实例(需显式关闭)

setUrl(String)

设置连接 URL

setDatabase(String)

设置默认数据库

setUsername(String)

设置用户名

setPassword(String)

设置密码

1.10.1.5.2. TimeSeriesClient

方法

说明

prepareStatement(String)

创建预编译 SQL 语句

setConsistency(ConsistencyLevel)

设置一致性级别(部分后端支持)

setRetentionPolicy(String)

设置保留策略(部分后端支持)

setDatabase(String)

设置当前数据库(部分后端不支持)

close()

关闭客户端并释放资源

1.10.1.5.3. TimeSeriesPreparedStatement

方法

说明

setXxx(int, value)

绑定参数(索引从 1 开始)

setTimestamp(int, long, TimeUnit)

绑定时间戳参数

executeUpdate()

执行写入类 SQL

executeQuery()

执行查询并返回统一结果集

addBatch()

将当前参数加入批量缓存(仅 INSERT 语句支持)

executeBatch()

执行批量缓存中的所有语句,返回每条语句的影响行数数组(为1)

clearBatch()

清空批量缓存

close()

关闭预编译语句并释放资源

1.10.1.5.4. TimeSeriesResultSet

方法

说明

next()

将光标移动到下一行

getString(String columnName)

以字符串形式获取指定列的值

getInt(String columnName)

以整数形式获取指定列的值

getLong(String columnName)

以长整数形式获取指定列的值

getShort(String columnName)

以短整数形式获取指定列的值

getFloat(String columnName)

以浮点数形式获取指定列的值

getDouble(String columnName)

以双精度浮点数形式获取指定列的值

getBoolean(String columnName)

以布尔值形式获取指定列的值

getTimestampNanos(String columnName)

以纳秒时间戳形式获取指定列的值(UTC 时区)

getMetaData()

获取结果集的元数据信息

close()

关闭结果集并释放相关资源

1.10.1.5.5. TimeSeriesResultSetMetaData

方法

说明

getColumnCount()

获取结果集中的列数

getColumnType(int column)

获取指定列的 SQL 类型(对应 java.sql.Types 中的常量)

getColumnTypeName(int column)

获取指定列的数据库特定类型名称

getColumnName(int column)

获取指定列的名称

1.10.1.6. INSERT SQL 规则

  • 所有写入操作必须使用 INSERT SQL 语句

  • SQL 中必须包含 hint 注释,用于声明 tag / field 元信息

  • hint 必须位于 INSERT 与 INTO 之间,采用 MySQL 类似的注释风格:

    INSERT /*+ TAGS(region) FIELDS(temperature, humidity) TIMESTAMP(time) */ INTO weather
    (time, region, temperature, humidity) VALUES (?, ?, ?, ?);
    
  • hint 用于指导框架正确地写入时序数据库

  • tag / field 声明内容必须与 TimeSeriesPreparedStatement 中绑定的列保持一致

  • 未包含正确 hint 的 INSERT 将被视为非法,框架会抛出异常