Browse Source

mysql存s3初步完成

master
review512jwy@163.com 2 weeks ago
parent
commit
6954455262
  1. 39
      src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java
  2. 4
      src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java
  3. 69
      src/main/java/com/dashboard/aws/lambda/service/MySQLService.java

39
src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java

@ -208,13 +208,21 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
logger.info("[{}][{}] upload success -> {}, {} rows", DB_SCHEMA, table, s3Key, dateRows.size());
// 上传成功,立即删除该分组对应的ID
try {
mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds);
logger.info("[{}][{}] delete {} records for date {}", DB_SCHEMA, table, currentDateIds.size(), dateKey);
processedIds.addAll(currentDateIds);
} catch (Exception e) {
logger.error("[{}][{}] delete failed for date {}, recording to failed records", DB_SCHEMA, table, dateKey, e);
// 记录到失败记录中,不回滚 S3 文件
List<Long> failedIds = mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds);
// 计算成功删除的 ID(总 ID - 失败 ID)
List<Long> successIds = new ArrayList<>(currentDateIds);
successIds.removeAll(failedIds);
if (!successIds.isEmpty()) {
logger.info("[{}][{}] delete {} records for date {}", DB_SCHEMA, table, successIds.size(), dateKey);
processedIds.addAll(successIds);
}
// 记录失败的 ID
if (!failedIds.isEmpty()) {
logger.error("[{}][{}] delete failed for {} records in date {}, recording to failed records",
DB_SCHEMA, table, failedIds.size(), dateKey);
FailedRecord record = failedRecords.tableRecords.get(table);
if (record == null) {
record = new FailedRecord();
@ -222,7 +230,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
record.retryCount = 0;
failedRecords.tableRecords.put(table, record);
}
record.ids.addAll(currentDateIds);
record.ids.addAll(failedIds);
anyFailure = true;
}
} catch (Exception e) {
@ -335,15 +343,20 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
}
// 尝试删除
try {
mysqlService.deleteByIds(DB_SCHEMA, table, record.ids);
List<Long> stillFailedIds = mysqlService.deleteByIds(DB_SCHEMA, table, record.ids);
if (stillFailedIds.isEmpty()) {
// 全部成功
logger.info("[{}][{}] Success delete {} failed records, removing from list",
DB_SCHEMA, table, record.ids.size());
failedRecords.tableRecords.remove(table);
} catch (Exception e) {
} else {
// 部分失败
record.retryCount++;
logger.error("[{}][{}] Failed to delete failed records (retry {}/{}), keeping in list",
DB_SCHEMA, table, record.retryCount, maxRetries, e);
record.ids = stillFailedIds;
logger.error("[{}][{}] Still {} records failed (retry {}/{})",
DB_SCHEMA, table, stillFailedIds.size(), record.retryCount, maxRetries);
if (record.retryCount >= maxRetries) {
logger.info("[{}][{}] Failed records reached max retries, removing from list",

4
src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java

@ -51,9 +51,9 @@ public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object>
public String handleRequest(Map<String, Object> event, Context context) {
long startTime = System.currentTimeMillis();
// Lambda超时时间:15分钟,预留2分钟余量,13分钟时停止
// Lambda超时时间:15分钟,预留5分钟余量,13分钟时停止
long lambdaTimeoutMs = 15 * 60 * 1000;
long timeoutThresholdMs = lambdaTimeoutMs - 2 * 60 * 1000;
long timeoutThresholdMs = lambdaTimeoutMs - 5 * 60 * 1000;
try {
// 获取东京时区

69
src/main/java/com/dashboard/aws/lambda/service/MySQLService.java

@ -225,48 +225,59 @@ public class MySQLService {
/**
* 根据ID列表批量删除数据
* 使用IN语句一次性删除提高效率
* @return 失败的ID列表如果全部成功则返回空列表
*/
public void deleteByIds(String schema, String table, List<Long> ids) throws SQLException {
public List<Long> deleteByIds(String schema, String table, List<Long> ids) {
if (ids == null || ids.isEmpty()) {
return;
return new ArrayList<>();
}
int batchSize = 1000;
int totalDeleted = 0;
List<Long> failedIds = new ArrayList<>();
// 分批删除,每批1000个ID
for (int i = 0; i < ids.size(); i += batchSize) {
int end = Math.min(i + batchSize, ids.size());
List<Long> batchIds = ids.subList(i, end);
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD)) {
// 分批删除,每批1000个ID
for (int i = 0; i < ids.size(); i += batchSize) {
int end = Math.min(i + batchSize, ids.size());
List<Long> batchIds = ids.subList(i, end);
// 构建IN子句的占位符字符串:?, ?, ?, ...
String placeholders = String.join(",", Collections.nCopies(batchIds.size(), "?"));
String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders);
// logger.info("deleteByIds sql: {}, batch ids count: {}", sql, batchIds.size());
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
// 逐个设置ID参数值
for (int j = 0; j < batchIds.size(); j++) {
ps.setLong(j + 1, batchIds.get(j));
}
int deleted = ps.executeUpdate();
totalDeleted += deleted;
logger.info("[{}.{}] 批次删除 {} 条记录,累计 {} 条", schema, table, deleted, totalDeleted);
}
// 每批之间稍作停顿,减少锁冲突
if (i + batchSize < ids.size()) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 构建IN子句的占位符字符串:?, ?, ?, ...
String placeholders = String.join(",", Collections.nCopies(batchIds.size(), "?"));
String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders);
logger.info("deleteByIds sql: {}, batch ids count: {}", sql, batchIds.size());
try (PreparedStatement ps = conn.prepareStatement(sql)) {
// 逐个设置ID参数值
for (int j = 0; j < batchIds.size(); j++) {
ps.setLong(j + 1, batchIds.get(j));
}
int deleted = ps.executeUpdate();
totalDeleted += deleted;
logger.info("[{}.{}] 批次删除 {} 条记录,累计 {} 条", schema, table, deleted, totalDeleted);
}
// 每批之间稍作停顿,减少锁冲突
if (i + batchSize < ids.size()) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
logger.error("[{}.{}] 批次删除失败,添加到失败列表:{}", schema, table, batchIds.size(), e);
failedIds.addAll(batchIds);
}
}
} catch (Exception e) {
logger.error("[{}.{}] 获取数据库连接失败,所有批次都失败", schema, table, e);
failedIds.addAll(ids);
}
logger.info("[{}.{}] 总删除 {} 条记录", schema, table, totalDeleted);
logger.info("[{}.{}] 总删除 {} 条记录,失败 {} 条", schema, table, totalDeleted, failedIds.size());
return failedIds;
}
/**

Loading…
Cancel
Save