You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

332 lines
13 KiB

package com.dashboard.aws.lambda.handler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.dashboard.aws.lambda.Constants;
import com.dashboard.aws.lambda.service.MySQLService;
import com.dashboard.aws.lambda.service.S3Service;
import com.dashboard.aws.lambda.util.CsvUtil;
import com.dashboard.aws.lambda.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tools.jackson.databind.ObjectMapper;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.*;
/**
* S3数据批量恢复到MySQL的Lambda处理器
*
* 功能:
* 1. 从环境变量 SYNCHRONIZE_DATE 中解析要处理的日期(如果配置)
* 2. 如果没有配置 SYNCHRONIZE_DATE,则使用默认:去年同ISO周同星期几的日期
* 3. 查询S3上指定日期下的CSV文件(格式:table/date/00000001.csv, 00000002.csv...)
* 4. 依次处理CSV文件,写入MySQL
* 5. 超时保护(剩余2分钟时停止)
* 6. 断点续传(记录已处理的CSV文件,下次跳过)
*
* S3文件结构:
* table/date/
* ├─ 00000001.csv
* ├─ 00000002.csv
* └─ .sync_to_db_processed ← JSON格式,记录已处理的CSV文件:{"processedFiles": ["00000001.csv", "00000002.csv"]}
*/
public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object>, String> {
private static final Logger logger = LoggerFactory.getLogger(S3BatchToMySQLHandler.class);
// 从环境变量读取数据库schema
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA");
private static final String SYNCHRONIZE_DATE = System.getenv("SYNCHRONIZE_DATE");
// ObjectMapper用于JSON序列化/反序列化
private static final ObjectMapper objectMapper = new ObjectMapper();
// MySQL和S3服务
private final MySQLService mysqlService = new MySQLService();
private final S3Service s3Service = new S3Service();
@Override
public String handleRequest(Map<String, Object> event, Context context) {
long startTime = System.currentTimeMillis();
// Lambda超时时间:15分钟,预留5分钟余量,13分钟时停止
long lambdaTimeoutMs = 15 * 60 * 1000;
long timeoutThresholdMs = lambdaTimeoutMs - 5 * 60 * 1000;
try {
// 获取东京时区
ZoneId zone = Constants.TOKYO_ZONE;
// 确定要处理的日期
String dateStr;
if (SYNCHRONIZE_DATE != null && !SYNCHRONIZE_DATE.trim().isEmpty()) {
// 优先使用环境变量 SYNCHRONIZE_DATE
dateStr = SYNCHRONIZE_DATE.trim();
logger.info("Using date from SYNCHRONIZE_DATE env: {}", dateStr);
} else {
// 没有配置则使用默认:去年同ISO周同星期几的日期
LocalDate now = DateUtil.resolveEventDate(event, zone);
dateStr = DateUtil.getLastYearSameIsoWeekDayStr(now);
logger.info("Using date from DateUtil: {}", dateStr);
}
LocalDate fileDate = LocalDate.parse(dateStr);
logger.info("Processing date: {}", dateStr);
// 循环处理每个配置的表
for (String table : Constants.tables) {
// 处理表前检查超时
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed >= timeoutThresholdMs) {
logger.warn("Approaching Lambda timeout, stopping at date: {}, table: {}", dateStr, table);
break;
}
// 处理单个表
processTable(table, fileDate, dateStr, startTime, timeoutThresholdMs);
}
// 记录总耗时
long totalTime = System.currentTimeMillis() - startTime;
logger.info("Process completed in {}ms", totalTime);
return "sync process finished";
} catch (Exception e) {
logger.error("sync error", e);
return "sync error:" + e.getMessage();
}
}
/**
* 处理单个表的数据恢复
*
* @param table 表名
* @param fileDate 截止日期(删除早于该日期的数据)
* @param dateStr 日期字符串(yyyy-MM-dd格式)
* @param overallStartTime Lambda处理开始时间戳
* @param timeoutThresholdMs 超时阈值
*/
private void processTable(String table, LocalDate fileDate, String dateStr, long overallStartTime, long timeoutThresholdMs) {
// 进度标记文件:table/date/.sync_to_db_processed
String markerKey = String.format("%s/%s/.sync_to_db_processed", table, dateStr);
logger.info("[{}][{}] start processing date: {}", DB_SCHEMA, table, dateStr);
try {
// 1. 加载已处理的文件列表
Map<String, Integer> processedFiles = loadProcessedFiles(markerKey);
logger.info("[{}][{}] Already processed files: {}", DB_SCHEMA, table, processedFiles);
// 2. 获取S3上指定日期的所有CSV文件
String prefix = String.format("%s/%s", table, dateStr);
List<String> keys = s3Service.listKeysWithPrefix(prefix);
if (keys.isEmpty()) {
logger.warn("[{}][{}] no files found for prefix: {}", DB_SCHEMA, table, prefix);
return;
}
// 3. 过滤掉 marker 文件
List<String> csvKeys = new ArrayList<>();
for (String key : keys) {
String fileName = extractFileName(key);
if (!fileName.startsWith(".")) { // 跳过隐藏文件(如 .sync_to_db_processed)
csvKeys.add(key);
}
}
if (csvKeys.isEmpty()) {
logger.warn("[{}][{}] no CSV files found for prefix: {}", DB_SCHEMA, table, prefix);
return;
}
// 4. 按文件名排序(确保 00000001.csv, 00000002.csv 顺序处理)
Collections.sort(csvKeys);
logger.info("[{}][{}] found {} CSV files to process", DB_SCHEMA, table, csvKeys.size());
// 5. 删除旧数据(只在第一次处理时执行)
if (processedFiles.isEmpty()) {
int deleted = mysqlService.deleteBeforeDate(DB_SCHEMA, table, fileDate);
logger.info("[{}][{}] deleted {} records before {}", DB_SCHEMA, table, deleted, fileDate);
} else {
logger.info("[{}][{}] already has progress for {} files, skipping deleteBeforeDate", DB_SCHEMA, table, processedFiles.size());
}
// 本次处理的文件进度(初始为已处理的)
Map<String, Integer> progressThisTime = new HashMap<>(processedFiles);
boolean allProcessed = true;
// 6. 依次处理每个CSV文件
for (String s3Key : csvKeys) {
// 处理文件前检查超时
long elapsed = System.currentTimeMillis() - overallStartTime;
if (elapsed >= timeoutThresholdMs) {
logger.warn("[{}][{}] Approaching Lambda timeout, stopping at file: {}", DB_SCHEMA, table, s3Key);
allProcessed = false;
break;
}
// 提取文件名(00000001.csv, 00000002.csv...)
String fileName = extractFileName(s3Key);
// 获取已处理的行数(默认0)
int processedRows = progressThisTime.getOrDefault(fileName, 0);
// 处理单个CSV文件(从已处理的行数开始)
int inserted = processSingleFile(table, s3Key, processedRows);
// 更新进度:已处理行数 + 新插入行数
int newProcessedRows = processedRows + inserted;
progressThisTime.put(fileName, newProcessedRows);
// 保存处理进度
saveProcessedFiles(markerKey, progressThisTime);
logger.info("[{}][{}] file {} progress: {}/{} rows", DB_SCHEMA, table, fileName, newProcessedRows, newProcessedRows);
}
// 7. 记录处理结果
if (allProcessed) {
logger.info("[{}][{}] date processing completed", DB_SCHEMA, table);
} else {
logger.info("[{}][{}] date processing partially completed", DB_SCHEMA, table);
}
} catch (Exception e) {
logger.error("[{}][{}] process error", DB_SCHEMA, table, e);
throw e;
}
}
/**
* 从S3路径中提取文件名
* 例如:table/date/00000001.csv -> 00000001.csv
*
* @param s3Key S3文件路径
* @return 文件名
*/
private String extractFileName(String s3Key) {
int lastSlash = s3Key.lastIndexOf('/');
if (lastSlash >= 0) {
return s3Key.substring(lastSlash + 1);
}
return s3Key;
}
/**
* 从S3加载已处理的文件列表
*
* @param markerKey .sync_to_db_processed 文件路径
* @return 已处理的文件列表
*/
private Map<String, Integer> loadProcessedFiles(String markerKey) {
try {
// 先判断文件是否存在
if (!s3Service.exists(markerKey)) {
logger.info("Processed state file not found: {}", markerKey);
return new HashMap<>();
}
// 下载 .sync_to_db_processed 文件
byte[] content = s3Service.download(markerKey);
if (content == null || content.length == 0) {
return new HashMap<>();
}
// 解析JSON格式
ProcessedState state = objectMapper.readValue(content, ProcessedState.class);
return state.getFileProgress() != null ? state.getFileProgress() : new HashMap<>();
} catch (Exception e) {
// 加载失败,返回空列表
logger.info("Error loading processed state: {}", markerKey, e);
return new HashMap<>();
}
}
/**
* 保存已处理的文件进度到S3
*
* @param markerKey .sync_to_db_processed 文件路径
* @param fileProgress 已处理的文件进度 Map<文件名, 已处理行数>
*/
private void saveProcessedFiles(String markerKey, Map<String, Integer> fileProgress) {
try {
// 构造状态对象
ProcessedState state = new ProcessedState();
state.setFileProgress(fileProgress);
// 序列化为JSON字节
byte[] content = objectMapper.writeValueAsBytes(state);
// 上传到S3
s3Service.upload(markerKey, content);
logger.info("Saved processed files progress: {}", fileProgress.size());
} catch (Exception e) {
logger.error("Failed to save processed files: {}", markerKey, e);
// 保存失败不影响主流程,只记录日志
}
}
/**
* 处理单个CSV文件:下载 -> 解析 -> 写入MySQL
*
* @param table 表名
* @param s3Key S3文件路径
* @param startRowIndex 从第几行开始处理(0-based)
* @return 实际插入的行数
*/
private int processSingleFile(String table, String s3Key, int startRowIndex) {
logger.info("[{}][{}] processing s3 file: {}, starting from row: {}", DB_SCHEMA, table, s3Key, startRowIndex);
try {
// 1. 下载CSV文件
byte[] content = s3Service.download(s3Key);
if (content == null || content.length == 0) {
logger.warn("[{}][{}] file empty or not found: {}", DB_SCHEMA, table, s3Key);
return 0;
}
// 2. 解析CSV为Map列表
List<Map<String, String>> allRows = CsvUtil.parseCsv(content);
if (allRows.isEmpty()) {
logger.warn("[{}][{}] csv empty: {}", DB_SCHEMA, table, s3Key);
return 0;
}
// 3. 截取需要处理的行
List<Map<String, String>> rowsToProcess;
if (startRowIndex >= allRows.size()) {
logger.info("[{}][{}] file already fully processed: {}", DB_SCHEMA, table, s3Key);
return 0;
}
rowsToProcess = allRows.subList(startRowIndex, allRows.size());
logger.info("[{}][{}] processing {} rows (total {} in file)", DB_SCHEMA, table, rowsToProcess.size(), allRows.size());
// 4. 批量插入到MySQL
int inserted = mysqlService.insertRows(DB_SCHEMA, table, rowsToProcess);
logger.info("[{}][{}] inserted {} rows from file: {}", DB_SCHEMA, table, inserted, s3Key);
return inserted;
} catch (Exception e) {
logger.error("[{}][{}] process failed for file: {}", DB_SCHEMA, table, s3Key, e);
throw new RuntimeException(e);
}
}
/**
* 处理进度状态对象,用于JSON序列化
*/
static class ProcessedState {
private Map<String, Integer> fileProgress;
public Map<String, Integer> getFileProgress() {
return fileProgress;
}
public void setFileProgress(Map<String, Integer> fileProgress) {
this.fileProgress = fileProgress;
}
}
}