You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
177 lines
7.5 KiB
177 lines
7.5 KiB
|
3 months ago
|
package com.dashboard.aws.lambda.service;
|
||
|
|
|
||
|
|
import org.slf4j.Logger;
|
||
|
|
import org.slf4j.LoggerFactory;
|
||
|
|
|
||
|
|
import java.sql.*;
|
||
|
|
import java.time.*;
|
||
|
|
import java.util.*;
|
||
|
|
|
||
|
|
public class MeasureAverageService {
|
||
|
|
|
||
|
|
private static final Logger logger = LoggerFactory.getLogger(MeasureAverageService.class);
|
||
|
|
|
||
|
|
private static final String MYSQL_URL = System.getenv("DB_URL");
|
||
|
|
private static final String DB_USER = System.getenv("DB_USER");
|
||
|
|
private static final String DB_PASSWORD = System.getenv("DB_PASSWORD");
|
||
|
|
|
||
|
|
private Connection getConnection() throws SQLException {
|
||
|
|
return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
|
||
|
|
}
|
||
|
|
|
||
|
|
public void aggregateLastHour(Long companyId, ZoneId zone, LocalDateTime now, LocalDateTime start) {
|
||
|
|
long startTs = start.atZone(zone).toInstant().toEpochMilli();
|
||
|
|
long endTs = now.atZone(zone).toInstant().toEpochMilli();
|
||
|
|
|
||
|
|
logger.info("Aggregating data for company {} from {} to {}", companyId, start, now);
|
||
|
|
|
||
|
|
String schema = "data_center_dongjian_" + companyId;
|
||
|
|
String realtimeTable = "dashboard_realtime_measure";
|
||
|
|
String sourceTable = "dashboard_record_measure";
|
||
|
|
String targetTable = "dashboard_aggregate_measure_hour";
|
||
|
|
|
||
|
|
try (Connection conn = getConnection()) {
|
||
|
|
|
||
|
|
// 1: 从实时表获取所有上报过的设备ID集合
|
||
|
|
Set<String> deviceSet = new HashSet<>();
|
||
|
|
String realtimeSql = String.format("SELECT device_id FROM %s.%s", schema, realtimeTable);
|
||
|
|
try (Statement st = conn.createStatement();
|
||
|
|
ResultSet rs = st.executeQuery(realtimeSql)) {
|
||
|
|
while (rs.next()) {
|
||
|
|
deviceSet.add(rs.getString("device_id"));
|
||
|
|
}
|
||
|
|
}
|
||
|
|
logger.info("Found {} devices from realtime table", deviceSet.size());
|
||
|
|
|
||
|
|
// 2: 查询过去一小时的数据
|
||
|
|
Map<String, List<Record>> currentHourData = queryRecordData(conn, schema, sourceTable, startTs, endTs);
|
||
|
|
|
||
|
|
// 3: 查询上一小时的聚合结果(用于回退)
|
||
|
|
LocalDateTime prevHour = start.minusHours(1);
|
||
|
|
Map<String, Double> prevHourData = queryPrevHourAggregates(conn, schema, targetTable, prevHour);
|
||
|
|
|
||
|
|
// 4: 计算当前小时平均值(如果有)
|
||
|
|
Map<String, Double> resultMap = new HashMap<>();
|
||
|
|
for (Map.Entry<String, List<Record>> entry : currentHourData.entrySet()) {
|
||
|
|
String deviceId = entry.getKey();
|
||
|
|
double avg = computeTimeWeightedAverage(entry.getValue(), startTs, endTs);
|
||
|
|
resultMap.put(deviceId, avg);
|
||
|
|
}
|
||
|
|
|
||
|
|
// 5: 对比集合 A,确保所有设备都有值(否则用上一小时或 0)
|
||
|
|
for (String deviceId : deviceSet) {
|
||
|
|
if (!resultMap.containsKey(deviceId)) {
|
||
|
|
Double prevVal = prevHourData.get(deviceId);
|
||
|
|
double value = (prevVal != null) ? prevVal : 0.0;
|
||
|
|
resultMap.put(deviceId, value);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 6: 写入本小时聚合表
|
||
|
|
String insertSql = String.format(
|
||
|
|
"INSERT INTO %s.%s (device_id, average_value, date_year, date_month, date_day, date_hour, " +
|
||
|
|
"time_start, time_end, aggregated_at) " +
|
||
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", schema, targetTable
|
||
|
|
);
|
||
|
|
|
||
|
|
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
|
||
|
|
for (Map.Entry<String, Double> entry : resultMap.entrySet()) {
|
||
|
|
String deviceId = entry.getKey();
|
||
|
|
double avg = entry.getValue();
|
||
|
|
LocalDateTime dt = start;
|
||
|
|
|
||
|
|
ps.setString(1, deviceId);
|
||
|
|
ps.setString(2, String.format("%.6f", avg));
|
||
|
|
ps.setInt(3, dt.getYear());
|
||
|
|
ps.setInt(4, dt.getMonthValue());
|
||
|
|
ps.setInt(5, dt.getDayOfMonth());
|
||
|
|
ps.setInt(6, dt.getHour());
|
||
|
|
ps.setLong(7, startTs);
|
||
|
|
ps.setLong(8, endTs);
|
||
|
|
ps.setTimestamp(9, Timestamp.valueOf(now));
|
||
|
|
|
||
|
|
ps.addBatch();
|
||
|
|
}
|
||
|
|
ps.executeBatch();
|
||
|
|
}
|
||
|
|
|
||
|
|
logger.info("Hourly aggregation finished for {} devices", resultMap.size());
|
||
|
|
|
||
|
|
} catch (SQLException e) {
|
||
|
|
logger.error("Hourly aggregation failed", e);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 查询当前小时数据
|
||
|
|
private Map<String, List<Record>> queryRecordData(Connection conn, String schema, String table, long startTs, long endTs) throws SQLException {
|
||
|
|
String sql = String.format(
|
||
|
|
"SELECT device_id, upload_value, upload_at FROM %s.%s " +
|
||
|
|
"WHERE upload_at >= ? AND upload_at < ? ORDER BY device_id, upload_at",
|
||
|
|
schema, table
|
||
|
|
);
|
||
|
|
Map<String, List<Record>> map = new HashMap<>();
|
||
|
|
try (PreparedStatement ps = conn.prepareStatement(sql)) {
|
||
|
|
ps.setLong(1, startTs);
|
||
|
|
ps.setLong(2, endTs);
|
||
|
|
ResultSet rs = ps.executeQuery();
|
||
|
|
while (rs.next()) {
|
||
|
|
String deviceId = rs.getString("device_id");
|
||
|
|
String valStr = rs.getString("upload_value");
|
||
|
|
long ts = rs.getLong("upload_at");
|
||
|
|
double val = (valStr == null || valStr.isEmpty()) ? 0.0 : Double.parseDouble(valStr);
|
||
|
|
map.computeIfAbsent(deviceId, k -> new ArrayList<>()).add(new Record(ts, val));
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return map;
|
||
|
|
}
|
||
|
|
|
||
|
|
// 查询上一小时聚合结果
|
||
|
|
private Map<String, Double> queryPrevHourAggregates(Connection conn, String schema, String table, LocalDateTime prevHour) throws SQLException {
|
||
|
|
Map<String, Double> map = new HashMap<>();
|
||
|
|
String sql = String.format(
|
||
|
|
"SELECT device_id, average_value FROM %s.%s " +
|
||
|
|
"WHERE date_year = ? AND date_month = ? AND date_day = ? AND date_hour = ?",
|
||
|
|
schema, table
|
||
|
|
);
|
||
|
|
try (PreparedStatement ps = conn.prepareStatement(sql)) {
|
||
|
|
ps.setInt(1, prevHour.getYear());
|
||
|
|
ps.setInt(2, prevHour.getMonthValue());
|
||
|
|
ps.setInt(3, prevHour.getDayOfMonth());
|
||
|
|
ps.setInt(4, prevHour.getHour());
|
||
|
|
ResultSet rs = ps.executeQuery();
|
||
|
|
while (rs.next()) {
|
||
|
|
map.put(rs.getString("device_id"), rs.getDouble("average_value"));
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return map;
|
||
|
|
}
|
||
|
|
|
||
|
|
private double computeTimeWeightedAverage(List<Record> records, long startTs, long endTs) {
|
||
|
|
if (records == null || records.isEmpty()) return 0.0;
|
||
|
|
|
||
|
|
double sum = 0.0;
|
||
|
|
double totalMinutes = (endTs - startTs) / 60000.0; // 毫秒 → 分钟
|
||
|
|
|
||
|
|
for (int i = 0; i < records.size(); i++) {
|
||
|
|
Record rec = records.get(i);
|
||
|
|
long nextTs = (i < records.size() - 1) ? records.get(i + 1).timestamp : endTs;
|
||
|
|
double intervalMinutes = (nextTs - rec.timestamp) / 60000.0; // 转分钟
|
||
|
|
logger.info("intervalMinutes: {}, rec.value: {}", intervalMinutes, rec.value);
|
||
|
|
sum += rec.value * intervalMinutes;
|
||
|
|
}
|
||
|
|
|
||
|
|
logger.info("totalMinutes: {}, sum: {}", totalMinutes, sum);
|
||
|
|
|
||
|
|
return totalMinutes > 0 ? sum / totalMinutes : 0.0;
|
||
|
|
}
|
||
|
|
|
||
|
|
private static class Record {
|
||
|
|
long timestamp;
|
||
|
|
double value;
|
||
|
|
|
||
|
|
Record(long timestamp, double value) {
|
||
|
|
this.timestamp = timestamp;
|
||
|
|
this.value = value;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|