diff --git a/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java b/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java index 2cd38e2..883dbb8 100644 --- a/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java +++ b/src/main/java/com/dashboard/aws/lambda/handler/S3BatchToMySQLHandler.java @@ -126,11 +126,25 @@ public class S3BatchToMySQLHandler implements RequestHandler return; } - // 3. 按文件名排序(确保 00000001.csv, 00000002.csv 顺序处理) - Collections.sort(keys); - logger.info("[{}][{}] found {} files to process", DB_SCHEMA, table, keys.size()); + // 3. 过滤掉 marker 文件 + List csvKeys = new ArrayList<>(); + for (String key : keys) { + String fileName = extractFileName(key); + if (!fileName.startsWith(".")) { // 跳过隐藏文件(如 .sync_to_db_processed) + csvKeys.add(key); + } + } + + if (csvKeys.isEmpty()) { + logger.warn("[{}][{}] no CSV files found for prefix: {}", DB_SCHEMA, table, prefix); + return; + } + + // 4. 按文件名排序(确保 00000001.csv, 00000002.csv 顺序处理) + Collections.sort(csvKeys); + logger.info("[{}][{}] found {} CSV files to process", DB_SCHEMA, table, csvKeys.size()); - // 4. 删除旧数据(只在第一次处理时执行) + // 5. 删除旧数据(只在第一次处理时执行) if (processedFiles.isEmpty()) { int deleted = mysqlService.deleteBeforeDate(DB_SCHEMA, table, fileDate); logger.info("[{}][{}] deleted {} records before {}", DB_SCHEMA, table, deleted, fileDate); @@ -142,8 +156,8 @@ public class S3BatchToMySQLHandler implements RequestHandler Map progressThisTime = new HashMap<>(processedFiles); boolean allProcessed = true; - // 5. 依次处理每个CSV文件 - for (String s3Key : keys) { + // 6. 依次处理每个CSV文件 + for (String s3Key : csvKeys) { // 处理文件前检查超时 long elapsed = System.currentTimeMillis() - overallStartTime; if (elapsed >= timeoutThresholdMs) { @@ -171,7 +185,7 @@ public class S3BatchToMySQLHandler implements RequestHandler logger.info("[{}][{}] file {} progress: {}/{} rows", DB_SCHEMA, table, fileName, newProcessedRows, newProcessedRows); } - // 6. 记录处理结果 + // 7. 记录处理结果 if (allProcessed) { logger.info("[{}][{}] date processing completed", DB_SCHEMA, table); } else { diff --git a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java index f9109ee..486f3fc 100644 --- a/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java +++ b/src/main/java/com/dashboard/aws/lambda/service/MySQLService.java @@ -15,7 +15,7 @@ public class MySQLService { private static final String MYSQL_URL = System.getenv("DB_URL"); // 统一实例URL private static final String DB_USER = System.getenv("DB_USER"); private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); - private static final String batchSizeEnv = System.getenv("BATCH_SIZE"); + private static final String S3_2_DB_BATCH_SIZE = System.getenv("S3_2_DB_BATCH_SIZE"); /** @@ -90,11 +90,11 @@ public class MySQLService { // 从环境变量读取批量大小,默认5000 int batchSize = 5000; - if (batchSizeEnv != null && !batchSizeEnv.trim().isEmpty()) { + if (S3_2_DB_BATCH_SIZE != null && !S3_2_DB_BATCH_SIZE.trim().isEmpty()) { try { - batchSize = Integer.parseInt(batchSizeEnv.trim()); + batchSize = Integer.parseInt(S3_2_DB_BATCH_SIZE.trim()); } catch (NumberFormatException e) { - logger.warn("Invalid BATCH_SIZE: {}, using default 5000", batchSizeEnv); + logger.warn("Invalid BATCH_SIZE: {}, using default 5000", S3_2_DB_BATCH_SIZE); } } diff --git a/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java b/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java index 88db2bd..baed4bd 100644 --- a/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java +++ b/src/test/java/com/dashboard/aws/lambda/S3CsvReorganizer.java @@ -42,8 +42,8 @@ public class S3CsvReorganizer { .credentialsProvider( StaticCredentialsProvider.create( AwsBasicCredentials.create( - "AKA", - "IFwUPLd") + "AKI", + "IFwUPwW") ) ) .build(); @@ -79,7 +79,7 @@ public class S3CsvReorganizer { String basePath = matcher.group(1); // dashboard-for-backup/dashboard_record_measure/ String date = matcher.group(2); // 2025-12-25 - String newKey = basePath + date + "/00000001.csv"; + String newKey = basePath + date + "/001.csv"; // 如果目标文件和源文件相同则跳过 if (oldKey.equals(newKey)) { @@ -122,7 +122,7 @@ public class S3CsvReorganizer { S3CsvReorganizer reorganizer = new S3CsvReorganizer(); reorganizer.reorganize( - "dashboard_record_accumulate/" + "dashboard_record_measure/" ); } }