Browse Source

mysql存s3初步完成

master
review512jwy@163.com 2 weeks ago
parent
commit
fa8b643209
  1. 7
      pom.xml
  2. 4
      src/main/java/com/dashboard/aws/lambda/Constants.java
  3. 176
      src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java
  4. 318
      src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java
  5. 148
      src/main/java/com/dashboard/aws/lambda/service/MySQLService.java
  6. 22
      src/main/java/com/dashboard/aws/lambda/service/S3Service.java
  7. 2
      src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java

7
pom.xml

@ -76,6 +76,13 @@
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>tools.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>3.1.3</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

4
src/main/java/com/dashboard/aws/lambda/Constants.java

@ -7,7 +7,7 @@ public class Constants {
public static final ZoneId TOKYO_ZONE = ZoneId.of("Asia/Tokyo");
// public static final List<String> tables = List.of("dashboard_record_accumulate", "dashboard_record_measure");
public static final List<String> tables = List.of("dashboard_record_accumulate_test", "dashboard_record_measure_test");
public static final List<String> tables = List.of("dashboard_record_accumulate", "dashboard_record_measure");
// public static final List<String> tables = List.of("dashboard_record_accumulate_test", "dashboard_record_measure_test");
}

176
src/main/java/com/dashboard/aws/lambda/handler/MySQLBatchToS3Handler.java

@ -20,26 +20,44 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import tools.jackson.databind.ObjectMapper;
public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>, String> {
private static final Logger logger = LoggerFactory.getLogger(MySQLBatchToS3Handler.class);
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
private static final int BATCH_SIZE = Integer.parseInt(System.getenv("BATCH_SIZE"));
private static final int INSERT_BATCH_SIZE = Integer.parseInt(System.getenv("INSERT_BATCH_SIZE"));
private static final String CUTOFF_DATE_STR = System.getenv("CUTOFF_DATE");
private static final ZoneId TOKYO_ZONE = Constants.TOKYO_ZONE;
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final Pattern FILE_PATTERN = Pattern.compile(".*/(\\d{3})\\.csv$");
private static final Pattern FILE_PATTERN = Pattern.compile(".*/(\\d{8})\\.csv$");
private final MySQLService mysqlService = new MySQLService();
private final S3Service s3Service = new S3Service();
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 失败记录数据结构
*/
static class FailedRecord {
public List<Long> ids;
public int retryCount;
}
/**
* 失败记录集合
*/
static class FailedRecords {
public Map<String, FailedRecord> tableRecords = new HashMap<>();
}
@Override
public String handleRequest(Map<String, Object> event, Context context) {
long startTime = System.currentTimeMillis();
try {
// 确定截止日期:优先使用环境变量,否则默认使用东京时区3天前
// 确定截止日期:优先使用环境变量,否则默认使用东京时区2天前
LocalDate cutoffDate;
if (CUTOFF_DATE_STR != null && !CUTOFF_DATE_STR.isEmpty()) {
cutoffDate = LocalDate.parse(CUTOFF_DATE_STR);
@ -53,9 +71,23 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
long cutoffUploadAt = cutoffDate.atStartOfDay(TOKYO_ZONE).toInstant().toEpochMilli();
logger.info("Cutoff upload_at: {}", cutoffUploadAt);
// 加载失败记录
FailedRecords failedRecords = loadFailedRecords();
logger.info("Loaded failed records for {} tables", failedRecords.tableRecords.size());
// 处理每个配置的表
for (String table : Constants.tables) {
processTable(table, cutoffUploadAt, startTime);
// 先处理失败记录
processFailedRecords(failedRecords, table);
// 保存失败记录(处理完一个表就保存一次
saveFailedRecords(failedRecords);
// 再处理新数据
processTable(table, cutoffUploadAt, startTime, failedRecords);
// 保存失败记录
saveFailedRecords(failedRecords);
}
long totalTime = System.currentTimeMillis() - startTime;
@ -72,12 +104,11 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
* 处理单个表的数据备份
* 循环查询批次数据直到没有数据或遇到截止日期或超时
*/
private void processTable(String table, long cutoffUploadAt, long overallStartTime) {
logger.info("[{}][{}] start processing batch size: {}", DB_SCHEMA, table, BATCH_SIZE);
private void processTable(String table, long cutoffUploadAt, long overallStartTime, FailedRecords failedRecords) {
logger.info("[{}][{}] start processing batch size: {}, cutoff upload_at: {}", DB_SCHEMA, table, INSERT_BATCH_SIZE, cutoffUploadAt);
long tableStartTime = System.currentTimeMillis();
Long lastProcessedId = null;
Long lastUploadAt = null;
boolean shouldContinue = true;
while (shouldContinue) {
@ -93,7 +124,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
// 查询当前批次数据
List<Map<String, Object>> rows = null;
try {
rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, BATCH_SIZE, lastProcessedId, lastUploadAt, cutoffUploadAt);
rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, INSERT_BATCH_SIZE, lastProcessedId);
} catch (Exception e) {
logger.error("[{}][{}] query failed", DB_SCHEMA, table, e);
break;
@ -106,22 +137,25 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
logger.info("[{}][{}] fetched {} rows", DB_SCHEMA, table, rows.size());
// 按日期分组数据
// 过滤数据并按日期分组:只处理upload_at < cutoffUploadAt的记录
Map<String, List<Map<String, Object>>> grouped = new HashMap<>();
Long maxIdInBatch = null;
Long maxUploadAtInBatch = null;
boolean reachedCutoff = false;
int validCount = 0;
for (Map<String, Object> row : rows) {
Long id = (Long) row.get("id");
Long uploadAt = (Long) row.get("upload_at");
if (uploadAt >= cutoffUploadAt) {
logger.info("[{}][{}] reached cutoff upload_at: {}, stop processing", DB_SCHEMA, table, uploadAt);
reachedCutoff = true;
break;
}
// 更新当前批次的最大ID和最大uploadAt
// 更新当前批次的最大ID
Long id = (Long) row.get("id");
if (maxIdInBatch == null || id > maxIdInBatch) {
maxIdInBatch = id;
}
Long uploadAt = (Long) row.get("upload_at");
if (maxUploadAtInBatch == null || uploadAt > maxUploadAtInBatch) {
maxUploadAtInBatch = uploadAt;
}
// 根据upload_at转换为东京时区日期
LocalDate date = Instant.ofEpochMilli(uploadAt).atZone(TOKYO_ZONE).toLocalDate();
@ -129,16 +163,21 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
// 按日期分组
String dateKey = date.format(DATE_FORMATTER);
grouped.computeIfAbsent(dateKey, k -> new ArrayList<>()).add(row);
validCount++;
}
if (validCount == 0) {
logger.info("[{}][{}] no valid rows in this batch", DB_SCHEMA, table);
break;
}
// 如果当前批次没有有效数据,继续下一批
if (grouped.isEmpty()) {
logger.info("[{}][{}] no valid data in this batch", DB_SCHEMA, table);
if (rows.size() < BATCH_SIZE) {
if (validCount < INSERT_BATCH_SIZE || reachedCutoff) {
shouldContinue = false;
} else {
lastProcessedId = maxIdInBatch;
lastUploadAt = maxUploadAtInBatch;
}
continue;
}
@ -161,7 +200,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
// 生成S3文件名,按序号递增
String prefix = String.format("%s/%s", table, dateKey);
int nextSeq = getNextSequenceNumber(prefix);
String s3Key = String.format("%s/%s/%03d.csv", table, dateKey, nextSeq);
String s3Key = String.format("%s/%s/%08d.csv", table, dateKey, nextSeq);
try {
byte[] csvBytes = CsvUtil.toCsvBytes(dateRows);
@ -174,14 +213,16 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
logger.info("[{}][{}] delete {} records for date {}", DB_SCHEMA, table, currentDateIds.size(), dateKey);
processedIds.addAll(currentDateIds);
} catch (Exception e) {
logger.error("[{}][{}] delete failed for date {}, trying to rollback S3 file", DB_SCHEMA, table, dateKey, e);
// MySQL删除失败,回滚S3文件,保持一致性
try {
s3Service.delete(s3Key);
logger.info("[{}][{}] rollback success, deleted S3 file: {}", DB_SCHEMA, table, s3Key);
} catch (Exception rollbackEx) {
logger.error("[{}][{}] rollback failed, S3 file may be orphaned: {}", DB_SCHEMA, table, s3Key, rollbackEx);
logger.error("[{}][{}] delete failed for date {}, recording to failed records", DB_SCHEMA, table, dateKey, e);
// 记录到失败记录中,不回滚 S3 文件
FailedRecord record = failedRecords.tableRecords.get(table);
if (record == null) {
record = new FailedRecord();
record.ids = new ArrayList<>();
record.retryCount = 0;
failedRecords.tableRecords.put(table, record);
}
record.ids.addAll(currentDateIds);
anyFailure = true;
}
} catch (Exception e) {
@ -199,12 +240,11 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
Long maxProcessedId = processedIds.stream().max(Long::compareTo).orElse(null);
if (maxProcessedId != null) {
lastProcessedId = maxProcessedId;
lastUploadAt = maxUploadAtInBatch;
}
}
// 确定是否继续:只有全部成功且当前批次是完整批次时才继续
if (anyFailure || rows.size() < BATCH_SIZE) {
// 确定是否继续:只有全部成功且当前批次是完整批次且没有达到cutoff时才继续
if (anyFailure || validCount < INSERT_BATCH_SIZE || reachedCutoff) {
shouldContinue = false;
}
}
@ -234,4 +274,82 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object>
return maxSeq + 1;
}
/**
* 加载失败记录文件
*/
private FailedRecords loadFailedRecords() {
String s3Key = ".failed_records.json";
try {
if (!s3Service.exists(s3Key)) {
logger.info("No failed records file found");
return new FailedRecords();
}
byte[] content = s3Service.download(s3Key);
if (content == null || content.length == 0) {
logger.info("Failed records file is empty");
return new FailedRecords();
}
return objectMapper.readValue(content, FailedRecords.class);
} catch (Exception e) {
logger.error("Failed to load failed records", e);
return new FailedRecords();
}
}
/**
* 保存失败记录文件
*/
private void saveFailedRecords(FailedRecords failedRecords) {
String s3Key = ".failed_records.json";
try {
byte[] content = objectMapper.writeValueAsBytes(failedRecords);
s3Service.upload(s3Key, content);
logger.info("Saved failed records, total tables: {}", failedRecords.tableRecords.size());
} catch (Exception e) {
logger.error("Failed to save failed records", e);
}
}
/**
* 处理失败记录重试删除
*/
private void processFailedRecords(FailedRecords failedRecords, String table) {
FailedRecord record = failedRecords.tableRecords.get(table);
if (record == null || record.ids == null || record.ids.isEmpty()) {
logger.info("[{}][{}] No failed records to process", DB_SCHEMA, table);
return;
}
logger.info("[{}][{}] Processing {} failed records, retry count: {}",
DB_SCHEMA, table, record.ids.size(), record.retryCount);
int maxRetries = 10;
if (record.retryCount >= maxRetries) {
logger.info("[{}][{}] Failed records reached max retries ({}), removing",
DB_SCHEMA, table, maxRetries);
failedRecords.tableRecords.remove(table);
return;
}
// 尝试删除
try {
mysqlService.deleteByIds(DB_SCHEMA, table, record.ids);
logger.info("[{}][{}] Success delete {} failed records, removing from list",
DB_SCHEMA, table, record.ids.size());
failedRecords.tableRecords.remove(table);
} catch (Exception e) {
record.retryCount++;
logger.error("[{}][{}] Failed to delete failed records (retry {}/{}), keeping in list",
DB_SCHEMA, table, record.retryCount, maxRetries, e);
if (record.retryCount >= maxRetries) {
logger.info("[{}][{}] Failed records reached max retries, removing from list",
DB_SCHEMA, table);
failedRecords.tableRecords.remove(table);
}
}
}
}

318
src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java

@ -0,0 +1,318 @@
package com.dashboard.aws.lambda.handler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.dashboard.aws.lambda.Constants;
import com.dashboard.aws.lambda.service.MySQLService;
import com.dashboard.aws.lambda.service.S3Service;
import com.dashboard.aws.lambda.util.CsvUtil;
import com.dashboard.aws.lambda.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tools.jackson.databind.ObjectMapper;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.*;
/**
* S3数据批量恢复到MySQL的Lambda处理器
*
* 功能
* 1. 从环境变量 SYNCHRONIZE_DATE 中解析要处理的日期如果配置
* 2. 如果没有配置 SYNCHRONIZE_DATE则使用默认去年同ISO周同星期几的日期
* 3. 查询S3上指定日期下的CSV文件格式table/date/00000001.csv, 00000002.csv...
* 4. 依次处理CSV文件写入MySQL
* 5. 超时保护剩余2分钟时停止
* 6. 断点续传记录已处理的CSV文件下次跳过
*
* S3文件结构
* table/date/
* 00000001.csv
* 00000002.csv
* .processed JSON格式记录已处理的CSV文件{"processedFiles": ["00000001.csv", "00000002.csv"]}
*/
public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object>, String> {
private static final Logger logger = LoggerFactory.getLogger(S3BatchToMySQLHandler.class);
// 从环境变量读取数据库schema
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
private static final String SYNCHRONIZE_DATE = System.getenv("SYNCHRONIZE_DATE");
// ObjectMapper用于JSON序列化/反序列化
private static final ObjectMapper objectMapper = new ObjectMapper();
// MySQL和S3服务
private final MySQLService mysqlService = new MySQLService();
private final S3Service s3Service = new S3Service();
@Override
public String handleRequest(Map<String, Object> event, Context context) {
long startTime = System.currentTimeMillis();
// Lambda超时时间:15分钟,预留2分钟余量,13分钟时停止
long lambdaTimeoutMs = 15 * 60 * 1000;
long timeoutThresholdMs = lambdaTimeoutMs - 2 * 60 * 1000;
try {
// 获取东京时区
ZoneId zone = Constants.TOKYO_ZONE;
// 确定要处理的日期
String dateStr;
if (SYNCHRONIZE_DATE != null && !SYNCHRONIZE_DATE.trim().isEmpty()) {
// 优先使用环境变量 SYNCHRONIZE_DATE
dateStr = SYNCHRONIZE_DATE.trim();
logger.info("Using date from SYNCHRONIZE_DATE env: {}", dateStr);
} else {
// 没有配置则使用默认:去年同ISO周同星期几的日期
LocalDate now = DateUtil.resolveEventDate(event, zone);
dateStr = DateUtil.getLastYearSameIsoWeekDayStr(now);
logger.info("Using date from DateUtil: {}", dateStr);
}
LocalDate fileDate = LocalDate.parse(dateStr);
logger.info("Processing date: {}", dateStr);
// 循环处理每个配置的表
for (String table : Constants.tables) {
// 处理表前检查超时
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed >= timeoutThresholdMs) {
logger.warn("Approaching Lambda timeout, stopping at date: {}, table: {}", dateStr, table);
break;
}
// 处理单个表
processTable(table, fileDate, dateStr, startTime, timeoutThresholdMs);
}
// 记录总耗时
long totalTime = System.currentTimeMillis() - startTime;
logger.info("Process completed in {}ms", totalTime);
return "sync process finished";
} catch (Exception e) {
logger.error("sync error", e);
return "sync error:" + e.getMessage();
}
}
/**
* 处理单个表的数据恢复
*
* @param table 表名
* @param fileDate 截止日期删除早于该日期的数据
* @param dateStr 日期字符串yyyy-MM-dd格式
* @param overallStartTime Lambda处理开始时间戳
* @param timeoutThresholdMs 超时阈值
*/
private void processTable(String table, LocalDate fileDate, String dateStr, long overallStartTime, long timeoutThresholdMs) {
// 进度标记文件:table/date/.processed
String markerKey = String.format("%s/%s/.processed", table, dateStr);
logger.info("[{}][{}] start processing date: {}", DB_SCHEMA, table, dateStr);
try {
// 1. 加载已处理的文件列表
Map<String, Integer> processedFiles = loadProcessedFiles(markerKey);
logger.info("[{}][{}] Already processed files: {}", DB_SCHEMA, table, processedFiles);
// 2. 获取S3上指定日期的所有CSV文件
String prefix = String.format("%s/%s", table, dateStr);
List<String> keys = s3Service.listKeysWithPrefix(prefix);
if (keys.isEmpty()) {
logger.warn("[{}][{}] no files found for prefix: {}", DB_SCHEMA, table, prefix);
return;
}
// 3. 按文件名排序(确保 00000001.csv, 00000002.csv 顺序处理)
Collections.sort(keys);
logger.info("[{}][{}] found {} files to process", DB_SCHEMA, table, keys.size());
// 4. 删除旧数据(只在第一次处理时执行)
if (processedFiles.isEmpty()) {
int deleted = mysqlService.deleteBeforeDate(DB_SCHEMA, table, fileDate);
logger.info("[{}][{}] deleted {} records before {}", DB_SCHEMA, table, deleted, fileDate);
} else {
logger.info("[{}][{}] already has progress for {} files, skipping deleteBeforeDate", DB_SCHEMA, table, processedFiles.size());
}
// 本次处理的文件进度(初始为已处理的)
Map<String, Integer> progressThisTime = new HashMap<>(processedFiles);
boolean allProcessed = true;
// 5. 依次处理每个CSV文件
for (String s3Key : keys) {
// 处理文件前检查超时
long elapsed = System.currentTimeMillis() - overallStartTime;
if (elapsed >= timeoutThresholdMs) {
logger.warn("[{}][{}] Approaching Lambda timeout, stopping at file: {}", DB_SCHEMA, table, s3Key);
allProcessed = false;
break;
}
// 提取文件名(00000001.csv, 00000002.csv...)
String fileName = extractFileName(s3Key);
// 获取已处理的行数(默认0)
int processedRows = progressThisTime.getOrDefault(fileName, 0);
// 处理单个CSV文件(从已处理的行数开始)
int inserted = processSingleFile(table, s3Key, processedRows);
// 更新进度:已处理行数 + 新插入行数
int newProcessedRows = processedRows + inserted;
progressThisTime.put(fileName, newProcessedRows);
// 保存处理进度
saveProcessedFiles(markerKey, progressThisTime);
logger.info("[{}][{}] file {} progress: {}/{} rows", DB_SCHEMA, table, fileName, newProcessedRows, newProcessedRows);
}
// 6. 记录处理结果
if (allProcessed) {
logger.info("[{}][{}] date processing completed", DB_SCHEMA, table);
} else {
logger.info("[{}][{}] date processing partially completed", DB_SCHEMA, table);
}
} catch (Exception e) {
logger.error("[{}][{}] process error", DB_SCHEMA, table, e);
throw e;
}
}
/**
* 从S3路径中提取文件名
* 例如table/date/00000001.csv -> 00000001.csv
*
* @param s3Key S3文件路径
* @return 文件名
*/
private String extractFileName(String s3Key) {
int lastSlash = s3Key.lastIndexOf('/');
if (lastSlash >= 0) {
return s3Key.substring(lastSlash + 1);
}
return s3Key;
}
/**
* 从S3加载已处理的文件列表
*
* @param markerKey .processed 文件路径
* @return 已处理的文件列表
*/
private Map<String, Integer> loadProcessedFiles(String markerKey) {
try {
// 先判断文件是否存在
if (!s3Service.exists(markerKey)) {
logger.info("Processed state file not found: {}", markerKey);
return new HashMap<>();
}
// 下载 .processed 文件
byte[] content = s3Service.download(markerKey);
if (content == null || content.length == 0) {
return new HashMap<>();
}
// 解析JSON格式
ProcessedState state = objectMapper.readValue(content, ProcessedState.class);
return state.getFileProgress() != null ? state.getFileProgress() : new HashMap<>();
} catch (Exception e) {
// 加载失败,返回空列表
logger.info("Error loading processed state: {}", markerKey, e);
return new HashMap<>();
}
}
/**
* 保存已处理的文件进度到S3
*
* @param markerKey .processed 文件路径
* @param fileProgress 已处理的文件进度 Map<文件名, 已处理行数>
*/
private void saveProcessedFiles(String markerKey, Map<String, Integer> fileProgress) {
try {
// 构造状态对象
ProcessedState state = new ProcessedState();
state.setFileProgress(fileProgress);
// 序列化为JSON字节
byte[] content = objectMapper.writeValueAsBytes(state);
// 上传到S3
s3Service.upload(markerKey, content);
logger.info("Saved processed files progress: {}", fileProgress.size());
} catch (Exception e) {
logger.error("Failed to save processed files: {}", markerKey, e);
// 保存失败不影响主流程,只记录日志
}
}
/**
* 处理单个CSV文件下载 -> 解析 -> 写入MySQL
*
* @param table 表名
* @param s3Key S3文件路径
* @param startRowIndex 从第几行开始处理0-based
* @return 实际插入的行数
*/
private int processSingleFile(String table, String s3Key, int startRowIndex) {
logger.info("[{}][{}] processing s3 file: {}, starting from row: {}", DB_SCHEMA, table, s3Key, startRowIndex);
try {
// 1. 下载CSV文件
byte[] content = s3Service.download(s3Key);
if (content == null || content.length == 0) {
logger.warn("[{}][{}] file empty or not found: {}", DB_SCHEMA, table, s3Key);
return 0;
}
// 2. 解析CSV为Map列表
List<Map<String, String>> allRows = CsvUtil.parseCsv(content);
if (allRows.isEmpty()) {
logger.warn("[{}][{}] csv empty: {}", DB_SCHEMA, table, s3Key);
return 0;
}
// 3. 截取需要处理的行
List<Map<String, String>> rowsToProcess;
if (startRowIndex >= allRows.size()) {
logger.info("[{}][{}] file already fully processed: {}", DB_SCHEMA, table, s3Key);
return 0;
}
rowsToProcess = allRows.subList(startRowIndex, allRows.size());
logger.info("[{}][{}] processing {} rows (total {} in file)", DB_SCHEMA, table, rowsToProcess.size(), allRows.size());
// 4. 批量插入到MySQL
int inserted = mysqlService.insertRows(DB_SCHEMA, table, rowsToProcess);
logger.info("[{}][{}] inserted {} rows from file: {}", DB_SCHEMA, table, inserted, s3Key);
return inserted;
} catch (Exception e) {
logger.error("[{}][{}] process failed for file: {}", DB_SCHEMA, table, s3Key, e);
throw new RuntimeException(e);
}
}
/**
* 处理进度状态对象用于JSON序列化
*/
static class ProcessedState {
private Map<String, Integer> fileProgress;
public Map<String, Integer> getFileProgress() {
return fileProgress;
}
public void setFileProgress(Map<String, Integer> fileProgress) {
this.fileProgress = fileProgress;
}
}
}

148
src/main/java/com/dashboard/aws/lambda/service/MySQLService.java

@ -15,6 +15,7 @@ public class MySQLService {
private static final String MYSQL_URL = System.getenv("DB_URL"); // 统一实例URL
private static final String DB_USER = System.getenv("DB_USER");
private static final String DB_PASSWORD = System.getenv("DB_PASSWORD");
private static final String batchSizeEnv = System.getenv("BATCH_SIZE");
/**
@ -86,20 +87,113 @@ public class MySQLService {
String columnStr = String.join(",", columns);
String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
String sql = String.format("INSERT INTO %s.%s (%s) VALUES (%s)", schema, table, columnStr, placeholders);
// logger.info("insert sql: {}", sql);
// 从环境变量读取批量大小,默认5000
int batchSize = 5000;
if (batchSizeEnv != null && !batchSizeEnv.trim().isEmpty()) {
try {
batchSize = Integer.parseInt(batchSizeEnv.trim());
} catch (NumberFormatException e) {
logger.warn("Invalid BATCH_SIZE: {}, using default 5000", batchSizeEnv);
}
}
int total = 0;
int totalBatches = (rows.size() + batchSize - 1) / batchSize;
// 优化:使用rewriteBatchedStatements提高性能
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
for (Map<String, String> row : rows) {
// 关闭自动提交,手动控制事务
conn.setAutoCommit(false);
try {
// 分批处理
for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
int start = batchIndex * batchSize;
int end = Math.min(start + batchSize, rows.size());
List<Map<String, String>> batch = rows.subList(start, end);
int batchNumber = batchIndex + 1;
// 尝试插入当前批次,最多重试2次
boolean batchSuccess = false;
int retryCount = 0;
int maxRetries = 2;
int batchInserted = 0;
while (!batchSuccess && retryCount <= maxRetries) {
try {
// 清空之前的批次
ps.clearBatch();
// 添加当前批次的所有行
for (Map<String, String> row : batch) {
for (int i = 0; i < columns.size(); i++) {
ps.setString(i + 1, row.getOrDefault(columns.get(i), null));
}
ps.addBatch();
}
// 执行批量插入
int[] results = ps.executeBatch();
for (int r : results) total += r;
batchInserted = 0;
for (int r : results) {
batchInserted += r;
}
// 提交当前批次事务,每个批次独立事务,失败不影响其他批次
conn.commit();
total += batchInserted;
batchSuccess = true;
if (retryCount == 0) {
logger.info("[{}][{}] Batch {}/{} inserted {} records, total: {}",
schema, table, batchNumber, totalBatches, batchInserted, total);
} else {
logger.info("[{}][{}] Batch {}/{} inserted {} records (retry {}), total: {}",
schema, table, batchNumber, totalBatches, batchInserted, retryCount, total);
}
} catch (Exception e) {
// 回滚当前批次
conn.rollback();
retryCount++;
if (retryCount > maxRetries) {
logger.error("[{}][{}] Batch {}/{} failed after {} retries, stopping at this batch",
schema, table, batchNumber, totalBatches, maxRetries, e);
// 不抛出异常,返回已成功插入的总行数
return total;
} else {
logger.warn("[{}][{}] Batch {}/{} failed, retry {}/{}",
schema, table, batchNumber, totalBatches, retryCount, maxRetries, e);
}
// 回滚的话,停一下休眠20ms
try {
Thread.sleep(20);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
}
}
}
// 批次间隔:sleep 15ms,给实时写入留时间窗口,平滑IO压力
if (batchIndex < totalBatches - 1) {
try {
Thread.sleep(15);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
logger.info("[{}][{}] All batches completed, total inserted: {}", schema, table, total);
} finally {
// 恢复自动提交
conn.setAutoCommit(true);
}
}
return total;
}
@ -137,52 +231,68 @@ public class MySQLService {
return;
}
int batchSize = 1000;
int totalDeleted = 0;
// 分批删除,每批1000个ID
for (int i = 0; i < ids.size(); i += batchSize) {
int end = Math.min(i + batchSize, ids.size());
List<Long> batchIds = ids.subList(i, end);
// 构建IN子句的占位符字符串:?, ?, ?, ...
String placeholders = String.join(",", Collections.nCopies(ids.size(), "?"));
String placeholders = String.join(",", Collections.nCopies(batchIds.size(), "?"));
String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders);
logger.info("deleteByIds sql: {}, ids count: {}", sql, ids.size());
// logger.info("deleteByIds sql: {}, batch ids count: {}", sql, batchIds.size());
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
// 逐个设置ID参数值
for (int i = 0; i < ids.size(); i++) {
ps.setLong(i + 1, ids.get(i));
for (int j = 0; j < batchIds.size(); j++) {
ps.setLong(j + 1, batchIds.get(j));
}
int deleted = ps.executeUpdate();
logger.info("[{}.{}] 删除 {} 条记录", schema, table, deleted);
totalDeleted += deleted;
logger.info("[{}.{}] 批次删除 {} 条记录,累计 {} 条", schema, table, deleted, totalDeleted);
}
// 每批之间稍作停顿,减少锁冲突
if (i + batchSize < ids.size()) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
logger.info("[{}.{}] 总删除 {} 条记录", schema, table, totalDeleted);
}
/**
* 查询ID大于指定值的前N条数据 upload_at + id 升序
* 查询ID大于指定值的前N条数据 id 升序
* 用于分页分批处理数据
* @param lastId 上次处理的最大ID不包含为null时从第一条开始查询
* @param lastUploadAt 上次处理的最大 upload_at不包含
* @param cutoffUploadAt 截止时间只处理 upload_at < cutoffUploadAt 的数据
*/
public List<Map<String, Object>> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId, Long lastUploadAt, Long cutoffUploadAt) throws SQLException {
public List<Map<String, Object>> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId) throws SQLException {
String sql;
if (lastId == null) {
// 第一次查询
sql = String.format("SELECT * FROM %s.%s WHERE upload_at < ? ORDER BY upload_at ASC, id ASC LIMIT ?", schema, table);
sql = String.format("SELECT * FROM %s.%s ORDER BY id ASC LIMIT ?", schema, table);
} else {
// 后续查询,从上次之后开始
sql = String.format("SELECT * FROM %s.%s WHERE (upload_at > ? OR (upload_at = ? AND id > ?)) AND upload_at < ? ORDER BY upload_at ASC, id ASC LIMIT ?", schema, table);
sql = String.format("SELECT * FROM %s.%s WHERE id > ? ORDER BY id ASC LIMIT ?", schema, table);
}
logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}, lastUploadAt: {}, cutoffUploadAt: {}",
sql, limit, lastId, lastUploadAt, cutoffUploadAt);
logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}",
sql, limit, lastId);
try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD);
PreparedStatement ps = conn.prepareStatement(sql)) {
int paramIndex = 1;
if (lastId != null) {
ps.setLong(paramIndex++, lastUploadAt);
ps.setLong(paramIndex++, lastUploadAt);
ps.setLong(paramIndex++, lastId);
}
ps.setLong(paramIndex++, cutoffUploadAt);
ps.setInt(paramIndex, limit);
ResultSet rs = ps.executeQuery();

22
src/main/java/com/dashboard/aws/lambda/service/S3Service.java

@ -10,9 +10,11 @@ import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
@ -89,6 +91,21 @@ public class S3Service {
}
}
public boolean exists(String key) {
try {
s3.headObject(HeadObjectRequest.builder()
.bucket(bucket)
.key(key)
.build());
return true;
} catch (NoSuchKeyException e) {
return false;
} catch (Exception e) {
logger.error("check file exists failed: {}", key, e);
return false;
}
}
public byte[] download(String key) {
try {
ResponseBytes<?> objectBytes = s3.getObjectAsBytes(GetObjectRequest.builder()
@ -97,6 +114,9 @@ public class S3Service {
.build());
logger.info("download success: {}", key);
return objectBytes.asByteArray();
} catch (NoSuchKeyException e) {
logger.error("file not found: {}", key);
return null;
} catch (Exception e) {
logger.error("download failed: {}", key, e);
return null;

2
src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java

@ -79,7 +79,7 @@ public class S3CsvReorganizer {
String basePath = matcher.group(1); // dashboard-for-backup/dashboard_record_measure/
String date = matcher.group(2); // 2025-12-25
String newKey = basePath + date + "/001.csv";
String newKey = basePath + date + "/00000001.csv";
// 如果目标文件和源文件相同则跳过
if (oldKey.equals(newKey)) {

Loading…
Cancel
Save