diff --git a/pom.xml b/pom.xml index 32cac36..a5ac673 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,13 @@ 1.2.12 + + tools.jackson.core + jackson-databind + 3.1.3 + compile + + diff --git a/src/main/java/com/dashboard/aws/lambda/Constants.java b/src/main/java/com/dashboard/aws/lambda/Constants.java index 686494d..293f648 100644 --- a/src/main/java/com/dashboard/aws/lambda/Constants.java +++ b/src/main/java/com/dashboard/aws/lambda/Constants.java @@ -7,7 +7,7 @@ public class Constants { public static final ZoneId TOKYO_ZONE = ZoneId.of("Asia/Tokyo"); -// public static final List tables = List.of("dashboard_record_accumulate", "dashboard_record_measure"); - public static final List tables = List.of("dashboard_record_accumulate_test", "dashboard_record_measure_test"); + public static final List tables = List.of("dashboard_record_accumulate", "dashboard_record_measure"); +// public static final List tables = List.of("dashboard_record_accumulate_test", "dashboard_record_measure_test"); } diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java b/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java index ae6e979..b092dfa 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java @@ -20,26 +20,44 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import tools.jackson.databind.ObjectMapper; + public class MySQLBatchToS3Handler implements RequestHandler, String> { private static final Logger logger = LoggerFactory.getLogger(MySQLBatchToS3Handler.class); private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); - private static final int BATCH_SIZE = Integer.parseInt(System.getenv("BATCH_SIZE")); + private static final int INSERT_BATCH_SIZE = Integer.parseInt(System.getenv("INSERT_BATCH_SIZE")); private static final String CUTOFF_DATE_STR = System.getenv("CUTOFF_DATE"); private static final ZoneId TOKYO_ZONE = Constants.TOKYO_ZONE; private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - private static final Pattern FILE_PATTERN = Pattern.compile(".*/(\\d{3})\\.csv$"); + private static final Pattern FILE_PATTERN = Pattern.compile(".*/(\\d{8})\\.csv$"); private final MySQLService mysqlService = new MySQLService(); private final S3Service s3Service = new S3Service(); + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * 失败记录数据结构 + */ + static class FailedRecord { + public List ids; + public int retryCount; + } + + /** + * 失败记录集合 + */ + static class FailedRecords { + public Map tableRecords = new HashMap<>(); + } @Override public String handleRequest(Map event, Context context) { long startTime = System.currentTimeMillis(); try { - // 确定截止日期:优先使用环境变量,否则默认使用东京时区3天前 + // 确定截止日期:优先使用环境变量,否则默认使用东京时区2天前 LocalDate cutoffDate; if (CUTOFF_DATE_STR != null && !CUTOFF_DATE_STR.isEmpty()) { cutoffDate = LocalDate.parse(CUTOFF_DATE_STR); @@ -53,9 +71,23 @@ public class MySQLBatchToS3Handler implements RequestHandler long cutoffUploadAt = cutoffDate.atStartOfDay(TOKYO_ZONE).toInstant().toEpochMilli(); logger.info("Cutoff upload_at: {}", cutoffUploadAt); + // 加载失败记录 + FailedRecords failedRecords = loadFailedRecords(); + logger.info("Loaded failed records for {} tables", failedRecords.tableRecords.size()); + // 处理每个配置的表 for (String table : Constants.tables) { - processTable(table, cutoffUploadAt, startTime); + // 先处理失败记录 + processFailedRecords(failedRecords, table); + + // 保存失败记录(处理完一个表就保存一次 + saveFailedRecords(failedRecords); + + // 再处理新数据 + processTable(table, cutoffUploadAt, startTime, failedRecords); + + // 保存失败记录 + saveFailedRecords(failedRecords); } long totalTime = System.currentTimeMillis() - startTime; @@ -72,12 +104,11 @@ public class MySQLBatchToS3Handler implements RequestHandler * 处理单个表的数据备份 * 循环查询批次数据,直到没有数据或遇到截止日期或超时 */ - private void processTable(String table, long cutoffUploadAt, long overallStartTime) { - logger.info("[{}][{}] start processing batch size: {}", DB_SCHEMA, table, BATCH_SIZE); + private void processTable(String table, long cutoffUploadAt, long overallStartTime, FailedRecords failedRecords) { + logger.info("[{}][{}] start processing batch size: {}, cutoff upload_at: {}", DB_SCHEMA, table, INSERT_BATCH_SIZE, cutoffUploadAt); long tableStartTime = System.currentTimeMillis(); Long lastProcessedId = null; - Long lastUploadAt = null; boolean shouldContinue = true; while (shouldContinue) { @@ -93,7 +124,7 @@ public class MySQLBatchToS3Handler implements RequestHandler // 查询当前批次数据 List> rows = null; try { - rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, BATCH_SIZE, lastProcessedId, lastUploadAt, cutoffUploadAt); + rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, INSERT_BATCH_SIZE, lastProcessedId); } catch (Exception e) { logger.error("[{}][{}] query failed", DB_SCHEMA, table, e); break; @@ -106,22 +137,25 @@ public class MySQLBatchToS3Handler implements RequestHandler logger.info("[{}][{}] fetched {} rows", DB_SCHEMA, table, rows.size()); - // 按日期分组数据 + // 过滤数据并按日期分组:只处理upload_at < cutoffUploadAt的记录 Map>> grouped = new HashMap<>(); Long maxIdInBatch = null; - Long maxUploadAtInBatch = null; - + boolean reachedCutoff = false; + int validCount = 0; + for (Map row : rows) { - Long id = (Long) row.get("id"); + Long uploadAt = (Long) row.get("upload_at"); + if (uploadAt >= cutoffUploadAt) { + logger.info("[{}][{}] reached cutoff upload_at: {}, stop processing", DB_SCHEMA, table, uploadAt); + reachedCutoff = true; + break; + } - // 更新当前批次的最大ID和最大uploadAt + // 更新当前批次的最大ID + Long id = (Long) row.get("id"); if (maxIdInBatch == null || id > maxIdInBatch) { maxIdInBatch = id; } - Long uploadAt = (Long) row.get("upload_at"); - if (maxUploadAtInBatch == null || uploadAt > maxUploadAtInBatch) { - maxUploadAtInBatch = uploadAt; - } // 根据upload_at转换为东京时区日期 LocalDate date = Instant.ofEpochMilli(uploadAt).atZone(TOKYO_ZONE).toLocalDate(); @@ -129,16 +163,21 @@ public class MySQLBatchToS3Handler implements RequestHandler // 按日期分组 String dateKey = date.format(DATE_FORMATTER); grouped.computeIfAbsent(dateKey, k -> new ArrayList<>()).add(row); + validCount++; + } + + if (validCount == 0) { + logger.info("[{}][{}] no valid rows in this batch", DB_SCHEMA, table); + break; } // 如果当前批次没有有效数据,继续下一批 if (grouped.isEmpty()) { logger.info("[{}][{}] no valid data in this batch", DB_SCHEMA, table); - if (rows.size() < BATCH_SIZE) { + if (validCount < INSERT_BATCH_SIZE || reachedCutoff) { shouldContinue = false; } else { lastProcessedId = maxIdInBatch; - lastUploadAt = maxUploadAtInBatch; } continue; } @@ -161,7 +200,7 @@ public class MySQLBatchToS3Handler implements RequestHandler // 生成S3文件名,按序号递增 String prefix = String.format("%s/%s", table, dateKey); int nextSeq = getNextSequenceNumber(prefix); - String s3Key = String.format("%s/%s/%03d.csv", table, dateKey, nextSeq); + String s3Key = String.format("%s/%s/%08d.csv", table, dateKey, nextSeq); try { byte[] csvBytes = CsvUtil.toCsvBytes(dateRows); @@ -174,14 +213,16 @@ public class MySQLBatchToS3Handler implements RequestHandler logger.info("[{}][{}] delete {} records for date {}", DB_SCHEMA, table, currentDateIds.size(), dateKey); processedIds.addAll(currentDateIds); } catch (Exception e) { - logger.error("[{}][{}] delete failed for date {}, trying to rollback S3 file", DB_SCHEMA, table, dateKey, e); - // MySQL删除失败,回滚S3文件,保持一致性 - try { - s3Service.delete(s3Key); - logger.info("[{}][{}] rollback success, deleted S3 file: {}", DB_SCHEMA, table, s3Key); - } catch (Exception rollbackEx) { - logger.error("[{}][{}] rollback failed, S3 file may be orphaned: {}", DB_SCHEMA, table, s3Key, rollbackEx); + logger.error("[{}][{}] delete failed for date {}, recording to failed records", DB_SCHEMA, table, dateKey, e); + // 记录到失败记录中,不回滚 S3 文件 + FailedRecord record = failedRecords.tableRecords.get(table); + if (record == null) { + record = new FailedRecord(); + record.ids = new ArrayList<>(); + record.retryCount = 0; + failedRecords.tableRecords.put(table, record); } + record.ids.addAll(currentDateIds); anyFailure = true; } } catch (Exception e) { @@ -199,12 +240,11 @@ public class MySQLBatchToS3Handler implements RequestHandler Long maxProcessedId = processedIds.stream().max(Long::compareTo).orElse(null); if (maxProcessedId != null) { lastProcessedId = maxProcessedId; - lastUploadAt = maxUploadAtInBatch; } } - // 确定是否继续:只有全部成功且当前批次是完整批次时才继续 - if (anyFailure || rows.size() < BATCH_SIZE) { + // 确定是否继续:只有全部成功且当前批次是完整批次且没有达到cutoff时才继续 + if (anyFailure || validCount < INSERT_BATCH_SIZE || reachedCutoff) { shouldContinue = false; } } @@ -234,4 +274,82 @@ public class MySQLBatchToS3Handler implements RequestHandler return maxSeq + 1; } + + /** + * 加载失败记录文件 + */ + private FailedRecords loadFailedRecords() { + String s3Key = ".failed_records.json"; + try { + if (!s3Service.exists(s3Key)) { + logger.info("No failed records file found"); + return new FailedRecords(); + } + + byte[] content = s3Service.download(s3Key); + if (content == null || content.length == 0) { + logger.info("Failed records file is empty"); + return new FailedRecords(); + } + + return objectMapper.readValue(content, FailedRecords.class); + } catch (Exception e) { + logger.error("Failed to load failed records", e); + return new FailedRecords(); + } + } + + /** + * 保存失败记录文件 + */ + private void saveFailedRecords(FailedRecords failedRecords) { + String s3Key = ".failed_records.json"; + try { + byte[] content = objectMapper.writeValueAsBytes(failedRecords); + s3Service.upload(s3Key, content); + logger.info("Saved failed records, total tables: {}", failedRecords.tableRecords.size()); + } catch (Exception e) { + logger.error("Failed to save failed records", e); + } + } + + /** + * 处理失败记录:重试删除 + */ + private void processFailedRecords(FailedRecords failedRecords, String table) { + FailedRecord record = failedRecords.tableRecords.get(table); + if (record == null || record.ids == null || record.ids.isEmpty()) { + logger.info("[{}][{}] No failed records to process", DB_SCHEMA, table); + return; + } + + logger.info("[{}][{}] Processing {} failed records, retry count: {}", + DB_SCHEMA, table, record.ids.size(), record.retryCount); + + int maxRetries = 10; + if (record.retryCount >= maxRetries) { + logger.info("[{}][{}] Failed records reached max retries ({}), removing", + DB_SCHEMA, table, maxRetries); + failedRecords.tableRecords.remove(table); + return; + } + + // 尝试删除 + try { + mysqlService.deleteByIds(DB_SCHEMA, table, record.ids); + logger.info("[{}][{}] Success delete {} failed records, removing from list", + DB_SCHEMA, table, record.ids.size()); + failedRecords.tableRecords.remove(table); + } catch (Exception e) { + record.retryCount++; + logger.error("[{}][{}] Failed to delete failed records (retry {}/{}), keeping in list", + DB_SCHEMA, table, record.retryCount, maxRetries, e); + + if (record.retryCount >= maxRetries) { + logger.info("[{}][{}] Failed records reached max retries, removing from list", + DB_SCHEMA, table); + failedRecords.tableRecords.remove(table); + } + } + } } diff --git a/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java new file mode 100644 index 0000000..3dccb06 --- /dev/null +++ b/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java @@ -0,0 +1,318 @@ +package com.dashboard.aws.lambda.handler; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.dashboard.aws.lambda.Constants; +import com.dashboard.aws.lambda.service.MySQLService; +import com.dashboard.aws.lambda.service.S3Service; +import com.dashboard.aws.lambda.util.CsvUtil; +import com.dashboard.aws.lambda.util.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tools.jackson.databind.ObjectMapper; + +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.*; + +/** + * S3数据批量恢复到MySQL的Lambda处理器 + * + * 功能: + * 1. 从环境变量 SYNCHRONIZE_DATE 中解析要处理的日期(如果配置) + * 2. 如果没有配置 SYNCHRONIZE_DATE,则使用默认:去年同ISO周同星期几的日期 + * 3. 查询S3上指定日期下的CSV文件(格式:table/date/00000001.csv, 00000002.csv...) + * 4. 依次处理CSV文件,写入MySQL + * 5. 超时保护(剩余2分钟时停止) + * 6. 断点续传(记录已处理的CSV文件,下次跳过) + * + * S3文件结构: + * table/date/ + * ├─ 00000001.csv + * ├─ 00000002.csv + * └─ .processed ← JSON格式,记录已处理的CSV文件:{"processedFiles": ["00000001.csv", "00000002.csv"]} + */ +public class S3BatchToMySQLHandler implements RequestHandler, String> { + + private static final Logger logger = LoggerFactory.getLogger(S3BatchToMySQLHandler.class); + + // 从环境变量读取数据库schema + private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); + private static final String SYNCHRONIZE_DATE = System.getenv("SYNCHRONIZE_DATE"); + + // ObjectMapper用于JSON序列化/反序列化 + private static final ObjectMapper objectMapper = new ObjectMapper(); + + // MySQL和S3服务 + private final MySQLService mysqlService = new MySQLService(); + private final S3Service s3Service = new S3Service(); + + @Override + public String handleRequest(Map event, Context context) { + long startTime = System.currentTimeMillis(); + + // Lambda超时时间:15分钟,预留2分钟余量,13分钟时停止 + long lambdaTimeoutMs = 15 * 60 * 1000; + long timeoutThresholdMs = lambdaTimeoutMs - 2 * 60 * 1000; + + try { + // 获取东京时区 + ZoneId zone = Constants.TOKYO_ZONE; + + // 确定要处理的日期 + String dateStr; + if (SYNCHRONIZE_DATE != null && !SYNCHRONIZE_DATE.trim().isEmpty()) { + // 优先使用环境变量 SYNCHRONIZE_DATE + dateStr = SYNCHRONIZE_DATE.trim(); + logger.info("Using date from SYNCHRONIZE_DATE env: {}", dateStr); + } else { + // 没有配置则使用默认:去年同ISO周同星期几的日期 + LocalDate now = DateUtil.resolveEventDate(event, zone); + dateStr = DateUtil.getLastYearSameIsoWeekDayStr(now); + logger.info("Using date from DateUtil: {}", dateStr); + } + LocalDate fileDate = LocalDate.parse(dateStr); + + logger.info("Processing date: {}", dateStr); + + // 循环处理每个配置的表 + for (String table : Constants.tables) { + // 处理表前检查超时 + long elapsed = System.currentTimeMillis() - startTime; + if (elapsed >= timeoutThresholdMs) { + logger.warn("Approaching Lambda timeout, stopping at date: {}, table: {}", dateStr, table); + break; + } + + // 处理单个表 + processTable(table, fileDate, dateStr, startTime, timeoutThresholdMs); + } + + // 记录总耗时 + long totalTime = System.currentTimeMillis() - startTime; + logger.info("Process completed in {}ms", totalTime); + return "sync process finished"; + + } catch (Exception e) { + logger.error("sync error", e); + return "sync error:" + e.getMessage(); + } + } + + /** + * 处理单个表的数据恢复 + * + * @param table 表名 + * @param fileDate 截止日期(删除早于该日期的数据) + * @param dateStr 日期字符串(yyyy-MM-dd格式) + * @param overallStartTime Lambda处理开始时间戳 + * @param timeoutThresholdMs 超时阈值 + */ + private void processTable(String table, LocalDate fileDate, String dateStr, long overallStartTime, long timeoutThresholdMs) { + // 进度标记文件:table/date/.processed + String markerKey = String.format("%s/%s/.processed", table, dateStr); + logger.info("[{}][{}] start processing date: {}", DB_SCHEMA, table, dateStr); + + try { + // 1. 加载已处理的文件列表 + Map processedFiles = loadProcessedFiles(markerKey); + logger.info("[{}][{}] Already processed files: {}", DB_SCHEMA, table, processedFiles); + + // 2. 获取S3上指定日期的所有CSV文件 + String prefix = String.format("%s/%s", table, dateStr); + List keys = s3Service.listKeysWithPrefix(prefix); + if (keys.isEmpty()) { + logger.warn("[{}][{}] no files found for prefix: {}", DB_SCHEMA, table, prefix); + return; + } + + // 3. 按文件名排序(确保 00000001.csv, 00000002.csv 顺序处理) + Collections.sort(keys); + logger.info("[{}][{}] found {} files to process", DB_SCHEMA, table, keys.size()); + + // 4. 删除旧数据(只在第一次处理时执行) + if (processedFiles.isEmpty()) { + int deleted = mysqlService.deleteBeforeDate(DB_SCHEMA, table, fileDate); + logger.info("[{}][{}] deleted {} records before {}", DB_SCHEMA, table, deleted, fileDate); + } else { + logger.info("[{}][{}] already has progress for {} files, skipping deleteBeforeDate", DB_SCHEMA, table, processedFiles.size()); + } + + // 本次处理的文件进度(初始为已处理的) + Map progressThisTime = new HashMap<>(processedFiles); + boolean allProcessed = true; + + // 5. 依次处理每个CSV文件 + for (String s3Key : keys) { + // 处理文件前检查超时 + long elapsed = System.currentTimeMillis() - overallStartTime; + if (elapsed >= timeoutThresholdMs) { + logger.warn("[{}][{}] Approaching Lambda timeout, stopping at file: {}", DB_SCHEMA, table, s3Key); + allProcessed = false; + break; + } + + // 提取文件名(00000001.csv, 00000002.csv...) + String fileName = extractFileName(s3Key); + + // 获取已处理的行数(默认0) + int processedRows = progressThisTime.getOrDefault(fileName, 0); + + // 处理单个CSV文件(从已处理的行数开始) + int inserted = processSingleFile(table, s3Key, processedRows); + + // 更新进度:已处理行数 + 新插入行数 + int newProcessedRows = processedRows + inserted; + progressThisTime.put(fileName, newProcessedRows); + + // 保存处理进度 + saveProcessedFiles(markerKey, progressThisTime); + + logger.info("[{}][{}] file {} progress: {}/{} rows", DB_SCHEMA, table, fileName, newProcessedRows, newProcessedRows); + } + + // 6. 记录处理结果 + if (allProcessed) { + logger.info("[{}][{}] date processing completed", DB_SCHEMA, table); + } else { + logger.info("[{}][{}] date processing partially completed", DB_SCHEMA, table); + } + + } catch (Exception e) { + logger.error("[{}][{}] process error", DB_SCHEMA, table, e); + throw e; + } + } + + /** + * 从S3路径中提取文件名 + * 例如:table/date/00000001.csv -> 00000001.csv + * + * @param s3Key S3文件路径 + * @return 文件名 + */ + private String extractFileName(String s3Key) { + int lastSlash = s3Key.lastIndexOf('/'); + if (lastSlash >= 0) { + return s3Key.substring(lastSlash + 1); + } + return s3Key; + } + + /** + * 从S3加载已处理的文件列表 + * + * @param markerKey .processed 文件路径 + * @return 已处理的文件列表 + */ + private Map loadProcessedFiles(String markerKey) { + try { + // 先判断文件是否存在 + if (!s3Service.exists(markerKey)) { + logger.info("Processed state file not found: {}", markerKey); + return new HashMap<>(); + } + + // 下载 .processed 文件 + byte[] content = s3Service.download(markerKey); + if (content == null || content.length == 0) { + return new HashMap<>(); + } + + // 解析JSON格式 + ProcessedState state = objectMapper.readValue(content, ProcessedState.class); + return state.getFileProgress() != null ? state.getFileProgress() : new HashMap<>(); + } catch (Exception e) { + // 加载失败,返回空列表 + logger.info("Error loading processed state: {}", markerKey, e); + return new HashMap<>(); + } + } + + /** + * 保存已处理的文件进度到S3 + * + * @param markerKey .processed 文件路径 + * @param fileProgress 已处理的文件进度 Map<文件名, 已处理行数> + */ + private void saveProcessedFiles(String markerKey, Map fileProgress) { + try { + // 构造状态对象 + ProcessedState state = new ProcessedState(); + state.setFileProgress(fileProgress); + + // 序列化为JSON字节 + byte[] content = objectMapper.writeValueAsBytes(state); + + // 上传到S3 + s3Service.upload(markerKey, content); + logger.info("Saved processed files progress: {}", fileProgress.size()); + } catch (Exception e) { + logger.error("Failed to save processed files: {}", markerKey, e); + // 保存失败不影响主流程,只记录日志 + } + } + + /** + * 处理单个CSV文件:下载 -> 解析 -> 写入MySQL + * + * @param table 表名 + * @param s3Key S3文件路径 + * @param startRowIndex 从第几行开始处理(0-based) + * @return 实际插入的行数 + */ + private int processSingleFile(String table, String s3Key, int startRowIndex) { + logger.info("[{}][{}] processing s3 file: {}, starting from row: {}", DB_SCHEMA, table, s3Key, startRowIndex); + + try { + // 1. 下载CSV文件 + byte[] content = s3Service.download(s3Key); + if (content == null || content.length == 0) { + logger.warn("[{}][{}] file empty or not found: {}", DB_SCHEMA, table, s3Key); + return 0; + } + + // 2. 解析CSV为Map列表 + List> allRows = CsvUtil.parseCsv(content); + if (allRows.isEmpty()) { + logger.warn("[{}][{}] csv empty: {}", DB_SCHEMA, table, s3Key); + return 0; + } + + // 3. 截取需要处理的行 + List> rowsToProcess; + if (startRowIndex >= allRows.size()) { + logger.info("[{}][{}] file already fully processed: {}", DB_SCHEMA, table, s3Key); + return 0; + } + rowsToProcess = allRows.subList(startRowIndex, allRows.size()); + logger.info("[{}][{}] processing {} rows (total {} in file)", DB_SCHEMA, table, rowsToProcess.size(), allRows.size()); + + // 4. 批量插入到MySQL + int inserted = mysqlService.insertRows(DB_SCHEMA, table, rowsToProcess); + logger.info("[{}][{}] inserted {} rows from file: {}", DB_SCHEMA, table, inserted, s3Key); + + return inserted; + + } catch (Exception e) { + logger.error("[{}][{}] process failed for file: {}", DB_SCHEMA, table, s3Key, e); + throw new RuntimeException(e); + } + } + + /** + * 处理进度状态对象,用于JSON序列化 + */ + static class ProcessedState { + + private Map fileProgress; + + public Map getFileProgress() { + return fileProgress; + } + + public void setFileProgress(Map fileProgress) { + this.fileProgress = fileProgress; + } + } +} diff --git a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java index b4caf84..96269ca 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java @@ -15,6 +15,7 @@ public class MySQLService { private static final String MYSQL_URL = System.getenv("DB_URL"); // 统一实例URL private static final String DB_USER = System.getenv("DB_USER"); private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); + private static final String batchSizeEnv = System.getenv("BATCH_SIZE"); /** @@ -86,20 +87,113 @@ public class MySQLService { String columnStr = String.join(",", columns); String placeholders = String.join(",", Collections.nCopies(columns.size(), "?")); String sql = String.format("INSERT INTO %s.%s (%s) VALUES (%s)", schema, table, columnStr, placeholders); -// logger.info("insert sql: {}", sql); + + // 从环境变量读取批量大小,默认5000 + int batchSize = 5000; + if (batchSizeEnv != null && !batchSizeEnv.trim().isEmpty()) { + try { + batchSize = Integer.parseInt(batchSizeEnv.trim()); + } catch (NumberFormatException e) { + logger.warn("Invalid BATCH_SIZE: {}, using default 5000", batchSizeEnv); + } + } int total = 0; + int totalBatches = (rows.size() + batchSize - 1) / batchSize; + + // 优化:使用rewriteBatchedStatements提高性能 try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { - for (Map row : rows) { - for (int i = 0; i < columns.size(); i++) { - ps.setString(i + 1, row.getOrDefault(columns.get(i), null)); + // 关闭自动提交,手动控制事务 + conn.setAutoCommit(false); + + try { + // 分批处理 + for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++) { + int start = batchIndex * batchSize; + int end = Math.min(start + batchSize, rows.size()); + List> batch = rows.subList(start, end); + int batchNumber = batchIndex + 1; + + // 尝试插入当前批次,最多重试2次 + boolean batchSuccess = false; + int retryCount = 0; + int maxRetries = 2; + int batchInserted = 0; + + while (!batchSuccess && retryCount <= maxRetries) { + try { + // 清空之前的批次 + ps.clearBatch(); + + // 添加当前批次的所有行 + for (Map row : batch) { + for (int i = 0; i < columns.size(); i++) { + ps.setString(i + 1, row.getOrDefault(columns.get(i), null)); + } + ps.addBatch(); + } + + // 执行批量插入 + int[] results = ps.executeBatch(); + batchInserted = 0; + for (int r : results) { + batchInserted += r; + } + + // 提交当前批次事务,每个批次独立事务,失败不影响其他批次 + conn.commit(); + total += batchInserted; + batchSuccess = true; + + if (retryCount == 0) { + logger.info("[{}][{}] Batch {}/{} inserted {} records, total: {}", + schema, table, batchNumber, totalBatches, batchInserted, total); + } else { + logger.info("[{}][{}] Batch {}/{} inserted {} records (retry {}), total: {}", + schema, table, batchNumber, totalBatches, batchInserted, retryCount, total); + } + + } catch (Exception e) { + // 回滚当前批次 + conn.rollback(); + retryCount++; + + if (retryCount > maxRetries) { + logger.error("[{}][{}] Batch {}/{} failed after {} retries, stopping at this batch", + schema, table, batchNumber, totalBatches, maxRetries, e); + // 不抛出异常,返回已成功插入的总行数 + return total; + } else { + logger.warn("[{}][{}] Batch {}/{} failed, retry {}/{}", + schema, table, batchNumber, totalBatches, retryCount, maxRetries, e); + } + // 回滚的话,停一下休眠20ms + try { + Thread.sleep(20); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); + } + } + } + + // 批次间隔:sleep 15ms,给实时写入留时间窗口,平滑IO压力 + if (batchIndex < totalBatches - 1) { + try { + Thread.sleep(15); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } - ps.addBatch(); + + logger.info("[{}][{}] All batches completed, total inserted: {}", schema, table, total); + + } finally { + // 恢复自动提交 + conn.setAutoCommit(true); } - int[] results = ps.executeBatch(); - for (int r : results) total += r; } return total; } @@ -137,52 +231,68 @@ public class MySQLService { return; } - // 构建IN子句的占位符字符串:?, ?, ?, ... - String placeholders = String.join(",", Collections.nCopies(ids.size(), "?")); - String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders); - logger.info("deleteByIds sql: {}, ids count: {}", sql, ids.size()); + int batchSize = 1000; + int totalDeleted = 0; - try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); - PreparedStatement ps = conn.prepareStatement(sql)) { + // 分批删除,每批1000个ID + for (int i = 0; i < ids.size(); i += batchSize) { + int end = Math.min(i + batchSize, ids.size()); + List batchIds = ids.subList(i, end); + + // 构建IN子句的占位符字符串:?, ?, ?, ... + String placeholders = String.join(",", Collections.nCopies(batchIds.size(), "?")); + String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders); +// logger.info("deleteByIds sql: {}, batch ids count: {}", sql, batchIds.size()); - // 逐个设置ID参数值 - for (int i = 0; i < ids.size(); i++) { - ps.setLong(i + 1, ids.get(i)); + try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); + PreparedStatement ps = conn.prepareStatement(sql)) { + + // 逐个设置ID参数值 + for (int j = 0; j < batchIds.size(); j++) { + ps.setLong(j + 1, batchIds.get(j)); + } + int deleted = ps.executeUpdate(); + totalDeleted += deleted; + logger.info("[{}.{}] 批次删除 {} 条记录,累计 {} 条", schema, table, deleted, totalDeleted); + } + + // 每批之间稍作停顿,减少锁冲突 + if (i + batchSize < ids.size()) { + try { + Thread.sleep(20); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } - int deleted = ps.executeUpdate(); - logger.info("[{}.{}] 删除 {} 条记录", schema, table, deleted); } + + logger.info("[{}.{}] 总删除 {} 条记录", schema, table, totalDeleted); } /** - * 查询ID大于指定值的前N条数据(按 upload_at + id 升序) + * 查询ID大于指定值的前N条数据(按 id 升序) * 用于分页分批处理数据 * @param lastId 上次处理的最大ID(不包含),为null时从第一条开始查询 - * @param lastUploadAt 上次处理的最大 upload_at(不包含) - * @param cutoffUploadAt 截止时间,只处理 upload_at < cutoffUploadAt 的数据 */ - public List> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId, Long lastUploadAt, Long cutoffUploadAt) throws SQLException { + public List> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId) throws SQLException { String sql; if (lastId == null) { // 第一次查询 - sql = String.format("SELECT * FROM %s.%s WHERE upload_at < ? ORDER BY upload_at ASC, id ASC LIMIT ?", schema, table); + sql = String.format("SELECT * FROM %s.%s ORDER BY id ASC LIMIT ?", schema, table); } else { // 后续查询,从上次之后开始 - sql = String.format("SELECT * FROM %s.%s WHERE (upload_at > ? OR (upload_at = ? AND id > ?)) AND upload_at < ? ORDER BY upload_at ASC, id ASC LIMIT ?", schema, table); + sql = String.format("SELECT * FROM %s.%s WHERE id > ? ORDER BY id ASC LIMIT ?", schema, table); } - logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}, lastUploadAt: {}, cutoffUploadAt: {}", - sql, limit, lastId, lastUploadAt, cutoffUploadAt); + logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}", + sql, limit, lastId); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { int paramIndex = 1; if (lastId != null) { - ps.setLong(paramIndex++, lastUploadAt); - ps.setLong(paramIndex++, lastUploadAt); ps.setLong(paramIndex++, lastId); } - ps.setLong(paramIndex++, cutoffUploadAt); ps.setInt(paramIndex, limit); ResultSet rs = ps.executeQuery(); diff --git a/src/main/java/com/dashboard/aws/lambda/service/S3Service.java b/src/main/java/com/dashboard/aws/lambda/service/S3Service.java index f008d0b..b7c4b6e 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/S3Service.java +++ b/src/main/java/com/dashboard/aws/lambda/service/S3Service.java @@ -10,9 +10,11 @@ import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Object; @@ -89,6 +91,21 @@ public class S3Service { } } + public boolean exists(String key) { + try { + s3.headObject(HeadObjectRequest.builder() + .bucket(bucket) + .key(key) + .build()); + return true; + } catch (NoSuchKeyException e) { + return false; + } catch (Exception e) { + logger.error("check file exists failed: {}", key, e); + return false; + } + } + public byte[] download(String key) { try { ResponseBytes objectBytes = s3.getObjectAsBytes(GetObjectRequest.builder() @@ -97,6 +114,9 @@ public class S3Service { .build()); logger.info("download success: {}", key); return objectBytes.asByteArray(); + } catch (NoSuchKeyException e) { + logger.error("file not found: {}", key); + return null; } catch (Exception e) { logger.error("download failed: {}", key, e); return null; diff --git a/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java b/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java index 052d5be..88db2bd 100644 --- a/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java +++ b/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java @@ -79,7 +79,7 @@ public class S3CsvReorganizer { String basePath = matcher.group(1); // dashboard-for-backup/dashboard_record_measure/ String date = matcher.group(2); // 2025-12-25 - String newKey = basePath + date + "/001.csv"; + String newKey = basePath + date + "/00000001.csv"; // 如果目标文件和源文件相同则跳过 if (oldKey.equals(newKey)) {