1.10.1. 时序库统一操作 API
1.10.1.1. 概述
时序库统一操作 API 提供了一套 跨时序数据库的统一访问与写入抽象, 用于屏蔽不同后端在连接方式、写入模型、SQL 使用方式以及查询结果处理上的差异。
当前支持的后端包括:
InfluxDB 1.x(
jdbc:influxdb:)ClickHouse(
jdbc:clickhouse:)
同时支持通过统一 SQL(Unisql)机制, 在查询与写入阶段使用一套统一的 SQL 语法, 由框架在运行时自动适配并下发至目标时序数据库。
该 API 的目标是提供一套 稳定、统一、接近 JDBC 使用体验的时序操作接口, 重点解决跨时序库使用成本高、SQL 与结果处理不一致的问题。
1.10.1.2. 快速开始
本节通过一个完整示例,展示典型使用流程:
创建数据源
获取客户端(类似 JDBC Connection)
使用预编译 SQL 写入数据(支持绑定变量)
查询并处理统一结果集
1.10.1.2.1. Maven 依赖
1.10.1.2.1.1. 基础依赖
统一操作 API 由 com.hundsun.lightdb:tsdb 提供,
并依赖统一 SQL 转换运行时组件:
<dependency>
<groupId>com.hundsun.lightdb</groupId>
<artifactId>tsdb</artifactId>
<version>${unisql.version}</version>
</dependency>
<dependency>
<groupId>com.hundsun.lightdb</groupId>
<artifactId>sql-convert-runtime</artifactId>
<version>${unisql.version}</version>
</dependency>
1.10.1.2.1.2. 原生时序库依赖(必需)
统一操作 API 不会内置具体时序库的原生客户端。 使用方必须根据实际后端 显式引入对应的原生 API 依赖, 否则在运行期将无法正常使用。
1.10.1.2.1.2.1. InfluxDB 1.x
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.25</version>
</dependency>
1.10.1.2.1.2.2. ClickHouse
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.9.6</version>
</dependency>
1.10.1.2.2. 创建数据源
TimeSeriesDataSource 采用 接口抽象 的方式定义, 具体实现可以支持:
连接池化
统一配置管理
与 JDBC DataSource 类似的生命周期管理
当需要启用统一 SQL 时,
通过 jdbc:unisql: 前缀并结合 targetDialect 参数指定目标时序库。
TimeSeriesDataSource dataSource = new TimeSeriesManagerDataSource();
dataSource.setUrl(
"jdbc:unisql:influxdb:127.0.0.1:8086?targetDialect=influxdb1"
);
dataSource.setDatabase("example_db");
dataSource.setUsername("admin");
dataSource.setPassword("admin");
1.10.1.2.3. 获取客户端
获取 TimeSeriesClient 的方式与 JDBC 获取 Connection 类似,
使用完成后必须关闭,以释放底层资源。
try (TimeSeriesClient client = dataSource.getClient()) {
// 与 JDBC Connection 类似,使用完成后需要关闭
}
1.10.1.2.4. 使用预编译 SQL 写入时序数据
时序数据的写入通过 标准 SQL + 预编译语句 完成。
SQL 中可以使用 ? 占位符,
通过 TimeSeriesPreparedStatement 进行参数绑定。
示例:写入一条时序数据。
String insertSql =
"INSERT /*+ TAGS(region) FIELDS(temperature, humidity) TIMESTAMP(time) */ INTO weather(time, region, temperature, humidity) " +
"VALUES (?, ?, ?, ?)";
try (TimeSeriesClient client = dataSource.getClient();
TimeSeriesPreparedStatement ps = client.prepareStatement(insertSql)) {
ps.setTimestamp(1, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
ps.setString(2, "east");
ps.setDouble(3, 23.5);
ps.setInt(4, 60);
ps.executeUpdate();
}
说明:
SQL 使用统一形式描述写入语义
时间戳通过
setTimestamp显式指定时间单位参数索引从
1开始,行为与 JDBC PreparedStatement 一致INSERT 语句必须有配套的 HINT ,用于指定列类型 INSERT SQL 规则
1.10.1.2.5. 时间戳处理方式
时序库统一操作 API 中的时间戳处理遵循以下规则:
统一 UTC 时间戳
所有时间戳参数(
setTimestamp)和时间戳返回值(getTimestampNanos)均使用 UTC 时区 的时间戳时间戳以纳秒(nanoseconds)为单位,从 1970-01-01T00:00:00Z 开始计算
不同后端的时区差异由框架内部自动转换,用户无需关心
写入时的时间戳
写入时使用 setTimestamp(int parameterIndex, long timestamp, TimeUnit unit) 方法:
// 使用毫秒时间戳写入
ps.setTimestamp(1, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
说明:
parameterIndex:参数索引,从 1 开始timestamp:时间戳值unit:时间单位(如 TimeUnit.MILLISECONDS、TimeUnit.SECONDS)框架会根据传入的 timeUnit 自动转换为纳秒存储
查询时的时间戳
查询时使用 getTimestampNanos(String columnName) 方法:
try (TimeSeriesResultSet rs = ps.executeQuery()) {
while (rs.next()) {
long timestampNanos = rs.getTimestampNanos("time");
// 根据需要转换为毫秒或其他单位
long timestampMillis = timestampNanos / 1_000_000;
// 转换为 LocalDateTime(UTC 时区)
Instant instant = Instant.ofEpochSecond(
timestampNanos / 1_000_000_000,
timestampNanos % 1_000_000_000
);
LocalDateTime utcDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
}
}
说明:
返回值为纳秒级 UTC 时间戳
用户可根据需要进行单位转换或格式化
不同后端返回的时间戳格式由框架统一处理
时区处理建议
建议始终使用 UTC 时区进行时间戳的存储和查询
如需显示其他时区时间,可在应用层进行转换
避免在不同时区之间进行隐式转换,以免引起混淆
示例:完整的时区转换
// 写入:将本地时间转换为 UTC 时间戳后写入
LocalDateTime localTime = LocalDateTime.now();
Instant utcInstant = localTime.atZone(ZoneId.systemDefault()).toInstant();
long timestampNanos = utcInstant.getEpochSecond() * 1_000_000_000 + utcInstant.getNano();
ps.setTimestamp(1, timestampNanos, TimeUnit.NANOSECONDS);
// 查询:读取 UTC 时间戳后转换为本地时区显示
long readTimestampNanos = rs.getTimestampNanos("time");
Instant readInstant = Instant.ofEpochSecond(
readTimestampNanos / 1_000_000_000,
readTimestampNanos % 1_000_000_000
);
LocalDateTime localDisplayTime = LocalDateTime.ofInstant(readInstant, ZoneId.systemDefault());
1.10.1.2.6. 批量操作支持
时序数据库预编译语句支持批量插入操作,仅限 INSERT 语句使用。
通过 addBatch() executeBatch() 和 clearBatch() 方法实现批量操作。
示例:批量写入多条时序数据。
String insertSql =
"INSERT /*+ TAGS(region) FIELDS(temperature, humidity) TIMESTAMP(time) */ INTO weather(time, region, temperature, humidity) "
"VALUES (?, ?, ?, ?)";
try (TimeSeriesClient client = dataSource.getClient();
TimeSeriesPreparedStatement ps = client.prepareStatement(insertSql)) {
// 第一条记录
ps.setTimestamp(1, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
ps.setString(2, "east");
ps.setDouble(3, 23.5);
ps.setInt(4, 60);
ps.addBatch(); // 添加到批量缓存
// 第二条记录
ps.setTimestamp(1, System.currentTimeMillis() + 1000, TimeUnit.MILLISECONDS);
ps.setString(2, "west");
ps.setDouble(3, 24.1);
ps.setInt(4, 55);
ps.addBatch(); // 添加到批量缓存
// 执行批量操作
int[] results = ps.executeBatch();
}
说明:
仅 INSERT 语句支持批量操作
使用
addBatch()将当前参数加入批量缓存使用
executeBatch()执行所有批量语句使用
clearBatch()可清空批量缓存返回结果为每条语句的影响行数数组
1.10.1.2.7. 查询与结果处理
查询同样通过 prepareStatement 执行,
查询结果以 统一结果集 ResultSet 返回。
示例如下:
String querySql =
"SELECT time, temperature, humidity " +
"FROM weather WHERE region = ?";
try (TimeSeriesClient client = dataSource.getClient();
TimeSeriesPreparedStatement ps = client.prepareStatement(querySql)) {
ps.setString(1, "east");
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
Long time = rs.getTimestampNanos("time");
Double temperature = rs.getDouble("temperature");
Integer humidity = rs.getInt("humidity");
// 业务处理
}
}
}
结果说明:
返回标准的 ResultSet 接口,接近 JDBC 规范
使用
next()遍历结果集通过列名或列索引获取字段值
使用完成后必须关闭 ResultSet
1.10.1.2.8. 完整案例
下面给出一个 完整、可运行的示例,演示:
使用统一 SQL 创建 InfluxDB 1.x 数据源
写入一条时序数据
查询并解析统一结果集
TimeSeriesDataSource dataSource = new TimeSeriesManagerDataSource();
dataSource.setUrl("jdbc:unisql:influxdb:127.0.0.1:8086?targetDialect=influxdb1");
dataSource.setDatabase("example_db");
dataSource.setUsername("admin");
dataSource.setPassword("admin");
try (TimeSeriesClient client = dataSource.getClient()) {
// =========================
// 1. 写入时序数据
// =========================
String insertSql =
"INSERT /*+ TAGS(region) FIELDS(temperature, humidity) TIMESTAMP(time) */ INTO weather(time, region, temperature, humidity) " +
"VALUES (?, ?, ?, ?)";
try (TimeSeriesPreparedStatement ps =
client.prepareStatement(insertSql)) {
ps.setTimestamp(1, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
ps.setString(2, "east");
ps.setDouble(3, 25.3);
ps.setInt(4, 58);
ps.executeUpdate();
}
// =========================
// 2. 查询时序数据
// =========================
String querySql =
"SELECT time, temperature, humidity FROM weather WHERE region = ?";
try (TimeSeriesPreparedStatement ps =
client.prepareStatement(querySql)) {
ps.setString(1, "east");
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
Long time = rs.getTimestampNanos("time");
Double temperature = rs.getDouble("temperature");
Integer humidity = rs.getInt("humidity");
// 业务处理
}
}
}
}
1.10.1.3. 统一 SQL(Unisql)使用说明
统一 SQL(Unisql)用于在 SQL 层面 屏蔽不同后端时序库的语法差异, 使应用能够使用一套统一的 SQL 描述完成写入与查询操作。
1.10.1.3.1. 启用方式
统一 SQL 通过 JDBC URL 前缀 启用:
使用
jdbc:unisql:作为 URL 前缀通过
targetDialect参数指定目标时序库类型
示例:
InfluxDB 1.x
jdbc:unisql:influxdb:host:port?targetDialect=influxdb1ClickHouse
jdbc:unisql:clickhouse:host:port?targetDialect=clickhouse
1.10.1.3.2. 使用限制与注意事项
统一 SQL 对
prepareStatement的SELECT语句生效,其余语句不转换SQL 在执行前会根据
targetDialect自动转换返回结果统一为
TimeSeriesResultSet并非所有底层特性都具备完全等价的统一表达形式
运行期切换 database 或 retentionPolicy 的机制并不是所有后端都支持
建议将连接级别的上下文配置放置在 DataSource 初始化阶段完成。
1.10.1.4. 时序库差异说明
1.10.1.4.1. 连接 URL
InfluxDB 1.x:
jdbc:influxdb:host:portClickHouse:
jdbc:clickhouse:host:port
通过 URL scheme 区分后端实现。
1.10.1.4.2. API 兼容性说明
由于后端能力差异,部分 API 在不同后端的支持程度不同:
InfluxDB 1.x - 支持运行期切换 database - 支持 retentionPolicy
ClickHouse - 支持运行期切换 database - 不支持 retentionPolicy
建议根据目标后端选择合适的使用方式。
1.10.1.5. 接口说明
1.10.1.5.1. TimeSeriesDataSource
方法 |
说明 |
|---|---|
getClient() |
获取时序客户端实例(需显式关闭) |
setUrl(String) |
设置连接 URL |
setDatabase(String) |
设置默认数据库 |
setUsername(String) |
设置用户名 |
setPassword(String) |
设置密码 |
1.10.1.5.2. TimeSeriesClient
方法 |
说明 |
|---|---|
prepareStatement(String) |
创建预编译 SQL 语句 |
setConsistency(ConsistencyLevel) |
设置一致性级别(部分后端支持) |
setRetentionPolicy(String) |
设置保留策略(部分后端支持) |
setDatabase(String) |
设置当前数据库(部分后端不支持) |
close() |
关闭客户端并释放资源 |
1.10.1.5.3. TimeSeriesPreparedStatement
方法 |
说明 |
|---|---|
setXxx(int, value) |
绑定参数(索引从 1 开始) |
setTimestamp(int, long, TimeUnit) |
绑定时间戳参数 |
executeUpdate() |
执行写入类 SQL |
executeQuery() |
执行查询并返回统一结果集 |
addBatch() |
将当前参数加入批量缓存(仅 INSERT 语句支持) |
executeBatch() |
执行批量缓存中的所有语句,返回每条语句的影响行数数组(为1) |
clearBatch() |
清空批量缓存 |
close() |
关闭预编译语句并释放资源 |
1.10.1.5.4. TimeSeriesResultSet
方法 |
说明 |
|---|---|
next() |
将光标移动到下一行 |
getString(String columnName) |
以字符串形式获取指定列的值 |
getInt(String columnName) |
以整数形式获取指定列的值 |
getLong(String columnName) |
以长整数形式获取指定列的值 |
getShort(String columnName) |
以短整数形式获取指定列的值 |
getFloat(String columnName) |
以浮点数形式获取指定列的值 |
getDouble(String columnName) |
以双精度浮点数形式获取指定列的值 |
getBoolean(String columnName) |
以布尔值形式获取指定列的值 |
getTimestampNanos(String columnName) |
以纳秒时间戳形式获取指定列的值(UTC 时区) |
getMetaData() |
获取结果集的元数据信息 |
close() |
关闭结果集并释放相关资源 |
1.10.1.5.5. TimeSeriesResultSetMetaData
方法 |
说明 |
|---|---|
getColumnCount() |
获取结果集中的列数 |
getColumnType(int column) |
获取指定列的 SQL 类型(对应 java.sql.Types 中的常量) |
getColumnTypeName(int column) |
获取指定列的数据库特定类型名称 |
getColumnName(int column) |
获取指定列的名称 |
1.10.1.6. INSERT SQL 规则
所有写入操作必须使用 INSERT SQL 语句
SQL 中必须包含 hint 注释,用于声明 tag / field 元信息
hint 必须位于 INSERT 与 INTO 之间,采用 MySQL 类似的注释风格:
INSERT /*+ TAGS(region) FIELDS(temperature, humidity) TIMESTAMP(time) */ INTO weather (time, region, temperature, humidity) VALUES (?, ?, ?, ?);
hint 用于指导框架正确地写入时序数据库
tag / field 声明内容必须与
TimeSeriesPreparedStatement中绑定的列保持一致未包含正确 hint 的 INSERT 将被视为非法,框架会抛出异常