Browse Source

完成终版

master
review512jwy@163.com 1 week ago
parent
commit
e4013d2658
  1. 73
      src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java
  2. 84
      src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java
  3. 98
      src/main/java/com/dashboard/aws/lambda/service/MySQLService.java

73
src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java

@ -104,7 +104,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
// 先处理失败记录 // 先处理失败记录
processFailedRecords(failedRecords, table); processFailedRecords(failedRecords, table);
// 保存失败记录(处理完一个表就保存一次 // 保存失败记录(处理完一个表就保存一次
saveFailedRecords(failedRecords); saveFailedRecords(failedRecords);
// 再处理新数据 // 再处理新数据
@ -129,13 +129,33 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
* 循环查询批次数据直到没有数据或遇到截止日期或超时 * 循环查询批次数据直到没有数据或遇到截止日期或超时
*/ */
private void processTable(String table, long cutoffUploadAt, long overallStartTime, FailedRecords failedRecords) { private void processTable(String table, long cutoffUploadAt, long overallStartTime, FailedRecords failedRecords) {
logger.info("[{}][{}] start processing batch size: {}, cutoff upload_at: {}", DB_SCHEMA, table, DB_2_S3_BATCH_SIZE, cutoffUploadAt); // 计算开始时间:往前10天
long startTime = cutoffUploadAt - 10 * 24 * 60 * 60 * 1000L;
logger.info("[{}][{}] start processing batch size: {}, time range: [{}, {})",
DB_SCHEMA, table, DB_2_S3_BATCH_SIZE, startTime, cutoffUploadAt);
long tableStartTime = System.currentTimeMillis(); long tableStartTime = System.currentTimeMillis();
Long lastProcessedId = null; // 先查时间范围内的 minId 和 maxId
long minId, maxId;
try {
long[] minMaxId = mysqlService.queryMinMaxId(DB_SCHEMA, table, startTime, cutoffUploadAt);
minId = minMaxId[0];
maxId = minMaxId[1];
if (minId == 0 && maxId == 0) {
logger.info("[{}][{}] no data found, skip", DB_SCHEMA, table);
return;
}
} catch (Exception e) {
logger.error("[{}][{}] query min max id failed", DB_SCHEMA, table, e);
return;
}
// 用主键单推法处理
long currentId = minId;
boolean shouldContinue = true; boolean shouldContinue = true;
while (shouldContinue) { while (shouldContinue && currentId <= maxId) {
// 检查是否快超时:距离15分钟还剩小于2分钟时停止,留点余量时间,防止lambda超时 // 检查是否快超时:距离15分钟还剩小于2分钟时停止,留点余量时间,防止lambda超时
long elapsedTime = System.currentTimeMillis() - overallStartTime; long elapsedTime = System.currentTimeMillis() - overallStartTime;
long remainingTime = 15 * 60 * 1000 - elapsedTime; long remainingTime = 15 * 60 * 1000 - elapsedTime;
@ -148,7 +168,8 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
// 查询当前批次数据 // 查询当前批次数据
List<Map<String, Object>> rows = null; List<Map<String, Object>> rows = null;
try { try {
rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, DB_2_S3_BATCH_SIZE, lastProcessedId); // 这里有个小问题,由于id和upload_at不是同步递增,id小的upload_at反而大,那么会查出cutoffUploadAt后面的数据,但这个不影响,就当提前存了一部分
rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, currentId, maxId, DB_2_S3_BATCH_SIZE);
} catch (Exception e) { } catch (Exception e) {
logger.error("[{}][{}] query failed", DB_SCHEMA, table, e); logger.error("[{}][{}] query failed", DB_SCHEMA, table, e);
break; break;
@ -161,20 +182,11 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
logger.info("[{}][{}] fetched {} rows", DB_SCHEMA, table, rows.size()); logger.info("[{}][{}] fetched {} rows", DB_SCHEMA, table, rows.size());
// 过滤数据并按日期分组:只处理upload_at < cutoffUploadAt的记录 // 按日期分组
Map<String, List<Map<String, Object>>> grouped = new HashMap<>(); Map<String, List<Map<String, Object>>> grouped = new HashMap<>();
Long maxIdInBatch = null; Long maxIdInBatch = null;
boolean reachedCutoff = false;
int validCount = 0;
for (Map<String, Object> row : rows) { for (Map<String, Object> row : rows) {
Long uploadAt = (Long) row.get("upload_at");
if (uploadAt >= cutoffUploadAt) {
logger.info("[{}][{}] reached cutoff upload_at: {}, stop processing", DB_SCHEMA, table, uploadAt);
reachedCutoff = true;
break;
}
// 更新当前批次的最大ID // 更新当前批次的最大ID
Long id = (Long) row.get("id"); Long id = (Long) row.get("id");
if (maxIdInBatch == null || id > maxIdInBatch) { if (maxIdInBatch == null || id > maxIdInBatch) {
@ -182,26 +194,22 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
} }
// 根据upload_at转换为东京时区日期 // 根据upload_at转换为东京时区日期
Long uploadAt = (Long) row.get("upload_at");
LocalDate date = Instant.ofEpochMilli(uploadAt).atZone(TOKYO_ZONE).toLocalDate(); LocalDate date = Instant.ofEpochMilli(uploadAt).atZone(TOKYO_ZONE).toLocalDate();
// 按日期分组 // 按日期分组
String dateKey = date.format(DATE_FORMATTER); String dateKey = date.format(DATE_FORMATTER);
grouped.computeIfAbsent(dateKey, k -> new ArrayList<>()).add(row); grouped.computeIfAbsent(dateKey, k -> new ArrayList<>()).add(row);
validCount++;
}
if (validCount == 0) {
logger.info("[{}][{}] no valid rows in this batch", DB_SCHEMA, table);
break;
} }
// 如果当前批次没有有效数据,继续下一批 // 如果当前批次没有有效数据,继续下一批
if (grouped.isEmpty()) { if (grouped.isEmpty()) {
logger.info("[{}][{}] no valid data in this batch", DB_SCHEMA, table); logger.info("[{}][{}] no valid data in this batch", DB_SCHEMA, table);
if (validCount < DB_2_S3_BATCH_SIZE || reachedCutoff) { if (maxIdInBatch != null) {
currentId = maxIdInBatch + 1;
}
if (rows.size() < DB_2_S3_BATCH_SIZE) {
shouldContinue = false; shouldContinue = false;
} else {
lastProcessedId = maxIdInBatch;
} }
continue; continue;
} }
@ -234,7 +242,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
// 上传成功,立即删除该分组对应的ID // 上传成功,立即删除该分组对应的ID
List<Long> failedIds = mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds); List<Long> failedIds = mysqlService.deleteByIds(DB_SCHEMA, table, currentDateIds);
// 计算成功删除的 ID(总 ID - 失败 ID) // 计算成功删除的ID(总ID - 失败ID)
List<Long> successIds = new ArrayList<>(currentDateIds); List<Long> successIds = new ArrayList<>(currentDateIds);
successIds.removeAll(failedIds); successIds.removeAll(failedIds);
@ -243,7 +251,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
processedIds.addAll(successIds); processedIds.addAll(successIds);
} }
// 记录失败的 ID // 记录失败的ID
if (!failedIds.isEmpty()) { if (!failedIds.isEmpty()) {
logger.error("[{}][{}] delete failed for {} records in date {}, recording to failed records", logger.error("[{}][{}] delete failed for {} records in date {}, recording to failed records",
DB_SCHEMA, table, failedIds.size(), dateKey); DB_SCHEMA, table, failedIds.size(), dateKey);
@ -267,16 +275,13 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
logger.info("[{}][{}] date {} processed in {}ms", DB_SCHEMA, table, dateKey, groupTime); logger.info("[{}][{}] date {} processed in {}ms", DB_SCHEMA, table, dateKey, groupTime);
} }
// 更新下次查询的起始ID:只要处理了数据,就更新到已处理的最大ID // 更新下次查询的起始ID
if (!processedIds.isEmpty()) { if (maxIdInBatch != null) {
Long maxProcessedId = processedIds.stream().max(Long::compareTo).orElse(null); currentId = maxIdInBatch + 1;
if (maxProcessedId != null) {
lastProcessedId = maxProcessedId;
}
} }
// 确定是否继续:只有全部成功且当前批次是完整批次且没有达到cutoff时才继续 // 确定是否继续:只有全部成功且当前批次是完整批次时才继续
if (anyFailure || validCount < DB_2_S3_BATCH_SIZE || reachedCutoff) { if (anyFailure || rows.size() < DB_2_S3_BATCH_SIZE) {
shouldContinue = false; shouldContinue = false;
} }
} }

84
src/main/java/com/dashboard/aws/lambda/handler/S3ToMySQLHandler.java

@ -1,84 +0,0 @@
package com.dashboard.aws.lambda.handler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.dashboard.aws.lambda.Constants;
import com.dashboard.aws.lambda.service.MySQLService;
import com.dashboard.aws.lambda.service.S3Service;
import com.dashboard.aws.lambda.util.CsvUtil;
import com.dashboard.aws.lambda.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.*;
public class S3ToMySQLHandler implements RequestHandler<Map<String, Object>, String> {
private static final Logger logger = LoggerFactory.getLogger(S3ToMySQLHandler.class);
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
private final MySQLService mysqlService = new MySQLService();
private final S3Service s3Service = new S3Service();
@Override
public String handleRequest(Map<String, Object> event, Context context) {
try {
// 同步目标日期(这里同步 3 天前的数据)
// LocalDate targetDate = LocalDate.now(ZoneId.of("Asia/Tokyo")).minusDays(3);
// String dateStr = targetDate.format(java.time.format.DateTimeFormatter.ISO_DATE);
// String dateStr = "2025-10-08";
ZoneId zone = ZoneId.of("Asia/Tokyo");
LocalDate now = DateUtil.resolveEventDate( event, zone);
String dateStr = DateUtil.getLastYearSameIsoWeekDayStr(now);
// --- 解析出 fileDate(用于删除早于该日期的数据) ---
LocalDate fileDate = LocalDate.parse(dateStr);
List<String> tables = Constants.tables;
String schema = DB_SCHEMA;
for (String table : tables) {
//先删除旧数据
int deleted = mysqlService.deleteBeforeDate(schema, table, fileDate);
logger.info("[{}][{}] deleted {} records before {}", schema, table, deleted, fileDate);
// 构造 S3 文件路径
String s3Key = String.format("%s/%s.csv", table, dateStr);
logger.info("processing s3 file: {}", s3Key);
// 下载 CSV
byte[] content = s3Service.download(s3Key);
if (content == null || content.length == 0) {
logger.warn("[{}][{}] file empty or not found: {}", schema, table, s3Key);
continue;
}
// 解析 CSV
List<Map<String, String>> rows = CsvUtil.parseCsv(content);
if (rows.isEmpty()) {
logger.warn("[{}][{}] csv empty: {}", schema, table, s3Key);
continue;
}
// 插入到 MySQL
try {
int inserted = mysqlService.insertRows(schema, table, rows);
logger.info("[{}][{}] inserted {} rows", schema, table, inserted);
} catch (Exception e) {
logger.error("[{}][{}] insert failed -> {}", schema, table, s3Key, e);
}
}
return "sync process finished";
} catch (Exception e) {
logger.error("sync error", e);
return "sync error:" + e.getMessage();
}
}
}

98
src/main/java/com/dashboard/aws/lambda/service/MySQLService.java

@ -205,20 +205,47 @@ public class MySQLService {
.toEpochMilli(); .toEpochMilli();
String sql = String.format( String sql = String.format(
"DELETE FROM %s.%s WHERE upload_at < ?", "DELETE FROM %s.%s WHERE upload_at < ? LIMIT ?",
schema, table schema, table
); );
logger.info("deleteBeforeDate sql: {}, cutoffUploadAt:{}", sql, cutoffUploadAt); logger.info("deleteBeforeDate sql: {}, cutoffUploadAt:{}", sql, cutoffUploadAt);
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); int batchSize = 5000; // 每批删除 5000 条
PreparedStatement ps = conn.prepareStatement(sql)) { int totalDeleted = 0;
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD)) {
while (true) {
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setLong(1, cutoffUploadAt); ps.setLong(1, cutoffUploadAt);
ps.setInt(2, batchSize);
int deleted = ps.executeUpdate(); int deleted = ps.executeUpdate();
logger.info("deleteBeforeDate: schema={}, table={}, cutoffUploadAt={}, deleted={}", schema, table, cutoffUploadAt, deleted);
return deleted; if (deleted == 0) {
logger.info("deleteBeforeDate: no more records to delete");
break; // 没有更多数据了
}
totalDeleted += deleted;
logger.info("deleteBeforeDate: schema={}, table={}, cutoffUploadAt={}, batch deleted={}, total deleted={}",
schema, table, cutoffUploadAt, deleted, totalDeleted);
}
// 每批之间 sleep 50ms,减少锁冲突
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
logger.info("deleteBeforeDate: schema={}, table={}, cutoffUploadAt={}, total deleted={}",
schema, table, cutoffUploadAt, totalDeleted);
return totalDeleted;
} catch (SQLException e) { } catch (SQLException e) {
logger.error("deleteBeforeDate failed for {}/{} cutoffUploadAt={}", schema, table, cutoffUploadAt, e); logger.error("deleteBeforeDate failed for {}/{} cutoffUploadAt={}", schema, table, cutoffUploadAt, e);
return 0; return totalDeleted; // 出错时返回已删除的数量
} }
} }
@ -281,30 +308,53 @@ public class MySQLService {
} }
/** /**
* 查询ID大于指定值的前N条数据 id 升序 * 查询时间范围内的最小ID和最大ID
* 用于分页分批处理数据 * @param startTime 开始时间包含
* @param lastId 上次处理的最大ID不包含为null时从第一条开始查询 * @param endTime 结束时间不包含
* @return [minId, maxId]如果没有数据返回 [0, 0]
*/ */
public List<Map<String, Object>> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId) throws SQLException { public long[] queryMinMaxId(String schema, String table, long startTime, long endTime) throws SQLException {
String sql; String sql = String.format("SELECT MIN(id) AS min_id, MAX(id) AS max_id FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table);
if (lastId == null) { logger.info("queryMinMaxId sql: {}, startTime: {}, endTime: {}", sql, startTime, endTime);
// 第一次查询
sql = String.format("SELECT * FROM %s.%s ORDER BY id ASC LIMIT ?", schema, table);
} else {
// 后续查询,从上次之后开始
sql = String.format("SELECT * FROM %s.%s WHERE id > ? ORDER BY id ASC LIMIT ?", schema, table);
}
logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}",
sql, limit, lastId);
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) { PreparedStatement ps = conn.prepareStatement(sql)) {
int paramIndex = 1; ps.setLong(1, startTime);
if (lastId != null) { ps.setLong(2, endTime);
ps.setLong(paramIndex++, lastId);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
long minId = rs.getLong("min_id");
long maxId = rs.getLong("max_id");
if (rs.wasNull()) {
logger.info("[{}.{}] no data found in time range", schema, table);
return new long[]{0, 0};
}
logger.info("[{}.{}] minId: {}, maxId: {}", schema, table, minId, maxId);
return new long[]{minId, maxId};
}
return new long[]{0, 0};
}
} }
ps.setInt(paramIndex, limit);
/**
* 查询ID范围内的前N条数据 id ASC
* 用于分页分批处理数据
* @param startId 起始ID包含
* @param endId 结束ID包含
* @param limit 每批条数
*/
public List<Map<String, Object>> queryFirstNByIdGreaterThan(String schema, String table, long startId, long endId, int limit) throws SQLException {
String sql = String.format("SELECT * FROM %s.%s WHERE id >= ? AND id <= ? ORDER BY id ASC LIMIT ?", schema, table);
logger.info("queryFirstNByIdGreaterThan sql: {}, startId: {}, endId: {}, limit: {}", sql, startId, endId, limit);
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setLong(1, startId);
ps.setLong(2, endId);
ps.setInt(3, limit);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
ResultSetMetaData meta = rs.getMetaData(); ResultSetMetaData meta = rs.getMetaData();

Loading…
Cancel
Save