diff --git a/src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java index 2d347b4..bca7d65 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/AccumulateIncrementHandler.java @@ -24,20 +24,15 @@ public class AccumulateIncrementHandler implements RequestHandler event, Context context) { try { ZoneId zone = ZoneId.of("Asia/Tokyo"); - LocalDateTime now = DateUtil.resolveEventTime(event, zone); + LocalDateTime now = DateUtil.normalizeToHalfHourTime(event, zone); List levels = DateUtil.getAggregationLevels(now); logger.info("Aggregation triggered at: {}, levels: {}", now, levels); // 从数据库或配置中取企业列表 - List companyIds = mysqlService.getActiveCompanyIds(); - logger.info("company id list: {}", companyIds); - - for (Long companyId : companyIds) { - for (AggregationLevel level : levels) { - logger.info("Start aggregation [company:{}, level:{}]", companyId, level); - accumulateIncrementService.aggregate(companyId, level, now); - } + for (AggregationLevel level : levels) { + logger.info("Start aggregation [level:{}]", level); + accumulateIncrementService.aggregate(level, now); } return "Aggregation finished at: " + now; diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java index 737c5ac..45c0a98 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/MeasureAverageHandler.java @@ -22,17 +22,11 @@ public class MeasureAverageHandler implements RequestHandler @Override public String handleRequest(Map event, Context context) { try { - // 从数据库或配置中取企业列表 - List companyIds = mysqlService.getActiveCompanyIds(); - logger.info("company id list: {}", companyIds); - ZoneId zone = ZoneId.of("Asia/Tokyo"); - LocalDateTime now = DateUtil.resolveEventTime(event, zone); + LocalDateTime now = DateUtil.normalizeToHalfHourTime(event, zone); LocalDateTime start = now.minusHours(1); - for (Long companyId : companyIds) { - measureAverageService.aggregateLastHour(companyId, zone, now, start); - } + measureAverageService.aggregateLastHour(zone, now, start); return "Measure average finished"; diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java b/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java index 16d02e9..0309fb3 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/MySQLToS3Handler.java @@ -18,6 +18,9 @@ public class MySQLToS3Handler implements RequestHandler, Str private static final Logger logger = LoggerFactory.getLogger(MySQLToS3Handler.class); + private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); + + private final MySQLService mysqlService = new MySQLService(); private final S3Service s3Service = new S3Service(); @@ -32,57 +35,51 @@ public class MySQLToS3Handler implements RequestHandler, Str long startTs = startDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli(); long endTs = endDate.atStartOfDay(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli(); - // 查询符合条件的企业ID - List companyIds = mysqlService.getActiveCompanyIds(); - logger.info("company id list: {}", companyIds); - - for (Long companyId : companyIds) { - String schema = "data_center_dongjian_" + companyId; + String schema = DB_SCHEMA; - for (String table : Constants.tables) { - // 查询旧数据 - List> rows = mysqlService.queryOldData(schema, table, startTs, endTs); - if (rows.isEmpty()) { - logger.info("[{}][{}] no data....", schema, table); - continue; - } + for (String table : Constants.tables) { + // 查询旧数据 + List> rows = mysqlService.queryOldData(schema, table, startTs, endTs); + if (rows.isEmpty()) { + logger.info("[{}][{}] no data....", schema, table); + continue; + } - // 按年月日分组 - Map>> grouped = new HashMap<>(); - for (Map row : rows) { - int y = (int) row.get("date_year"); - int m = (int) row.get("date_month"); - int d = (int) row.get("date_day"); - // 格式化年月日,不足两位补0 - String monthStr = String.format("%02d", m); - String dayStr = String.format("%02d", d); - String key = String.format("%d-%s-%s", y, monthStr, dayStr); - - grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(row); - } + // 按年月日分组 + Map>> grouped = new HashMap<>(); + for (Map row : rows) { + int y = (int) row.get("date_year"); + int m = (int) row.get("date_month"); + int d = (int) row.get("date_day"); + // 格式化年月日,不足两位补0 + String monthStr = String.format("%02d", m); + String dayStr = String.format("%02d", d); + String key = String.format("%d-%s-%s", y, monthStr, dayStr); + + grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(row); + } - // 上传每个分组 - boolean allSuccess = true; - for (Map.Entry>> entry : grouped.entrySet()) { - String dateKey = entry.getKey(); - byte[] csvBytes = CsvUtil.toCsvBytes(entry.getValue()); - String s3Key = String.format("%s/%s/%s.csv", table, companyId, dateKey); - try { - s3Service.upload(s3Key, csvBytes); - logger.info("[{}][{}] backup success -> {}", schema, table, s3Key); - } catch (Exception e) { - allSuccess = false; - logger.error("[{}][{}] backup failed -> {}", schema, table, s3Key, e); - } + // 上传每个分组 + boolean allSuccess = true; + for (Map.Entry>> entry : grouped.entrySet()) { + String dateKey = entry.getKey(); + byte[] csvBytes = CsvUtil.toCsvBytes(entry.getValue()); + String s3Key = String.format("%s/%s.csv", table, dateKey); + try { + s3Service.upload(s3Key, csvBytes); + logger.info("[{}][{}] backup success -> {}", schema, table, s3Key); + } catch (Exception e) { + allSuccess = false; + logger.error("[{}][{}] backup failed -> {}", schema, table, s3Key, e); } + } - // 全部上传成功才删除数据 - if (allSuccess) { - mysqlService.deleteOldData(schema, table, startTs, endTs); - logger.info("[{}][{}] delete old data", schema, table); - } else { - logger.error("[{}][{}] not all files uploaded successfully, skip delete", schema, table); - } + // 全部上传成功才删除数据 + if (allSuccess) { + mysqlService.deleteOldData(schema, table, startTs, endTs); + logger.info("[{}][{}] delete old data", schema, table); + } else { + logger.error("[{}][{}] not all files uploaded successfully, skip delete", schema, table); } } diff --git a/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java index f76e42c..7920f63 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java @@ -18,6 +18,9 @@ public class S3ToMySQLHandler implements RequestHandler, Str private static final Logger logger = LoggerFactory.getLogger(S3ToMySQLHandler.class); + private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); + + private final MySQLService mysqlService = new MySQLService(); private final S3Service s3Service = new S3Service(); @@ -35,45 +38,39 @@ public class S3ToMySQLHandler implements RequestHandler, Str // --- 解析出 fileDate(用于删除早于该日期的数据) --- LocalDate fileDate = LocalDate.parse(dateStr); - // 查询符合条件的企业ID - List companyIds = mysqlService.getActiveCompanyIds(); - logger.info("company id list: {}", companyIds); - List tables = Constants.tables; - for (Long companyId : companyIds) { - String schema = "data_center_dongjian_" + companyId; - - for (String table : tables) { - //先删除旧数据 - int deleted = mysqlService.deleteBeforeDate(schema, table, fileDate); - logger.info("[{}][{}] deleted {} records before {}", schema, table, deleted, fileDate); - - // 构造 S3 文件路径 - String s3Key = String.format("%s/%s/%s.csv", table, companyId, dateStr); - logger.info("processing s3 file: {}", s3Key); - - // 下载 CSV - byte[] content = s3Service.download(s3Key); - if (content == null || content.length == 0) { - logger.warn("[{}][{}] file empty or not found: {}", schema, table, s3Key); - continue; - } - - // 解析 CSV - List> rows = CsvUtil.parseCsv(content); - if (rows.isEmpty()) { - logger.warn("[{}][{}] csv empty: {}", schema, table, s3Key); - continue; - } - - // 插入到 MySQL - try { - int inserted = mysqlService.insertRows(schema, table, rows); - logger.info("[{}][{}] inserted {} rows", schema, table, inserted); - } catch (Exception e) { - logger.error("[{}][{}] insert failed -> {}", schema, table, s3Key, e); - } + String schema = DB_SCHEMA; + + for (String table : tables) { + //先删除旧数据 + int deleted = mysqlService.deleteBeforeDate(schema, table, fileDate); + logger.info("[{}][{}] deleted {} records before {}", schema, table, deleted, fileDate); + + // 构造 S3 文件路径 + String s3Key = String.format("%s/%s.csv", table, dateStr); + logger.info("processing s3 file: {}", s3Key); + + // 下载 CSV + byte[] content = s3Service.download(s3Key); + if (content == null || content.length == 0) { + logger.warn("[{}][{}] file empty or not found: {}", schema, table, s3Key); + continue; + } + + // 解析 CSV + List> rows = CsvUtil.parseCsv(content); + if (rows.isEmpty()) { + logger.warn("[{}][{}] csv empty: {}", schema, table, s3Key); + continue; + } + + // 插入到 MySQL + try { + int inserted = mysqlService.insertRows(schema, table, rows); + logger.info("[{}][{}] inserted {} rows", schema, table, inserted); + } catch (Exception e) { + logger.error("[{}][{}] insert failed -> {}", schema, table, s3Key, e); } } diff --git a/src/main/java/com/dashboard/aws/lambda/service/AccumulateIncrementService.java b/src/main/java/com/dashboard/aws/lambda/service/AccumulateIncrementService.java index 0f930dd..1723de5 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/AccumulateIncrementService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/AccumulateIncrementService.java @@ -17,12 +17,13 @@ public class AccumulateIncrementService { private static final String MYSQL_URL = System.getenv("DB_URL"); private static final String DB_USER = System.getenv("DB_USER"); private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); + private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); private Connection getConnection() throws SQLException { return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); } - public void aggregate(Long companyId, AggregationLevel level, LocalDateTime now) { + public void aggregate(AggregationLevel level, LocalDateTime now) { String sourceTable; String targetTable; String sourceColumn; @@ -141,7 +142,7 @@ public class AccumulateIncrementService { } } - String schema = "data_center_dongjian_" + companyId; + String schema = DB_SCHEMA; // --- 2) SQL:用占位符传入 date 字段 + start/end 时间戳(毫秒) --- String sql = String.format( @@ -174,9 +175,9 @@ public class AccumulateIncrementService { ps.setLong(idx, endMillis); int inserted = ps.executeUpdate(); - logger.info("[company:{}][{}] aggregated {} -> rows inserted: {}, start={}, end={}", companyId, level, targetTable, inserted, startMillis, endMillis); + logger.info("[{}] aggregated {} -> rows inserted: {}, start={}, end={}", level, targetTable, inserted, startMillis, endMillis); } catch (SQLException e) { - logger.error("[company:{}][{}] aggregation failed, start={}, end={}", companyId, level, startMillis, endMillis, e); + logger.error("[{}] aggregation failed, start={}, end={}", level, startMillis, endMillis, e); } } } diff --git a/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java b/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java index 2461629..6d60293 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MeasureAverageService.java @@ -14,18 +14,19 @@ public class MeasureAverageService { private static final String MYSQL_URL = System.getenv("DB_URL"); private static final String DB_USER = System.getenv("DB_USER"); private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); + private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); private Connection getConnection() throws SQLException { return DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); } - public void aggregateLastHour(Long companyId, ZoneId zone, LocalDateTime now, LocalDateTime start) { + public void aggregateLastHour(ZoneId zone, LocalDateTime now, LocalDateTime start) { long startTs = start.atZone(zone).toInstant().toEpochMilli(); long endTs = now.atZone(zone).toInstant().toEpochMilli(); - logger.info("Aggregating data for company {} from {} to {}", companyId, start, now); + logger.info("Aggregating data from {} to {}", start, now); - String schema = "data_center_dongjian_" + companyId; + String schema = DB_SCHEMA; String realtimeTable = "dashboard_realtime_measure"; String sourceTable = "dashboard_record_measure"; String targetTable = "dashboard_aggregate_measure_hour"; @@ -110,13 +111,13 @@ public class MeasureAverageService { // 判断是否到达每日 / 每月 / 每年聚合时间点 if (now.getHour() == 0) { - aggregateLastDay(companyId, now, now.toLocalDate().minusDays(1)); // 昨天的聚合 + aggregateLastDay(now, now.toLocalDate().minusDays(1)); // 昨天的聚合 } if (now.getDayOfMonth() == 1 && now.getHour() == 0) { - aggregateLastMonth(companyId, now, now.toLocalDate().minusMonths(1)); // 上个月的聚合 + aggregateLastMonth(now, now.toLocalDate().minusMonths(1)); // 上个月的聚合 } if (now.getMonthValue() == 1 && now.getDayOfMonth() == 1 && now.getHour() == 0) { - aggregateLastYear(companyId, now, now.toLocalDate().minusYears(1)); // 上一年的聚合 + aggregateLastYear(now, now.toLocalDate().minusYears(1)); // 上一年的聚合 } } catch (SQLException e) { @@ -124,12 +125,12 @@ public class MeasureAverageService { } } - public void aggregateLastDay(Long companyId, LocalDateTime now, LocalDate targetDate) { - String schema = "data_center_dongjian_" + companyId; + public void aggregateLastDay(LocalDateTime now, LocalDate targetDate) { + String schema = DB_SCHEMA; String sourceTable = "dashboard_aggregate_measure_hour"; String targetTable = "dashboard_aggregate_measure_day"; - logger.info("Aggregating daily data for company: {}, date {}", companyId, targetDate); + logger.info("Aggregating daily data, date {}", targetDate); String sql = String.format( "SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 24.0) AS avg_val, " + @@ -178,12 +179,12 @@ public class MeasureAverageService { } } - public void aggregateLastMonth(Long companyId, LocalDateTime now, LocalDate targetMonth) { - String schema = "data_center_dongjian_" + companyId; + public void aggregateLastMonth(LocalDateTime now, LocalDate targetMonth) { + String schema = DB_SCHEMA; String sourceTable = "dashboard_aggregate_measure_day"; String targetTable = "dashboard_aggregate_measure_month"; - logger.info("Aggregating monthly data for company:{}, month: {}", companyId, targetMonth.getMonthValue()); + logger.info("Aggregating monthly data, month: {}", targetMonth.getMonthValue()); // 天数作为分母 int daysInMonth = YearMonth.of(targetMonth.getYear(), targetMonth.getMonthValue()).lengthOfMonth(); @@ -234,12 +235,12 @@ public class MeasureAverageService { } } - public void aggregateLastYear(Long companyId, LocalDateTime now, LocalDate targetYear) { - String schema = "data_center_dongjian_" + companyId; + public void aggregateLastYear(LocalDateTime now, LocalDate targetYear) { + String schema = DB_SCHEMA; String sourceTable = "dashboard_aggregate_measure_month"; String targetTable = "dashboard_aggregate_measure_year"; - logger.info("Aggregating yearly data for company:{}, year: {}", companyId, targetYear.getYear()); + logger.info("Aggregating yearly data, year: {}", targetYear.getYear()); String sql = String.format( "SELECT device_id, (SUM(COALESCE(CAST(NULLIF(average_value, '') AS DECIMAL(20,6)), 0)) / 12.0) AS avg_val," + diff --git a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java index 0ffbf4e..dfe9e2e 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java @@ -19,24 +19,24 @@ public class MySQLService { /** * 获取符合条件的企业ID */ - public List getActiveCompanyIds() throws SQLException { - String sql = """ - SELECT id FROM data_center_new.basic_company - WHERE flag != 1 - AND (parent_id = -1 OR parent_id = 1) - """; - - try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); - PreparedStatement ps = conn.prepareStatement(sql); - ResultSet rs = ps.executeQuery()) { - - List ids = new ArrayList<>(); - while (rs.next()) { - ids.add(rs.getLong("id")); - } - return ids; - } - } +// public List getActiveCompanyIds() throws SQLException { +// String sql = """ +// SELECT id FROM data_center_new.basic_company +// WHERE flag != 1 +// AND (parent_id = -1 OR parent_id = 1) +// """; +// +// try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); +// PreparedStatement ps = conn.prepareStatement(sql); +// ResultSet rs = ps.executeQuery()) { +// +// List ids = new ArrayList<>(); +// while (rs.next()) { +// ids.add(rs.getLong("id")); +// } +// return ids; +// } +// } public List> queryOldData(String schema, String table, long startTs, long endTs) throws SQLException { String sql = String.format("SELECT * FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); diff --git a/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java b/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java index 6ccb5f0..cac8431 100644 --- a/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java +++ b/src/main/java/com/dashboard/aws/lambda/util/DateUtil.java @@ -109,11 +109,8 @@ public class DateUtil { } */ - public static LocalDateTime resolveEventTime(Map event, ZoneId zone) { - LocalDateTime now = LocalDateTime.now(zone) - .withMinute(0) - .withSecond(0) - .withNano(0); + public static LocalDateTime normalizeToHalfHourTime(Map event, ZoneId zone) { + LocalDateTime now = LocalDateTime.now(zone); if (MapUtil.isNotEmpty(event)) { Integer year = MapUtil.getInt(event, "year"); @@ -137,7 +134,11 @@ public class DateUtil { logger.info("Using default Tokyo time (missing year/month/day): {}", now); } } else { - logger.info("Using default Tokyo time (no event provided): {}", now); + // 根据当前分钟数判断是落在 0 还是 30 分段 + int currentMinute = now.getMinute(); + int roundedMinute = (currentMinute >= 30) ? 30 : 0; + now = now.withMinute(roundedMinute).withSecond(0).withNano(0); + logger.info("Using default Tokyo time (rounded to {}): {}", roundedMinute, now); } return now;