|
|
@ -30,7 +30,7 @@ import java.util.*; |
|
|
* table/date/ |
|
|
* table/date/ |
|
|
* ├─ 00000001.csv |
|
|
* ├─ 00000001.csv |
|
|
* ├─ 00000002.csv |
|
|
* ├─ 00000002.csv |
|
|
* └─ .processed ← JSON格式,记录已处理的CSV文件:{"processedFiles": ["00000001.csv", "00000002.csv"]} |
|
|
* └─ .sync_to_db_processed ← JSON格式,记录已处理的CSV文件:{"processedFiles": ["00000001.csv", "00000002.csv"]} |
|
|
*/ |
|
|
*/ |
|
|
public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object>, String> { |
|
|
public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object>, String> { |
|
|
|
|
|
|
|
|
@ -109,8 +109,8 @@ public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object> |
|
|
* @param timeoutThresholdMs 超时阈值 |
|
|
* @param timeoutThresholdMs 超时阈值 |
|
|
*/ |
|
|
*/ |
|
|
private void processTable(String table, LocalDate fileDate, String dateStr, long overallStartTime, long timeoutThresholdMs) { |
|
|
private void processTable(String table, LocalDate fileDate, String dateStr, long overallStartTime, long timeoutThresholdMs) { |
|
|
// 进度标记文件:table/date/.processed
|
|
|
// 进度标记文件:table/date/.sync_to_db_processed
|
|
|
String markerKey = String.format("%s/%s/.processed", table, dateStr); |
|
|
String markerKey = String.format("%s/%s/.sync_to_db_processed", table, dateStr); |
|
|
logger.info("[{}][{}] start processing date: {}", DB_SCHEMA, table, dateStr); |
|
|
logger.info("[{}][{}] start processing date: {}", DB_SCHEMA, table, dateStr); |
|
|
|
|
|
|
|
|
try { |
|
|
try { |
|
|
@ -202,7 +202,7 @@ public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object> |
|
|
/** |
|
|
/** |
|
|
* 从S3加载已处理的文件列表 |
|
|
* 从S3加载已处理的文件列表 |
|
|
* |
|
|
* |
|
|
* @param markerKey .processed 文件路径 |
|
|
* @param markerKey .sync_to_db_processed 文件路径 |
|
|
* @return 已处理的文件列表 |
|
|
* @return 已处理的文件列表 |
|
|
*/ |
|
|
*/ |
|
|
private Map<String, Integer> loadProcessedFiles(String markerKey) { |
|
|
private Map<String, Integer> loadProcessedFiles(String markerKey) { |
|
|
@ -213,7 +213,7 @@ public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object> |
|
|
return new HashMap<>(); |
|
|
return new HashMap<>(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 下载 .processed 文件
|
|
|
// 下载 .sync_to_db_processed 文件
|
|
|
byte[] content = s3Service.download(markerKey); |
|
|
byte[] content = s3Service.download(markerKey); |
|
|
if (content == null || content.length == 0) { |
|
|
if (content == null || content.length == 0) { |
|
|
return new HashMap<>(); |
|
|
return new HashMap<>(); |
|
|
@ -232,7 +232,7 @@ public class S3BatchToMySQLHandler implements RequestHandler<Map<String, Object> |
|
|
/** |
|
|
/** |
|
|
* 保存已处理的文件进度到S3 |
|
|
* 保存已处理的文件进度到S3 |
|
|
* |
|
|
* |
|
|
* @param markerKey .processed 文件路径 |
|
|
* @param markerKey .sync_to_db_processed 文件路径 |
|
|
* @param fileProgress 已处理的文件进度 Map<文件名, 已处理行数> |
|
|
* @param fileProgress 已处理的文件进度 Map<文件名, 已处理行数> |
|
|
*/ |
|
|
*/ |
|
|
private void saveProcessedFiles(String markerKey, Map<String, Integer> fileProgress) { |
|
|
private void saveProcessedFiles(String markerKey, Map<String, Integer> fileProgress) { |
|
|
|