From e4013d26586c1eb08e73a338eda5ef4c48938ed8 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Thu, 21 May 2026 20:00:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E7=BB=88=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lambda/handler/MySQLBatchToS3Handler.java | 73 +++++++------ .../aws/lambda/handler/S3ToMySQLHandler.java | 84 --------------- .../aws/lambda/service/MySQLService.java | 102 +++++++++++++----- 3 files changed, 115 insertions(+), 144 deletions(-) delete mode 100644 src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java b/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java index 013822e..1a616ef 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java @@ -104,7 +104,7 @@ public class MySQLBatchToS3Handler implements RequestHandler // 先处理失败记录 processFailedRecords(failedRecords, table); - // 保存失败记录(处理完一个表就保存一次 + // 保存失败记录(处理完一个表就保存一次) saveFailedRecords(failedRecords); // 再处理新数据 @@ -129,13 +129,33 @@ public class MySQLBatchToS3Handler implements RequestHandler * 循环查询批次数据,直到没有数据或遇到截止日期或超时 */ private void processTable(String table, long cutoffUploadAt, long overallStartTime, FailedRecords failedRecords) { - logger.info("[{}][{}] start processing batch size: {}, cutoff upload_at: {}", DB_SCHEMA, table, DB_2_S3_BATCH_SIZE, cutoffUploadAt); + // 计算开始时间:往前10天 + long startTime = cutoffUploadAt - 10 * 24 * 60 * 60 * 1000L; + + logger.info("[{}][{}] start processing batch size: {}, time range: [{}, {})", + DB_SCHEMA, table, DB_2_S3_BATCH_SIZE, startTime, cutoffUploadAt); long tableStartTime = System.currentTimeMillis(); - Long lastProcessedId = null; + // 先查时间范围内的 minId 和 maxId + long minId, maxId; + try { + long[] minMaxId = mysqlService.queryMinMaxId(DB_SCHEMA, table, startTime, cutoffUploadAt); + minId = minMaxId[0]; + maxId = minMaxId[1]; + if (minId == 0 && maxId == 0) { + logger.info("[{}][{}] no data found, skip", DB_SCHEMA, table); + return; + } + } catch (Exception e) { + logger.error("[{}][{}] query min max id failed", DB_SCHEMA, table, e); + return; + } + + // 用主键单推法处理 + long currentId = minId; boolean shouldContinue = true; - while (shouldContinue) { + while (shouldContinue && currentId <= maxId) { // 检查是否快超时:距离15分钟还剩小于2分钟时停止,留点余量时间,防止lambda超时 long elapsedTime = System.currentTimeMillis() - overallStartTime; long remainingTime = 15 * 60 * 1000 - elapsedTime; @@ -148,7 +168,8 @@ public class MySQLBatchToS3Handler implements RequestHandler // 查询当前批次数据 List> rows = null; try { - rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, DB_2_S3_BATCH_SIZE, lastProcessedId); + // 这里有个小问题,由于id和upload_at不是同步递增,id小的upload_at反而大,那么会查出cutoffUploadAt后面的数据,但这个不影响,就当提前存了一部分 + rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, currentId, maxId, DB_2_S3_BATCH_SIZE); } catch (Exception e) { logger.error("[{}][{}] query failed", DB_SCHEMA, table, e); break; @@ -161,20 +182,11 @@ public class MySQLBatchToS3Handler implements RequestHandler logger.info("[{}][{}] fetched {} rows", DB_SCHEMA, table, rows.size()); - // 过滤数据并按日期分组:只处理upload_at < cutoffUploadAt的记录 + // 按日期分组 Map>> grouped = new HashMap<>(); Long maxIdInBatch = null; - boolean reachedCutoff = false; - int validCount = 0; for (Map row : rows) { - Long uploadAt = (Long) row.get("upload_at"); - if (uploadAt >= cutoffUploadAt) { - logger.info("[{}][{}] reached cutoff upload_at: {}, stop processing", DB_SCHEMA, table, uploadAt); - reachedCutoff = true; - break; - } - // 更新当前批次的最大ID Long id = (Long) row.get("id"); if (maxIdInBatch == null || id > maxIdInBatch) { @@ -182,26 +194,22 @@ public class MySQLBatchToS3Handler implements RequestHandler } // 根据upload_at转换为东京时区日期 + Long uploadAt = (Long) row.get("upload_at"); LocalDate date = Instant.ofEpochMilli(uploadAt).atZone(TOKYO_ZONE).toLocalDate(); // 按日期分组 String dateKey = date.format(DATE_FORMATTER); grouped.computeIfAbsent(dateKey, k -> new ArrayList<>()).add(row); - validCount++; - } - - if (validCount == 0) { - logger.info("[{}][{}] no valid rows in this batch", DB_SCHEMA, table); - break; } // 如果当前批次没有有效数据,继续下一批 if (grouped.isEmpty()) { logger.info("[{}][{}] no valid data in this batch", DB_SCHEMA, table); - if (validCount < DB_2_S3_BATCH_SIZE || reachedCutoff) { + if (maxIdInBatch != null) { + currentId = maxIdInBatch + 1; + } + if (rows.size() < DB_2_S3_BATCH_SIZE) { shouldContinue = false; - } else { - lastProcessedId = maxIdInBatch; } continue; } @@ -234,7 +242,7 @@ public class MySQLBatchToS3Handler implements RequestHandler // 上传成功,立即删除该分组对应的ID List failedIds = mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds); - // 计算成功删除的 ID(总 ID - 失败 ID) + // 计算成功删除的ID(总ID - 失败ID) List successIds = new ArrayList<>(currentDateIds); successIds.removeAll(failedIds); @@ -243,7 +251,7 @@ public class MySQLBatchToS3Handler implements RequestHandler processedIds.addAll(successIds); } - // 记录失败的 ID + // 记录失败的ID if (!failedIds.isEmpty()) { logger.error("[{}][{}] delete failed for {} records in date {}, recording to failed records", DB_SCHEMA, table, failedIds.size(), dateKey); @@ -267,16 +275,13 @@ public class MySQLBatchToS3Handler implements RequestHandler logger.info("[{}][{}] date {} processed in {}ms", DB_SCHEMA, table, dateKey, groupTime); } - // 更新下次查询的起始ID:只要处理了数据,就更新到已处理的最大ID - if (!processedIds.isEmpty()) { - Long maxProcessedId = processedIds.stream().max(Long::compareTo).orElse(null); - if (maxProcessedId != null) { - lastProcessedId = maxProcessedId; - } + // 更新下次查询的起始ID + if (maxIdInBatch != null) { + currentId = maxIdInBatch + 1; } - // 确定是否继续:只有全部成功且当前批次是完整批次且没有达到cutoff时才继续 - if (anyFailure || validCount < DB_2_S3_BATCH_SIZE || reachedCutoff) { + // 确定是否继续:只有全部成功且当前批次是完整批次时才继续 + if (anyFailure || rows.size() < DB_2_S3_BATCH_SIZE) { shouldContinue = false; } } diff --git a/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java deleted file mode 100644 index 7920f63..0000000 --- a/src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.dashboard.aws.lambda.handler; - -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.RequestHandler; -import com.dashboard.aws.lambda.Constants; -import com.dashboard.aws.lambda.service.MySQLService; -import com.dashboard.aws.lambda.service.S3Service; -import com.dashboard.aws.lambda.util.CsvUtil; -import com.dashboard.aws.lambda.util.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.LocalDate; -import java.time.ZoneId; -import java.util.*; - -public class S3ToMySQLHandler implements RequestHandler, String> { - - private static final Logger logger = LoggerFactory.getLogger(S3ToMySQLHandler.class); - - private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); - - - private final MySQLService mysqlService = new MySQLService(); - private final S3Service s3Service = new S3Service(); - - @Override - public String handleRequest(Map event, Context context) { - try { - // 同步目标日期(这里同步 3 天前的数据) -// LocalDate targetDate = LocalDate.now(ZoneId.of("Asia/Tokyo")).minusDays(3); -// String dateStr = targetDate.format(java.time.format.DateTimeFormatter.ISO_DATE); -// String dateStr = "2025-10-08"; - ZoneId zone = ZoneId.of("Asia/Tokyo"); - LocalDate now = DateUtil.resolveEventDate( event, zone); - String dateStr = DateUtil.getLastYearSameIsoWeekDayStr(now); - - // --- 解析出 fileDate(用于删除早于该日期的数据) --- - LocalDate fileDate = LocalDate.parse(dateStr); - - List tables = Constants.tables; - - String schema = DB_SCHEMA; - - for (String table : tables) { - //先删除旧数据 - int deleted = mysqlService.deleteBeforeDate(schema, table, fileDate); - logger.info("[{}][{}] deleted {} records before {}", schema, table, deleted, fileDate); - - // 构造 S3 文件路径 - String s3Key = String.format("%s/%s.csv", table, dateStr); - logger.info("processing s3 file: {}", s3Key); - - // 下载 CSV - byte[] content = s3Service.download(s3Key); - if (content == null || content.length == 0) { - logger.warn("[{}][{}] file empty or not found: {}", schema, table, s3Key); - continue; - } - - // 解析 CSV - List> rows = CsvUtil.parseCsv(content); - if (rows.isEmpty()) { - logger.warn("[{}][{}] csv empty: {}", schema, table, s3Key); - continue; - } - - // 插入到 MySQL - try { - int inserted = mysqlService.insertRows(schema, table, rows); - logger.info("[{}][{}] inserted {} rows", schema, table, inserted); - } catch (Exception e) { - logger.error("[{}][{}] insert failed -> {}", schema, table, s3Key, e); - } - } - - return "sync process finished"; - - } catch (Exception e) { - logger.error("sync error", e); - return "sync error:" + e.getMessage(); - } - } -} diff --git a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java index 486f3fc..bf17486 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java @@ -205,20 +205,47 @@ public class MySQLService { .toEpochMilli(); String sql = String.format( - "DELETE FROM %s.%s WHERE upload_at < ?", + "DELETE FROM %s.%s WHERE upload_at < ? LIMIT ?", schema, table ); logger.info("deleteBeforeDate sql: {}, cutoffUploadAt:{}", sql, cutoffUploadAt); - try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); - PreparedStatement ps = conn.prepareStatement(sql)) { - ps.setLong(1, cutoffUploadAt); - int deleted = ps.executeUpdate(); - logger.info("deleteBeforeDate: schema={}, table={}, cutoffUploadAt={}, deleted={}", schema, table, cutoffUploadAt, deleted); - return deleted; + int batchSize = 5000; // 每批删除 5000 条 + int totalDeleted = 0; + + try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD)) { + while (true) { + try (PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setLong(1, cutoffUploadAt); + ps.setInt(2, batchSize); + int deleted = ps.executeUpdate(); + + if (deleted == 0) { + logger.info("deleteBeforeDate: no more records to delete"); + break; // 没有更多数据了 + } + + totalDeleted += deleted; + logger.info("deleteBeforeDate: schema={}, table={}, cutoffUploadAt={}, batch deleted={}, total deleted={}", + schema, table, cutoffUploadAt, deleted, totalDeleted); + } + + // 每批之间 sleep 50ms,减少锁冲突 + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + + logger.info("deleteBeforeDate: schema={}, table={}, cutoffUploadAt={}, total deleted={}", + schema, table, cutoffUploadAt, totalDeleted); + return totalDeleted; + } catch (SQLException e) { logger.error("deleteBeforeDate failed for {}/{} cutoffUploadAt={}", schema, table, cutoffUploadAt, e); - return 0; + return totalDeleted; // 出错时返回已删除的数量 } } @@ -281,30 +308,53 @@ public class MySQLService { } /** - * 查询ID大于指定值的前N条数据(按 id 升序) - * 用于分页分批处理数据 - * @param lastId 上次处理的最大ID(不包含),为null时从第一条开始查询 + * 查询时间范围内的最小ID和最大ID + * @param startTime 开始时间(包含) + * @param endTime 结束时间(不包含) + * @return [minId, maxId],如果没有数据返回 [0, 0] */ - public List> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId) throws SQLException { - String sql; - if (lastId == null) { - // 第一次查询 - sql = String.format("SELECT * FROM %s.%s ORDER BY id ASC LIMIT ?", schema, table); - } else { - // 后续查询,从上次之后开始 - sql = String.format("SELECT * FROM %s.%s WHERE id > ? ORDER BY id ASC LIMIT ?", schema, table); - } - logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}", - sql, limit, lastId); + public long[] queryMinMaxId(String schema, String table, long startTime, long endTime) throws SQLException { + String sql = String.format("SELECT MIN(id) AS min_id, MAX(id) AS max_id FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); + logger.info("queryMinMaxId sql: {}, startTime: {}, endTime: {}", sql, startTime, endTime); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { - int paramIndex = 1; - if (lastId != null) { - ps.setLong(paramIndex++, lastId); + ps.setLong(1, startTime); + ps.setLong(2, endTime); + + ResultSet rs = ps.executeQuery(); + if (rs.next()) { + long minId = rs.getLong("min_id"); + long maxId = rs.getLong("max_id"); + if (rs.wasNull()) { + logger.info("[{}.{}] no data found in time range", schema, table); + return new long[]{0, 0}; + } + logger.info("[{}.{}] minId: {}, maxId: {}", schema, table, minId, maxId); + return new long[]{minId, maxId}; } - ps.setInt(paramIndex, limit); + return new long[]{0, 0}; + } + } + + /** + * 查询ID范围内的前N条数据(按 id ASC) + * 用于分页分批处理数据 + * @param startId 起始ID(包含) + * @param endId 结束ID(包含) + * @param limit 每批条数 + */ + public List> queryFirstNByIdGreaterThan(String schema, String table, long startId, long endId, int limit) throws SQLException { + String sql = String.format("SELECT * FROM %s.%s WHERE id >= ? AND id <= ? ORDER BY id ASC LIMIT ?", schema, table); + logger.info("queryFirstNByIdGreaterThan sql: {}, startId: {}, endId: {}, limit: {}", sql, startId, endId, limit); + + try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + PreparedStatement ps = conn.prepareStatement(sql)) { + + ps.setLong(1, startId); + ps.setLong(2, endId); + ps.setInt(3, limit); ResultSet rs = ps.executeQuery(); ResultSetMetaData meta = rs.getMetaData();