From dd76466c344462f4607784f46fcb3051331d444c Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Mon, 13 Oct 2025 00:50:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=8F=E6=97=B6=E5=B9=B3=E5=9D=87=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lambda/handler/MeasureAverageHandler.java | 43 +++++ .../lambda/service/AggregationService.java | 18 +- .../lambda/service/MeasureAverageService.java | 177 ++++++++++++++++++ 3 files changed, 229 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java create mode 100644 src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java new file mode 100644 index 0000000..9daf5f0 --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java @@ -0,0 +1,43 @@ +package com.dashboard.aws.lambda.handler; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.dashboard.aws.lambda.service.MeasureAverageService; +import com.dashboard.aws.lambda.service.MySQLService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Map; + +public class MeasureAverageHandler implements RequestHandler, String> { + + private static final Logger logger = LoggerFactory.getLogger(MeasureAverageHandler.class); + private final MeasureAverageService measureAverageService = new MeasureAverageService(); + private final MySQLService mysqlService = new MySQLService(); + + @Override + public String handleRequest(Map event, Context context) { + try { + // 从数据库或配置中取企业列表 + List companyIds = mysqlService.getActiveCompanyIds(); + logger.info("company id list: {}", companyIds); + + ZoneId zone = ZoneId.of("Asia/Tokyo"); + LocalDateTime now = LocalDateTime.now(zone).withMinute(0).withSecond(0).withNano(0); + LocalDateTime start = now.minusHours(1); + + for (Long companyId : companyIds) { + measureAverageService.aggregateLastHour(companyId, zone, now, start); + } + + return "Measure average finished"; + + } catch (Exception e) { + logger.error("MeasureAverage error", e); + return "MeasureAverage error:" + e.getMessage(); + } + } +} diff --git a/src/main/java/com/dashboard/aws/lambda/service/AggregationService.java b/src/main/java/com/dashboard/aws/lambda/service/AggregationService.java index 5afa054..75497be 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/AggregationService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/AggregationService.java @@ -31,31 +31,31 @@ public class AggregationService { switch (level) { case MIN_30 -> { sourceTable = "dashboard_record_accumulate"; - targetTable = "dashboard_aggregate_30min"; + targetTable = "dashboard_aggregate_accumulate_30min"; sourceColumn = "increment_minute"; timeWhereColumn = "upload_at"; } case HOUR -> { - sourceTable = "dashboard_aggregate_30min"; - targetTable = "dashboard_aggregate_hour"; + sourceTable = "dashboard_aggregate_accumulate_30min"; + targetTable = "dashboard_aggregate_accumulate_hour"; sourceColumn = "aggregate_value"; timeWhereColumn = "time_start"; } case DAY -> { - sourceTable = "dashboard_aggregate_hour"; - targetTable = "dashboard_aggregate_day"; + sourceTable = "dashboard_aggregate_accumulate_hour"; + targetTable = "dashboard_aggregate_accumulate_day"; sourceColumn = "aggregate_value"; timeWhereColumn = "time_start"; } case MONTH -> { - sourceTable = "dashboard_aggregate_day"; - targetTable = "dashboard_aggregate_month"; + sourceTable = "dashboard_aggregate_accumulate_day"; + targetTable = "dashboard_aggregate_accumulate_month"; sourceColumn = "aggregate_value"; timeWhereColumn = "time_start"; } case YEAR -> { - sourceTable = "dashboard_aggregate_month"; - targetTable = "dashboard_aggregate_year"; + sourceTable = "dashboard_aggregate_accumulate_month"; + targetTable = "dashboard_aggregate_accumulate_year"; sourceColumn = "aggregate_value"; timeWhereColumn = "time_start"; } diff --git a/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java b/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java new file mode 100644 index 0000000..d498c2e --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java @@ -0,0 +1,177 @@ +package com.dashboard.aws.lambda.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.time.*; +import java.util.*; + +public class MeasureAverageService { + + private static final Logger logger = LoggerFactory.getLogger(MeasureAverageService.class); + + private static final String MYSQL_URL = System.getenv("DB_URL"); + private static final String DB_USER = System.getenv("DB_USER"); + private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); + + private Connection getConnection() throws SQLException { + return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + } + + public void aggregateLastHour(Long companyId, ZoneId zone, LocalDateTime now, LocalDateTime start) { + long startTs = start.atZone(zone).toInstant().toEpochMilli(); + long endTs = now.atZone(zone).toInstant().toEpochMilli(); + + logger.info("Aggregating data for company {} from {} to {}", companyId, start, now); + + String schema = "data_center_dongjian_" + companyId; + String realtimeTable = "dashboard_realtime_measure"; + String sourceTable = "dashboard_record_measure"; + String targetTable = "dashboard_aggregate_measure_hour"; + + try (Connection conn = getConnection()) { + + // 1: 从实时表获取所有上报过的设备ID集合 + Set deviceSet = new HashSet<>(); + String realtimeSql = String.format("SELECT device_id FROM %s.%s", schema, realtimeTable); + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(realtimeSql)) { + while (rs.next()) { + deviceSet.add(rs.getString("device_id")); + } + } + logger.info("Found {} devices from realtime table", deviceSet.size()); + + // 2: 查询过去一小时的数据 + Map> currentHourData = queryRecordData(conn, schema, sourceTable, startTs, endTs); + + // 3: 查询上一小时的聚合结果(用于回退) + LocalDateTime prevHour = start.minusHours(1); + Map prevHourData = queryPrevHourAggregates(conn, schema, targetTable, prevHour); + + // 4: 计算当前小时平均值(如果有) + Map resultMap = new HashMap<>(); + for (Map.Entry> entry : currentHourData.entrySet()) { + String deviceId = entry.getKey(); + double avg = computeTimeWeightedAverage(entry.getValue(), startTs, endTs); + resultMap.put(deviceId, avg); + } + + // 5: 对比集合 A,确保所有设备都有值(否则用上一小时或 0) + for (String deviceId : deviceSet) { + if (!resultMap.containsKey(deviceId)) { + Double prevVal = prevHourData.get(deviceId); + double value = (prevVal != null) ? prevVal : 0.0; + resultMap.put(deviceId, value); + } + } + + // 6: 写入本小时聚合表 + String insertSql = String.format( + "INSERT INTO %s.%s (device_id, average_value, date_year, date_month, date_day, date_hour, " + + "time_start, time_end, aggregated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", schema, targetTable + ); + + try (PreparedStatement ps = conn.prepareStatement(insertSql)) { + for (Map.Entry entry : resultMap.entrySet()) { + String deviceId = entry.getKey(); + double avg = entry.getValue(); + LocalDateTime dt = start; + + ps.setString(1, deviceId); + ps.setString(2, String.format("%.6f", avg)); + ps.setInt(3, dt.getYear()); + ps.setInt(4, dt.getMonthValue()); + ps.setInt(5, dt.getDayOfMonth()); + ps.setInt(6, dt.getHour()); + ps.setLong(7, startTs); + ps.setLong(8, endTs); + ps.setTimestamp(9, Timestamp.valueOf(now)); + + ps.addBatch(); + } + ps.executeBatch(); + } + + logger.info("Hourly aggregation finished for {} devices", resultMap.size()); + + } catch (SQLException e) { + logger.error("Hourly aggregation failed", e); + } + } + + // 查询当前小时数据 + private Map> queryRecordData(Connection conn, String schema, String table, long startTs, long endTs) throws SQLException { + String sql = String.format( + "SELECT device_id, upload_value, upload_at FROM %s.%s " + + "WHERE upload_at >= ? AND upload_at < ? ORDER BY device_id, upload_at", + schema, table + ); + Map> map = new HashMap<>(); + try (PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setLong(1, startTs); + ps.setLong(2, endTs); + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + String deviceId = rs.getString("device_id"); + String valStr = rs.getString("upload_value"); + long ts = rs.getLong("upload_at"); + double val = (valStr == null || valStr.isEmpty()) ? 0.0 : Double.parseDouble(valStr); + map.computeIfAbsent(deviceId, k -> new ArrayList<>()).add(new Record(ts, val)); + } + } + return map; + } + + // 查询上一小时聚合结果 + private Map queryPrevHourAggregates(Connection conn, String schema, String table, LocalDateTime prevHour) throws SQLException { + Map map = new HashMap<>(); + String sql = String.format( + "SELECT device_id, average_value FROM %s.%s " + + "WHERE date_year = ? AND date_month = ? AND date_day = ? AND date_hour = ?", + schema, table + ); + try (PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setInt(1, prevHour.getYear()); + ps.setInt(2, prevHour.getMonthValue()); + ps.setInt(3, prevHour.getDayOfMonth()); + ps.setInt(4, prevHour.getHour()); + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + map.put(rs.getString("device_id"), rs.getDouble("average_value")); + } + } + return map; + } + + private double computeTimeWeightedAverage(List records, long startTs, long endTs) { + if (records == null || records.isEmpty()) return 0.0; + + double sum = 0.0; + double totalMinutes = (endTs - startTs) / 60000.0; // 毫秒 → 分钟 + + for (int i = 0; i < records.size(); i++) { + Record rec = records.get(i); + long nextTs = (i < records.size() - 1) ? records.get(i + 1).timestamp : endTs; + double intervalMinutes = (nextTs - rec.timestamp) / 60000.0; // 转分钟 + logger.info("intervalMinutes: {}, rec.value: {}", intervalMinutes, rec.value); + sum += rec.value * intervalMinutes; + } + + logger.info("totalMinutes: {}, sum: {}", totalMinutes, sum); + + return totalMinutes > 0 ? sum / totalMinutes : 0.0; + } + + private static class Record { + long timestamp; + double value; + + Record(long timestamp, double value) { + this.timestamp = timestamp; + this.value = value; + } + } +} \ No newline at end of file