package com.dashboard.aws.lambda.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.time.*;
import java.util.*;

/**
 * Computes per-device aggregates (time-weighted average, maximum, minimum) over a time window
 * and writes them to the hourly aggregate table of a company-specific schema.
 *
 * <p>Connection settings are read from the {@code DB_URL}, {@code DB_USER} and
 * {@code DB_PASSWORD} environment variables when the class is initialised.
 */
public class MeasureAverageService {

    private static final Logger logger = LoggerFactory.getLogger(MeasureAverageService.class);

    private static final String MYSQL_URL = System.getenv("DB_URL");
    private static final String DB_USER = System.getenv("DB_USER");
    private static final String DB_PASSWORD = System.getenv("DB_PASSWORD");

    private static final String REALTIME_TABLE = "dashboard_realtime_measure";
    private static final String SOURCE_TABLE = "dashboard_record_measure";
    private static final String TARGET_TABLE = "dashboard_aggregate_measure_hour";

    /** Number of milliseconds in one minute, used to convert epoch-millisecond spans. */
    private static final double MILLIS_PER_MINUTE = 60000.0;

    /**
     * Opens a new JDBC connection using the environment-provided settings.
     *
     * @return a freshly opened connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be established
     */
    private Connection getConnection() throws SQLException {
        return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
    }

    /**
     * Aggregates the samples recorded in {@code [start, now)} for every device of a company and
     * inserts one row per device into the hourly aggregate table.
     *
     * <p>Devices listed in the realtime table that have no usable sample in the window reuse the
     * aggregate stored for the hour before {@code start}; if none exists they are written as zeros.
     * All rows are inserted in a single transaction. A {@link SQLException} is logged and not
     * rethrown.
     *
     * @param companyId company identifier; appended to the schema name prefix
     * @param zone      time zone used to convert {@code start} and {@code now} to epoch milliseconds
     * @param now       end of the aggregation window (exclusive)
     * @param start     beginning of the aggregation window (inclusive); its year/month/day/hour
     *                  are stored as the row's date columns
     */
    public void aggregateLastHour(Long companyId, ZoneId zone, LocalDateTime now, LocalDateTime start) {
        long startTs = start.atZone(zone).toInstant().toEpochMilli();
        long endTs = now.atZone(zone).toInstant().toEpochMilli();

        logger.info("Aggregating data for company {} from {} to {}", companyId, start, now);

        String schema = "data_center_dongjian_" + companyId;

        try (Connection conn = getConnection()) {
            // 1: collect the IDs of every device that has ever reported, from the realtime table
            Set<String> deviceSet = loadRealtimeDeviceIds(conn, schema);
            logger.info("Found {} devices from realtime table", deviceSet.size());

            // 2: load the samples recorded inside the window
            Map<String, List<Record>> currentHourData =
                    queryRecordData(conn, schema, SOURCE_TABLE, startTs, endTs);

            // 3: load the previous hour's aggregates (used as a fallback)
            LocalDateTime prevHour = start.minusHours(1);
            Map<String, AggregateResult> prevHourData =
                    queryPrevHourAggregates(conn, schema, TARGET_TABLE, prevHour);

            // 4: compute average, maximum and minimum for the current window
            Map<String, AggregateResult> resultMap = computeAggregates(currentHourData, startTs, endTs);

            // 5: make sure every known device has a value (previous hour, otherwise 0)
            fillMissingDevices(resultMap, deviceSet, prevHourData);

            // 6: write the aggregates atomically so a failure cannot leave a partial hour behind.
            // The connection is closed right after, so auto-commit does not need to be restored.
            conn.setAutoCommit(false);
            try {
                writeAggregates(conn, schema, resultMap, start, now, startTs, endTs);
                conn.commit();
            } catch (SQLException e) {
                rollbackQuietly(conn, e);
                throw e;
            }

            logger.info("Hourly aggregation finished for {} devices", resultMap.size());
        } catch (SQLException e) {
            logger.error("Hourly aggregation failed", e);
        }
    }

    /**
     * Reads every {@code device_id} present in the realtime table.
     *
     * @param conn   open connection
     * @param schema schema that contains the realtime table
     * @return the distinct device IDs found
     * @throws SQLException if the query fails
     */
    private Set<String> loadRealtimeDeviceIds(Connection conn, String schema) throws SQLException {
        Set<String> deviceIds = new HashSet<>();
        String realtimeSql = String.format("SELECT device_id FROM %s.%s", schema, REALTIME_TABLE);
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(realtimeSql)) {
            while (rs.next()) {
                deviceIds.add(rs.getString("device_id"));
            }
        }
        return deviceIds;
    }

    /**
     * Computes the aggregate of every device that has samples in the window.
     *
     * @param samplesByDevice samples grouped by device ID, each list ordered by timestamp
     * @param startTs         window start in epoch milliseconds
     * @param endTs           window end in epoch milliseconds
     * @return aggregates keyed by device ID
     */
    private Map<String, AggregateResult> computeAggregates(Map<String, List<Record>> samplesByDevice,
                                                           long startTs, long endTs) {
        Map<String, AggregateResult> results = new HashMap<>();
        for (Map.Entry<String, List<Record>> entry : samplesByDevice.entrySet()) {
            List<Record> records = entry.getValue();
            double avg = computeTimeWeightedAverage(records, startTs, endTs);
            double max = computeMaxValue(records);
            double min = computeMinValue(records);
            results.put(entry.getKey(), new AggregateResult(avg, max, min));
        }
        return results;
    }

    /**
     * Adds an entry for each known device that is missing from {@code results}: the previous
     * hour's aggregate when available, otherwise all zeros.
     *
     * @param results      aggregates computed for the current window; modified in place
     * @param deviceIds    all known device IDs
     * @param prevHourData aggregates stored for the previous hour, keyed by device ID
     */
    private void fillMissingDevices(Map<String, AggregateResult> results, Set<String> deviceIds,
                                    Map<String, AggregateResult> prevHourData) {
        for (String deviceId : deviceIds) {
            if (results.containsKey(deviceId)) {
                continue;
            }
            AggregateResult prev = prevHourData.get(deviceId);
            results.put(deviceId, prev != null ? prev : new AggregateResult(0.0, 0.0, 0.0));
        }
    }

    /**
     * Batch-inserts one aggregate row per device. Does not commit; the caller controls the
     * transaction.
     *
     * @param conn    open connection
     * @param schema  schema that contains the target table
     * @param results aggregates keyed by device ID
     * @param start   window start; supplies the year/month/day/hour columns
     * @param now     window end; stored as {@code aggregated_at}
     * @param startTs window start in epoch milliseconds, stored as {@code time_start}
     * @param endTs   window end in epoch milliseconds, stored as {@code time_end}
     * @throws SQLException if preparing or executing the batch fails
     */
    private void writeAggregates(Connection conn, String schema, Map<String, AggregateResult> results,
                                 LocalDateTime start, LocalDateTime now, long startTs, long endTs)
            throws SQLException {
        if (results.isEmpty()) {
            return;
        }

        String insertSql = String.format(
                "INSERT INTO %s.%s (device_id, average_value, max_value, min_value, "
                        + "date_year, date_month, date_day, date_hour, time_start, time_end, aggregated_at) "
                        + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                schema, TARGET_TABLE
        );

        // NOTE(review): Timestamp.valueOf interprets `now` in the JVM default time zone, not in
        // the `zone` passed to aggregateLastHour - confirm this is the intended stored value.
        Timestamp aggregatedAt = Timestamp.valueOf(now);

        try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
            for (Map.Entry<String, AggregateResult> entry : results.entrySet()) {
                AggregateResult r = entry.getValue();

                ps.setString(1, entry.getKey());
                // Locale.ROOT guarantees a '.' decimal separator regardless of the JVM locale.
                ps.setString(2, String.format(Locale.ROOT, "%.6f", r.avg));
                ps.setString(3, String.format(Locale.ROOT, "%.6f", r.max));
                ps.setString(4, String.format(Locale.ROOT, "%.6f", r.min));
                ps.setInt(5, start.getYear());
                ps.setInt(6, start.getMonthValue());
                ps.setInt(7, start.getDayOfMonth());
                ps.setInt(8, start.getHour());
                ps.setLong(9, startTs);
                ps.setLong(10, endTs);
                ps.setTimestamp(11, aggregatedAt);
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }

    /**
     * Rolls back the current transaction, attaching any rollback failure to {@code cause} as a
     * suppressed exception so the original error is not lost.
     *
     * @param conn  connection whose transaction should be rolled back
     * @param cause the failure that triggered the rollback
     */
    private void rollbackQuietly(Connection conn, SQLException cause) {
        try {
            conn.rollback();
        } catch (SQLException rollbackError) {
            cause.addSuppressed(rollbackError);
        }
    }

    /**
     * Loads the samples with {@code startTs <= upload_at < endTs}, grouped by device and ordered
     * by timestamp within each device.
     *
     * <p>A {@code null}, empty or blank {@code upload_value} counts as 0.0. Values that cannot be
     * parsed as a finite number are skipped with a warning.
     *
     * @param conn    open connection
     * @param schema  schema that contains the table
     * @param table   name of the record table
     * @param startTs window start in epoch milliseconds (inclusive)
     * @param endTs   window end in epoch milliseconds (exclusive)
     * @return samples keyed by device ID
     * @throws SQLException if the query fails
     */
    private Map<String, List<Record>> queryRecordData(Connection conn, String schema, String table,
                                                      long startTs, long endTs) throws SQLException {
        String sql = String.format(
                "SELECT device_id, upload_value, upload_at FROM %s.%s "
                        + "WHERE upload_at >= ? AND upload_at < ? ORDER BY device_id, upload_at",
                schema, table
        );

        Map<String, List<Record>> map = new HashMap<>();
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, startTs);
            ps.setLong(2, endTs);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String deviceId = rs.getString("device_id");
                    String valStr = rs.getString("upload_value");
                    long ts = rs.getLong("upload_at");

                    OptionalDouble val = parseUploadValue(deviceId, valStr);
                    if (val.isPresent()) {
                        map.computeIfAbsent(deviceId, k -> new ArrayList<>())
                                .add(new Record(ts, val.getAsDouble()));
                    }
                }
            }
        }
        return map;
    }

    /**
     * Converts a raw {@code upload_value} string to a number.
     *
     * @param deviceId device the value belongs to; used only for the warning message
     * @param raw      raw column value, possibly {@code null}
     * @return 0.0 for a {@code null} or blank value, the parsed number for a finite numeric
     *         value, or empty when the value is malformed or not finite
     */
    private OptionalDouble parseUploadValue(String deviceId, String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return OptionalDouble.of(0.0);
        }
        try {
            double parsed = Double.parseDouble(raw.trim());
            if (Double.isFinite(parsed)) {
                return OptionalDouble.of(parsed);
            }
        } catch (NumberFormatException ignored) {
            // Falls through to the warning below; one bad sample must not abort the whole run.
        }
        logger.warn("Skipping unusable upload_value '{}' for device {}", raw, deviceId);
        return OptionalDouble.empty();
    }

    /**
     * Loads the aggregates stored for the given hour.
     *
     * @param conn     open connection
     * @param schema   schema that contains the table
     * @param table    name of the hourly aggregate table
     * @param prevHour date-time whose year, month, day and hour select the rows
     * @return aggregates keyed by device ID
     * @throws SQLException if the query fails
     */
    private Map<String, AggregateResult> queryPrevHourAggregates(Connection conn, String schema,
                                                                 String table, LocalDateTime prevHour)
            throws SQLException {
        Map<String, AggregateResult> map = new HashMap<>();
        String sql = String.format(
                "SELECT device_id, average_value, max_value, min_value FROM %s.%s "
                        + "WHERE date_year = ? AND date_month = ? AND date_day = ? AND date_hour = ?",
                schema, table
        );

        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, prevHour.getYear());
            ps.setInt(2, prevHour.getMonthValue());
            ps.setInt(3, prevHour.getDayOfMonth());
            ps.setInt(4, prevHour.getHour());
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    map.put(rs.getString("device_id"), new AggregateResult(
                            rs.getDouble("average_value"),
                            rs.getDouble("max_value"),
                            rs.getDouble("min_value")));
                }
            }
        }
        return map;
    }

    /**
     * Computes the time-weighted average of the samples over the window.
     *
     * <p>Each sample is weighted by the time until the next sample (or until {@code endTs} for
     * the last one), and the weighted sum is divided by the full window length.
     *
     * <p>NOTE(review): the span between {@code startTs} and the first sample contributes nothing
     * to the sum while the divisor is still the whole window, so a device whose first sample
     * arrives late in the window gets a lower average. Confirm whether that is intended.
     *
     * @param records samples ordered by timestamp
     * @param startTs window start in epoch milliseconds
     * @param endTs   window end in epoch milliseconds
     * @return the weighted average, or 0.0 when there are no samples or the window is not positive
     */
    private double computeTimeWeightedAverage(List<Record> records, long startTs, long endTs) {
        if (records == null || records.isEmpty()) {
            return 0.0;
        }

        double sum = 0.0;
        double totalMinutes = (endTs - startTs) / MILLIS_PER_MINUTE; // milliseconds -> minutes

        for (int i = 0; i < records.size(); i++) {
            Record rec = records.get(i);
            long nextTs = (i < records.size() - 1) ? records.get(i + 1).timestamp : endTs;
            double intervalMinutes = (nextTs - rec.timestamp) / MILLIS_PER_MINUTE;
            sum += rec.value * intervalMinutes;
        }

        return totalMinutes > 0 ? sum / totalMinutes : 0.0;
    }

    /**
     * Returns the largest sample value.
     *
     * @param records samples to inspect
     * @return the maximum value, or 0.0 when the list is empty
     */
    private double computeMaxValue(List<Record> records) {
        return records.stream().mapToDouble(r -> r.value).max().orElse(0.0);
    }

    /**
     * Returns the smallest sample value.
     *
     * @param records samples to inspect
     * @return the minimum value, or 0.0 when the list is empty
     */
    private double computeMinValue(List<Record> records) {
        return records.stream().mapToDouble(r -> r.value).min().orElse(0.0);
    }

    /** A single sample: the {@code upload_at} timestamp and the parsed {@code upload_value}. */
    private static class Record {
        final long timestamp;
        final double value;

        Record(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Average, maximum and minimum of one device for one window. */
    private static class AggregateResult {
        final double avg;
        final double max;
        final double min;

        AggregateResult(double avg, double max, double min) {
            this.avg = avg;
            this.max = max;
            this.min = min;
        }
    }
}