diff --git a/ruoyi-modules/hwmom-tsdb/pom.xml b/ruoyi-modules/hwmom-tsdb/pom.xml index fd70697..0f71919 100644 --- a/ruoyi-modules/hwmom-tsdb/pom.xml +++ b/ruoyi-modules/hwmom-tsdb/pom.xml @@ -38,6 +38,11 @@ 2.23 + + com.influxdb + influxdb-client-java + 6.12.0 + diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/InfluxDbClient.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/InfluxDbClient.java new file mode 100644 index 0000000..1b8987e --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/InfluxDbClient.java @@ -0,0 +1,30 @@ +package org.dromara.tsdb.component; + +import java.util.List; +import java.util.Map; + +public interface InfluxDbClient { + /** + * 插入数据 + * @param measurement + * @param tags + * @param fields + */ + void writeData(String measurement, Map tags, Map fields); + + /** + * 插入数据 + * @param measurement + * @param tags + * @param fields + * @param timestamp + */ + void writeData(String measurement, Map tags, Map fields, long timestamp); + + /** + * 获取数据 + * @param query + * @return + */ + List> queryData(String query); +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb1xClientImpl.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb1xClientImpl.java new file mode 100644 index 0000000..97e08a9 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb1xClientImpl.java @@ -0,0 +1,86 @@ +package org.dromara.tsdb.component.impl; + +import com.influxdb.client.domain.WritePrecision; +import org.dromara.tsdb.component.InfluxDbClient; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Component +public class InfluxDb1xClientImpl implements InfluxDbClient { + + private final InfluxDB influxDB; + + public InfluxDb1xClientImpl( + @Value("${influxdb.url}") String url, + @Value("${influxdb.username}") String username, + @Value("${influxdb.password}") String password, + @Value("${influxdb.database}") String database) { + this.influxDB = InfluxDBFactory.connect(url, username, password); + this.influxDB.setDatabase(database); + } + + @Override + public void writeData(String measurement, Map tags, Map fields) { + Point.Builder pointBuilder = Point.measurement(measurement); + tags.forEach(pointBuilder::tag); + // 处理 fields,确保类型正确 + fields.forEach((key, value) -> { + if (value instanceof Number) { + pointBuilder.addField(key, (Number) value); + } else if (value instanceof String) { + pointBuilder.addField(key, (String) value); + } else if (value instanceof Boolean) { + pointBuilder.addField(key, (Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName()); + } + }); + influxDB.write(pointBuilder.build()); + } + + @Override + public void writeData(String measurement, Map tags, Map fields, long timestamp) { + Point.Builder pointBuilder = Point.measurement(measurement).time(timestamp, TimeUnit.MILLISECONDS); + tags.forEach(pointBuilder::tag); + // 处理 fields,确保类型正确 + fields.forEach((key, value) -> { + if (value instanceof Number) { + pointBuilder.addField(key, (Number) value); + } else if (value instanceof String) { + pointBuilder.addField(key, (String) value); + } else if (value instanceof Boolean) { + pointBuilder.addField(key, (Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName()); + } + }); + influxDB.write(pointBuilder.build()); + } + + + @Override + public List> queryData(String query) { + QueryResult result = influxDB.query(new Query(query)); + return result.getResults().stream() + .flatMap(r -> r.getSeries().stream()) + .flatMap(s -> s.getValues().stream()) + .map(values -> { + Map map = new HashMap<>(); + for (int i = 0; i < values.size(); i++) { + System.out.println("values: " + values.get(i)); + } + return map; + }) + .toList(); + } +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb2xClientImpl.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb2xClientImpl.java new file mode 100644 index 0000000..e445904 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb2xClientImpl.java @@ -0,0 +1,88 @@ +package org.dromara.tsdb.component.impl; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.WriteApi; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import org.dromara.tsdb.component.InfluxDbClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +public class InfluxDb2xClientImpl implements InfluxDbClient { + + private final InfluxDBClient influxDBClient; + private final String bucket; + + public InfluxDb2xClientImpl( + @Value("${influxdb.url}") String url, + @Value("${influxdb.token}") String token, + @Value("${influxdb.org}") String org, + @Value("${influxdb.bucket}") String bucket) { + this.influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket); + this.bucket = bucket; + } + + @Override + public void writeData(String measurement, Map tags, Map fields) { + try (WriteApi writeApi = influxDBClient.getWriteApi()) { + Point point = Point.measurement(measurement); + tags.forEach(point::addTag); + // 处理 fields,确保类型正确 + fields.forEach((key, value) -> { + if (value instanceof Number) { + point.addField(key, (Number) value); + } else if (value instanceof String) { + point.addField(key, (String) value); + } else if (value instanceof Boolean) { + point.addField(key, (Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName()); + } + }); + writeApi.writePoint(point); + } + } + + @Override + public void writeData(String measurement, Map tags, Map fields,long timestamp) { + try (WriteApi writeApi = influxDBClient.getWriteApi()) { + Point point = Point.measurement(measurement) + .time(timestamp,WritePrecision.MS);//设置时间戳 + tags.forEach(point::addTag); + // 处理 fields,确保类型正确 + fields.forEach((key, value) -> { + if (value instanceof Number) { + point.addField(key, (Number) value); + } else if (value instanceof String) { + point.addField(key, (String) value); + } else if (value instanceof Boolean) { + point.addField(key, (Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName()); + } + }); + writeApi.writePoint(point); + } + } + + @Override + public List> queryData(String query) { + List tables = influxDBClient.getQueryApi().query(query); + return tables.stream() + .flatMap(table -> table.getRecords().stream()) + .map(record -> { + Map map = new HashMap<>(); + record.getValues().forEach((key, value) -> map.put(key, value)); + return map; + }) + .toList(); + } +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java index 2e5e7f2..72dd811 100644 --- a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java @@ -1,7 +1,8 @@ package org.dromara.tsdb.config; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; +import org.dromara.tsdb.component.InfluxDbClient; +import org.dromara.tsdb.component.impl.InfluxDb1xClientImpl; +import org.dromara.tsdb.component.impl.InfluxDb2xClientImpl; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -9,22 +10,22 @@ import org.springframework.context.annotation.Configuration; @Configuration public class InfluxDbConfig { - @Value("${influxdb.url}") - private String influxDBUrl; + @Value("${influxdb.version}") + private String version; - @Value("${influxdb.username}") - private String username; - - @Value("${influxdb.password}") - private String password; - - @Value("${influxdb.database}") - private String database; + @Value("${influxdb.bucket}") + private String bucket; @Bean - public InfluxDB influxDB() { - InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, username, password); - influxDB.setDatabase(database); - return influxDB; + public InfluxDbClient influxDbClient( + InfluxDb1xClientImpl influxDb1xClient, + InfluxDb2xClientImpl influxDb2xClient) { + if ("1.x".equals(version)) { + return influxDb1xClient; + } else if ("2.x".equals(version)) { + return influxDb2xClient; + } else { + throw new IllegalArgumentException("Unsupported InfluxDB version: " + version); + } } } diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java index 7d5c7b9..95422ad 100644 --- a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java @@ -6,6 +6,9 @@ import org.influxdb.dto.QueryResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import java.util.List; +import java.util.Map; + @RestController @RequestMapping("/influx") public class InfluxDbController { @@ -18,7 +21,7 @@ public class InfluxDbController { */ @PostMapping("/write") public String writeData(@RequestBody InfluxMeasurementBo influxMeasurementBo) { - influxDbService.writeData(influxMeasurementBo); + influxDbService.writeData(); return "Data written to InfluxDB successfully!"; } @@ -26,7 +29,8 @@ public class InfluxDbController { * 查询数据 */ @GetMapping("/query") - public QueryResult queryData(@RequestParam String query) { - return influxDbService.queryData(query); + public List> queryData(@RequestParam String query) { + + return influxDbService.queryData(); } } diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java index ecd2b98..fc70b23 100644 --- a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java @@ -3,19 +3,12 @@ package org.dromara.tsdb.service; import org.dromara.tsdb.domain.bo.InfluxMeasurementBo; import org.influxdb.dto.QueryResult; -public interface IInfluxDbService { - /** - * 写入数据到InfluxDB - * @param influxMeasurementBo - */ - public void writeData(InfluxMeasurementBo influxMeasurementBo); +import java.util.List; +import java.util.Map; - /** - * 查询数据 - * - * @param query 查询语句 - * @return 查询结果 - */ - public QueryResult queryData(String query); +public interface IInfluxDbService { + public void writeData(); + + public List> queryData(); } diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java index de198af..f2d625a 100644 --- a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.tsdb.service.impl; +import org.dromara.tsdb.component.InfluxDbClient; import org.dromara.tsdb.domain.bo.InfluxMeasurementBo; import org.dromara.tsdb.service.IInfluxDbService; import org.influxdb.InfluxDB; @@ -9,38 +10,52 @@ import org.influxdb.dto.QueryResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; @Service public class InfluxDbServiceImpl implements IInfluxDbService { @Autowired - private InfluxDB influxDB; + private InfluxDbClient influxDbClient; - /** - * 写入数据到InfluxDB - * @param influxMeasurementBo - */ @Override - public void writeData(InfluxMeasurementBo influxMeasurementBo) { - Point point = Point.measurement(influxMeasurementBo.getMeasurement()) - .tag(influxMeasurementBo.getTagKey(), influxMeasurementBo.getTagValue()) - .addField(influxMeasurementBo.getFieldKey(), influxMeasurementBo.getFieldValue()) - .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) - .build(); - influxDB.write(point); + public void writeData() { + Map tags = Map.of("device_id", "device331"); + Map fields = Map.of("value", 33.6); + influxDbClient.writeData("temperature", tags, fields); } - /** - * 查询数据 - * - * @param query 查询语句 - * @return 查询结果 - */ @Override - public QueryResult queryData(String query) { - QueryResult queryResult = influxDB.query(new Query(query)); - System.out.println(queryResult.getResults()); - return queryResult; + public List> queryData() { + String query = "from(bucket: \"hwmom\") |> range(start: -7d) |> filter(fn: (r) => r._measurement == \"temperature\")"; + List> queryData= influxDbClient.queryData(query); + StringBuilder sb = new StringBuilder(); + int index = 1; + + for(Map map : queryData) { + if(map == null) { + sb.append("发现空Map对象\n\n"); + continue; + } + + sb.append("第").append(index++).append("条记录:\n"); + + for(Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + sb.append(" ").append(key) + .append(" = ") + .append(value != null ? value.toString() : "null") + .append("\n"); + } + sb.append("\n"); + } + + System.out.println(sb.toString()); + + return queryData; } }