From 69544552622aaf07f55a29c463730eb6f15f624c Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Sun, 17 May 2026 18:12:07 +0800 Subject: [PATCH] =?UTF-8?q?mysql=E5=AD=98s3=E5=88=9D=E6=AD=A5=E5=AE=8C?= =?UTF-8?q?=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lambda/handler/MySQLBatchToS3Handler.java | 39 +++++++---- .../lambda/handler/S3BatchToMySQLHandler.java | 4 +- .../aws/lambda/service/MySQLService.java | 69 +++++++++++-------- 3 files changed, 68 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java b/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java index b092dfa..1e5f811 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java @@ -208,13 +208,21 @@ public class MySQLBatchToS3Handler implements RequestHandler logger.info("[{}][{}] upload success -> {}, {} rows", DB_SCHEMA, table, s3Key, dateRows.size()); // 上传成功,立即删除该分组对应的ID - try { - mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds); - logger.info("[{}][{}] delete {} records for date {}", DB_SCHEMA, table, currentDateIds.size(), dateKey); - processedIds.addAll(currentDateIds); - } catch (Exception e) { - logger.error("[{}][{}] delete failed for date {}, recording to failed records", DB_SCHEMA, table, dateKey, e); - // 记录到失败记录中,不回滚 S3 文件 + List failedIds = mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds); + + // 计算成功删除的 ID(总 ID - 失败 ID) + List successIds = new ArrayList<>(currentDateIds); + successIds.removeAll(failedIds); + + if (!successIds.isEmpty()) { + logger.info("[{}][{}] delete {} records for date {}", DB_SCHEMA, table, successIds.size(), dateKey); + processedIds.addAll(successIds); + } + + // 记录失败的 ID + if (!failedIds.isEmpty()) { + logger.error("[{}][{}] delete failed for {} records in date {}, recording to failed records", + DB_SCHEMA, table, failedIds.size(), dateKey); FailedRecord record = failedRecords.tableRecords.get(table); if (record == null) { record = new FailedRecord(); @@ -222,7 +230,7 @@ public class MySQLBatchToS3Handler implements RequestHandler record.retryCount = 0; failedRecords.tableRecords.put(table, record); } - record.ids.addAll(currentDateIds); + record.ids.addAll(failedIds); anyFailure = true; } } catch (Exception e) { @@ -335,15 +343,20 @@ public class MySQLBatchToS3Handler implements RequestHandler } // 尝试删除 - try { - mysqlService.deleteByIds(DB_SCHEMA, table, record.ids); + List stillFailedIds = mysqlService.deleteByIds(DB_SCHEMA, table, record.ids); + + if (stillFailedIds.isEmpty()) { + // 全部成功 logger.info("[{}][{}] Success delete {} failed records, removing from list", DB_SCHEMA, table, record.ids.size()); failedRecords.tableRecords.remove(table); - } catch (Exception e) { + } else { + // 部分失败 record.retryCount++; - logger.error("[{}][{}] Failed to delete failed records (retry {}/{}), keeping in list", - DB_SCHEMA, table, record.retryCount, maxRetries, e); + record.ids = stillFailedIds; + + logger.error("[{}][{}] Still {} records failed (retry {}/{})", + DB_SCHEMA, table, stillFailedIds.size(), record.retryCount, maxRetries); if (record.retryCount >= maxRetries) { logger.info("[{}][{}] Failed records reached max retries, removing from list", diff --git a/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java index 3dccb06..82ec08c 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java @@ -51,9 +51,9 @@ public class S3BatchToMySQLHandler implements RequestHandler public String handleRequest(Map event, Context context) { long startTime = System.currentTimeMillis(); - // Lambda超时时间:15分钟,预留2分钟余量,13分钟时停止 + // Lambda超时时间:15分钟,预留5分钟余量,13分钟时停止 long lambdaTimeoutMs = 15 * 60 * 1000; - long timeoutThresholdMs = lambdaTimeoutMs - 2 * 60 * 1000; + long timeoutThresholdMs = lambdaTimeoutMs - 5 * 60 * 1000; try { // 获取东京时区 diff --git a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java index 96269ca..6659ee8 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java @@ -225,48 +225,59 @@ public class MySQLService { /** * 根据ID列表批量删除数据 * 使用IN语句一次性删除,提高效率 + * @return 失败的ID列表(如果全部成功则返回空列表) */ - public void deleteByIds(String schema, String table, List ids) throws SQLException { + public List deleteByIds(String schema, String table, List ids) { if (ids == null || ids.isEmpty()) { - return; + return new ArrayList<>(); } int batchSize = 1000; int totalDeleted = 0; + List failedIds = new ArrayList<>(); - // 分批删除,每批1000个ID - for (int i = 0; i < ids.size(); i += batchSize) { - int end = Math.min(i + batchSize, ids.size()); - List batchIds = ids.subList(i, end); + try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD)) { + // 分批删除,每批1000个ID + for (int i = 0; i < ids.size(); i += batchSize) { + int end = Math.min(i + batchSize, ids.size()); + List batchIds = ids.subList(i, end); - // 构建IN子句的占位符字符串:?, ?, ?, ... - String placeholders = String.join(",", Collections.nCopies(batchIds.size(), "?")); - String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders); -// logger.info("deleteByIds sql: {}, batch ids count: {}", sql, batchIds.size()); - - try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); - PreparedStatement ps = conn.prepareStatement(sql)) { - - // 逐个设置ID参数值 - for (int j = 0; j < batchIds.size(); j++) { - ps.setLong(j + 1, batchIds.get(j)); - } - int deleted = ps.executeUpdate(); - totalDeleted += deleted; - logger.info("[{}.{}] 批次删除 {} 条记录,累计 {} 条", schema, table, deleted, totalDeleted); - } - - // 每批之间稍作停顿,减少锁冲突 - if (i + batchSize < ids.size()) { try { - Thread.sleep(20); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + // 构建IN子句的占位符字符串:?, ?, ?, ... + String placeholders = String.join(",", Collections.nCopies(batchIds.size(), "?")); + String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders); + logger.info("deleteByIds sql: {}, batch ids count: {}", sql, batchIds.size()); + + try (PreparedStatement ps = conn.prepareStatement(sql)) { + // 逐个设置ID参数值 + for (int j = 0; j < batchIds.size(); j++) { + ps.setLong(j + 1, batchIds.get(j)); + } + int deleted = ps.executeUpdate(); + totalDeleted += deleted; + logger.info("[{}.{}] 批次删除 {} 条记录,累计 {} 条", schema, table, deleted, totalDeleted); + } + + // 每批之间稍作停顿,减少锁冲突 + if (i + batchSize < ids.size()) { + try { + Thread.sleep(20); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } catch (Exception e) { + logger.error("[{}.{}] 批次删除失败,添加到失败列表:{}", schema, table, batchIds.size(), e); + failedIds.addAll(batchIds); } } + } catch (Exception e) { + logger.error("[{}.{}] 获取数据库连接失败,所有批次都失败", schema, table, e); + failedIds.addAll(ids); } - logger.info("[{}.{}] 总删除 {} 条记录", schema, table, totalDeleted); + logger.info("[{}.{}] 总删除 {} 条记录,失败 {} 条", schema, table, totalDeleted, failedIds.size()); + return failedIds; } /**