
init

master
jiangAB 1 month ago
commit
061fa4d3f1
  1. BIN   .DS_Store
  2. 2     .gitignore
  3. BIN   __pycache__/step4_major.cpython-313.pyc
  4. 3     asd.py
  5. BIN   components/.DS_Store
  6. 0     components/__init__.py
  7. BIN   components/__pycache__/__init__.cpython-313.pyc
  8. BIN   components/__pycache__/ai_check.cpython-313.pyc
  9. BIN   components/__pycache__/doubao_process.cpython-313.pyc
  10. BIN  components/__pycache__/jsonl_repair_reindex.cpython-313.pyc
  11. BIN  components/__pycache__/spilit.cpython-313.pyc
  12. BIN  components/__pycache__/step1.cpython-313.pyc
  13. 66   components/ai_check.py
  14. 45   components/doubao_process.py
  15. 59   components/jsonl_repair_reindex.py
  16. 46   components/spilit.py
  17. 135  components/step1.py
  18. 59   jsonl_clear.py
  19. 0    process.log
  20. 65   process.sh
  21. 25   readme.md
  22. 30   replace.py
  23. 64   replace_answer_detail.py
  24. 1    requirements.txt
  25. 100  reset_id.py
  26. 19   start.sh
  27. 69   step1_pre.py
  28. 30   step2_ai1.py
  29. 30   step3_ai2.py
  30. 68   step4_major.py

BIN
.DS_Store

Binary file not shown.

2
.gitignore

@@ -0,0 +1,2 @@
work_data
venv

BIN
__pycache__/step4_major.cpython-313.pyc

Binary file not shown.

3
asd.py

@@ -0,0 +1,3 @@
# One-off driver: repair huaxue_en.jsonl (note the doubled .jsonl suffix on the output path).
from components.jsonl_repair_reindex import process_jsonl
process_jsonl('./huaxue_en.jsonl', './huaxue_en.jsonl.jsonl')

BIN
components/.DS_Store

Binary file not shown.

0
components/__init__.py

BIN
components/__pycache__/__init__.cpython-313.pyc

Binary file not shown.

BIN
components/__pycache__/ai_check.cpython-313.pyc

Binary file not shown.

BIN
components/__pycache__/doubao_process.cpython-313.pyc

Binary file not shown.

BIN
components/__pycache__/jsonl_repair_reindex.cpython-313.pyc

Binary file not shown.

BIN
components/__pycache__/spilit.cpython-313.pyc

Binary file not shown.

BIN
components/__pycache__/step1.cpython-313.pyc

Binary file not shown.

66
components/ai_check.py

@@ -0,0 +1,66 @@
import glob
import json
import os
from pathlib import Path
import time
from components.doubao_process import ask_question
# from components.jsonl_repair_reindex import process_json


def get_spilit_file_list(file_path):
    # Match the .jsonl files directly under file_path.
    # (recursive=True has no effect without a '**' in the pattern.)
    return glob.glob(file_path + '/*.jsonl', recursive=True)


def process_jsonl_file(file_path, output_folder, logger):
    base_prompt = ('The answer_detail in this JSON is not detailed enough; rewrite it as a detailed, '
                   'step-by-step derivation of the form "Step 1: ...\\nStep 2: ...\\nStep 3: ...". '
                   'Every formula used must be written out explicitly (e.g. "F=ma"), not left implicit '
                   'in the prose; show the numeric substitution into each formula (e.g. "9.8=5×a"); '
                   'use double backslashes for LaTeX; return a new JSON object with id and answer_detail '
                   'fields, on a single line, with no other reply.')
    temp_cache_0 = list(read_jsonl(file_path))
    start_time = time.time()
    # Write into output_folder (rather than next to the input) so the polling
    # loop in process.sh can see the results appear.
    output_path = os.path.join(output_folder, Path(file_path).name + '.replace')
    for item in temp_cache_0:
        quest_detail = process_question(item)
        full_question = base_prompt + quest_detail
        ai_response = ask_question(full_question)
        logger.info(f"Question: {quest_detail}")
        logger.info(f"Answer: {ai_response}")
        logger.info("===================================")
        with open(output_path, 'a', encoding='utf-8') as f:
            # json_str = process_json(ai_response)
            # json_line = json.dumps(ai_response, ensure_ascii=False)
            f.write(ai_response + '\n')
    end_time = time.time()
    logger.info(f"Elapsed: {end_time - start_time}")


def read_jsonl(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            yield json.loads(line.strip())  # parse each line into a dict


# Normalization helper: reduce a record to the fields the prompt needs.
def process_question(json_line):
    data = json_line
    # Extract the key fields (adjust to the actual JSON structure).
    question = data.get('q_main', '')
    answer = data.get('answer_detail', '')
    # Text-compression passes, currently disabled:
    # question = re.sub(r'\s+', ' ', question).strip()
    # question = re.sub(r'(\d)\s+(\d)', r'\1\2', question)  # rejoin digits split by whitespace
    # question = re.sub(r'(?<![\d.])\d{4,}(?![\d.])',
    #                   lambda m: f"{float(m.group()):.2e}", question)  # scientific notation
    # Re-pack as a minimal JSON structure.
    return json.dumps({
        'id': data.get('id'),
        'question': question,
        'answer_detail': answer
    }, ensure_ascii=False)


# Write a list of dicts out as a JSONL file.
def write_jsonl(file_path, data_list):
    with open(file_path, 'w', encoding='utf-8') as f:
        for item in data_list:
            json_line = json.dumps(item, ensure_ascii=False)
            f.write(json_line + '\n')  # one JSON object per line
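
A quick sanity check of the record minimizer, as a minimal sketch with an invented record and no model call:

# demo_process_question.py: exercises only the local minimizer.
from components.ai_check import process_question

record = {
    "id": 7,
    "q_main": "A 5 kg mass is accelerated by a 9.8 N force. Find a.",
    "answer_detail": "a = F/m",
    "extra_field": "dropped by the minimizer",
}
# process_question keeps only id / question / answer_detail and returns compact JSON.
print(process_question(record))
# -> {"id": 7, "question": "A 5 kg mass ...", "answer_detail": "a = F/m"}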

45
components/doubao_process.py

@@ -0,0 +1,45 @@
import os
import requests

# API key and endpoint configuration.
# Read the key from the environment instead of hardcoding it in the source
# (the variable name ARK_API_KEY is an assumption of this sketch).
API_KEY = os.environ.get("ARK_API_KEY", "")
BASE_URL = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"


# Build the request headers and payload, then ask a single question.
def ask_question(question):
    headers = {
        "Authorization": f"Bearer {API_KEY}",  # Bearer-token authorization
        "Content-Type": "application/json"
    }
    data = {
        # "model": "doubao-lite-32k-240828",  # alternative: the lite 32k model
        "model": "doubao-1-5-pro-32k-250115",
        "messages": [
            {"role": "user", "content": question}
        ],
        "max_tokens": 4096,  # cap on returned tokens
        "top_p": 0.7,
        # "temperature": 0.7,  # controls randomness
        "stream": False  # disable streaming so the full result comes back at once
    }
    # Issue the POST request.
    response = requests.post(BASE_URL, headers=headers, json=data)
    # Check the status code.
    if response.status_code == 200:
        result = response.json()
        return result.get("choices", [{}])[0].get("message", {}).get("content", "")
    else:
        print(f"Request failed, status code: {response.status_code}, error: {response.text}")
        return None


# Smoke test.
if __name__ == "__main__":
    question = "How do I implement multithreading in Python?"
    answer = ask_question(question)
    if answer:
        print(f"Question: {question}")
        print(f"Answer: {answer}")

59
components/jsonl_repair_reindex.py

@@ -0,0 +1,59 @@
import json
import os
import argparse
from json_repair import repair_json


def is_valid_json(json_str):
    """Check whether a string is valid JSON."""
    try:
        json.loads(json_str)
        return True
    except json.JSONDecodeError:
        return False


def process_jsonl(input_file, output_file):
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"Input file does not exist: {input_file}")
    # id_counter = 1  # auto-increment ID counter (currently disabled)
    with open(input_file, 'r', encoding='utf-8') as infile, \
         open(output_file, 'w', encoding='utf-8') as outfile:
        for line_number, line in enumerate(infile, start=1):
            line = line.strip()
            if not line:
                continue
            # Use the line directly if it parses.
            if is_valid_json(line):
                data = json.loads(line)
            else:
                # Otherwise try to repair it.
                try:
                    repaired = repair_json(line)
                    if repaired and is_valid_json(repaired):
                        data = json.loads(repaired)
                    else:
                        continue
                except Exception as e:
                    print(f"Line {line_number}: unrepairable JSON: {line} | error: {e}")
                    continue
            # Add the auto-increment id field (currently disabled):
            # data['id'] = id_counter
            # id_counter += 1
            # Write out, preserving Chinese and other non-ASCII characters.
            outfile.write(json.dumps(data, ensure_ascii=False) + '\n')
    # print(f"Done: wrote {id_counter - 1} records to {output_file}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Repair invalid lines in a JSONL file (the auto-increment ID step is currently disabled)")
    parser.add_argument("--input", type=str, required=True, help="input JSONL file path")
    parser.add_argument("--output", type=str, required=True, help="output JSONL file path")
    args = parser.parse_args()
    process_jsonl(args.input, args.output)
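
To see what the repair pass can recover, a small sketch (the broken line is invented; exact output depends on the json-repair version):

from json_repair import repair_json

# Model output with single quotes and a trailing comma is typical breakage.
broken = "{'id': 3, 'answer_detail': 'Step 1: F=ma',}"
fixed = repair_json(broken)
print(fixed)  # expected: {"id": 3, "answer_detail": "Step 1: F=ma"}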

46
components/spilit.py

@@ -0,0 +1,46 @@
import json
from multiprocessing import Pool
import re  # used only by the disabled compression passes below


# Split a JSONL file into fixed-size chunks.
def split_jsonl(input_path, output_path, chunk_size=100000):
    with open(input_path, 'r') as f:
        chunk = []
        for i, line in enumerate(f):
            chunk.append(line)
            if (i + 1) % chunk_size == 0:
                with open(output_path + f'_{i//chunk_size}.jsonl', 'w') as outfile:
                    outfile.writelines(chunk)
                chunk = []
        if chunk:  # flush the remainder
            with open(output_path + f'_{i//chunk_size}.jsonl', 'w') as outfile:
                outfile.writelines(chunk)


# Normalization helper.
def process_question(json_line):
    # Accept either a raw JSONL line or an already-parsed dict
    # (pool.map below passes raw strings, which str.get would choke on).
    data = json.loads(json_line) if isinstance(json_line, str) else json_line
    # Extract the key fields (adjust to the actual JSON structure).
    question = data.get('q_main', '')
    answer = data.get('std_ans', '')
    # Text-compression passes, currently disabled:
    # question = re.sub(r'\s+', ' ', question).strip()
    # question = re.sub(r'(\d)\s+(\d)', r'\1\2', question)  # rejoin digits split by whitespace
    # question = re.sub(r'(?<![\d.])\d{4,}(?![\d.])',
    #                   lambda m: f"{float(m.group()):.2e}", question)  # scientific notation
    # Re-pack as a minimal JSON structure.
    return json.dumps({
        'id': data.get('id'),
        'question': question,
        'answer': answer
    }, ensure_ascii=False)


# Parallel preprocessing across a worker pool.
def preprocess_file(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    with Pool() as pool:
        processed = pool.map(process_question, lines)
    with open(file_path.replace('.jsonl', '_processed.jsonl'), 'w') as f:
        f.write('\n'.join(processed) + '\n')
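
A runnable sketch of the splitter on a toy file (temporary paths invented):

import json
from components.spilit import split_jsonl

# Build a 5-record toy file, then split it into chunks of 2.
with open('/tmp/toy.jsonl', 'w') as f:
    for i in range(5):
        f.write(json.dumps({'id': i, 'q_main': f'q{i}', 'std_ans': f'a{i}'}) + '\n')

split_jsonl('/tmp/toy.jsonl', '/tmp/toy_split', chunk_size=2)
# Produces /tmp/toy_split_0.jsonl (2 lines), _1.jsonl (2 lines), _2.jsonl (1 line).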

135
components/step1.py

@@ -0,0 +1,135 @@
# Start
import json
import re

import jsonlines


def load_ori_file(ori_path, question_type_list):
    ori_data_list = []
    line_count = 0
    with open(ori_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Parse each line as a JSON object.
            try:
                line_count += 1
                data = json.loads(line.strip())
                # Question-type filter (disabled):
                # if data["type"] not in question_type_list:
                #     continue
                # Trivial-answer filter (disabled):
                # if is_easy_answer(str(data["answer"])):
                #     continue
                # Initial validation.
                if ori_data_validate(data):
                    ori_data_list.append(data)
            except json.JSONDecodeError:
                print(f"Error decoding JSON on line {line_count}: {line.strip()}")
                continue
    print(f"Total lines processed: {line_count}")
    return ori_data_list


# Validate a raw question record.
def ori_data_validate(ori_data):
    try:
        # Reject questions that reference a figure ("如图" means "as shown in the figure").
        # Stringify the record first: `in` on a dict would only inspect its keys.
        if "如图" in str(ori_data):
            return False
        else:
            return True
    except Exception as e:
        print(e)
        return False


# Detect trivially short answers.
def is_easy_answer(s):
    # Trivial = at most 5 characters and containing Chinese.
    if len(s) > 5:
        return False
    if re.search(r'[\u4e00-\u9fa5]', s):
        return True
    return False


def generate_processed_data(ori_data_list, major, output_path):
    start_id = 1
    processed_data_list = []
    # Map assorted source difficulty labels onto a canonical hard_level value.
    difficulty_map = {
        '困难': '',
        '': '',
        'high': '',
        '5': '',
        '5星': '',
        'hard': '',
        '': '',
        'medium': '',
        '4': '',
        '3': '',
        '4星': '',
        '3星': '',
        '': '',
        'low': '',
        '1': '',
        '1星': '',
        '2': '',
        '2星': '',
        'easy': '',
    }
    # The major / major_2 mapping is applied per record below.
    for ori_item in ori_data_list:
        try:
            # Normalize the difficulty label.
            if ori_item['difficulty'] in difficulty_map:
                ori_item['difficulty'] = difficulty_map[ori_item['difficulty']]
            else:
                # Unmatched labels are skipped (could also be logged).
                continue
            # Reshape the record.
            processed_item = {
                "grade_class": "高等教育",
                "grade": "大学",
                "major": major,
                "major_2": ori_item['subject'],
                "language": "zh",
                "id": start_id,
                "q_main": ori_item['question'],
                "std_ans": ori_item['answer'],
                "answer_detail": ori_item['analyzing'],
                "hard_level": ori_item['difficulty'],
                "keypoint": ori_item['knowledge_point'],
                "q_type": ori_item['type']
            }
            # Strip HTML tags, invisible characters, and other junk.
            processed_item = clean_data(processed_item)
            processed_data_list.append(processed_item)
            start_id += 1
        except Exception as e:
            # logger.warning(f"KeyError: {e} in item: {ori_item}")
            continue
    # Save the list as a .jsonl file: the earliest filtered-and-normalized snapshot.
    print(f"Total valid JSON objects: {len(processed_data_list)}")
    print("Writing the processed file, please wait...")
    with jsonlines.open(output_path, mode='w') as writer:
        writer.write_all(processed_data_list)
    print("Write complete!")


def clean_text(text):
    """Scrub illegal characters out of a string."""
    if isinstance(text, str):
        # Replace lone surrogate characters.
        return text.encode('utf-8', errors='replace').decode('utf-8')
    return text


def clean_data(data):
    if isinstance(data, dict):
        return {key: clean_data(value) for key, value in data.items()}
    elif isinstance(data, list):
        return [clean_data(item) for item in data]
    elif isinstance(data, str):
        return clean_text(data)
    return data
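
A quick sketch of what clean_data does to a record carrying lone UTF-16 surrogates (sample data invented):

from components.step1 import clean_data

dirty = {"q_main": "What is F=ma?\ud800", "keypoint": ["mechanics\udfff"]}
print(clean_data(dirty))
# Lone surrogates cannot be encoded as UTF-8, so errors='replace' turns each into '?':
# {'q_main': 'What is F=ma??', 'keypoint': ['mechanics?']}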

59
jsonl_clear.py

@@ -0,0 +1,59 @@
import json
import os
import argparse
from json_repair import repair_json


def is_valid_json(json_str):
    """Check whether a string is valid JSON."""
    try:
        json.loads(json_str)
        return True
    except json.JSONDecodeError:
        return False


def process_jsonl(input_file, output_file):
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"Input file does not exist: {input_file}")
    id_counter = 1  # auto-increment ID counter
    with open(input_file, 'r', encoding='utf-8') as infile, \
         open(output_file, 'w', encoding='utf-8') as outfile:
        for line_number, line in enumerate(infile, start=1):
            line = line.strip()
            if not line:
                continue
            # Use the line directly if it parses.
            if is_valid_json(line):
                data = json.loads(line)
            else:
                # Otherwise try to repair it.
                try:
                    repaired = repair_json(line)
                    if repaired and is_valid_json(repaired):
                        data = json.loads(repaired)
                    else:
                        continue
                except Exception as e:
                    print(f"Line {line_number}: unrepairable JSON: {line} | error: {e}")
                    continue
            # Add the auto-increment id field.
            data['id'] = id_counter
            id_counter += 1
            # Write out, preserving Chinese and other non-ASCII characters.
            outfile.write(json.dumps(data, ensure_ascii=False) + '\n')
    print(f"Done: wrote {id_counter - 1} records to {output_file}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process a JSONL file: repair invalid JSON and add auto-increment IDs")
    parser.add_argument("--input", type=str, required=True, help="input JSONL file path")
    parser.add_argument("--output", type=str, required=True, help="output JSONL file path")
    args = parser.parse_args()
    process_jsonl(args.input, args.output)

0
process.log

65
process.sh

@@ -0,0 +1,65 @@
#!/bin/bash
folderPath=$1
fileName=$2
major=$3
split_size=$4
echo "Preprocessing the input file, please wait..."
python3 step1_pre.py "$folderPath/$fileName" "$major" "$split_size"
echo "Preprocessing done; see the contents of the spilited_ai1 folder"
echo "Starting AI answer-correctness checking... pass 1"
i=0
for file in $(ls "$folderPath"/spilited_ai1 | sort -V)
do
    echo "Launching task, file path: $folderPath/spilited_ai1/$file"
    nohup python3 step2_ai1.py "$folderPath" "$folderPath/spilited_ai1/$file" "$i" > /dev/null 2>&1 &
    i=$((i+1))
done
# Poll until all AI pass-1 tasks have finished.
while true; do
    # ls -1 prints bare file names, one per line, so wc -l counts the outputs.
    file_count=$(ls -1 "$folderPath/ai_1" | wc -l)
    echo "AI pass 1 current file count: $file_count"
    if [ "$file_count" -ge "$i" ]; then
        echo "All AI pass-1 tasks finished, continuing..."
        break
    else
        echo "Fewer than $i files yet; waiting 10 seconds..."
        sleep 10
    fi
done
echo "Starting AI answer-correctness checking... pass 2"
j=0
for file in $(ls "$folderPath"/ai_1 | sort -V)
do
    echo "Launching task, file path: $folderPath/ai_1/$file"
    nohup python3 step3_ai2.py "$folderPath" "$folderPath/ai_1/$file" "$j" > /dev/null 2>&1 &
    j=$((j+1))
done
# Poll until all AI pass-2 tasks have finished (compare against $j, the pass-2 task count).
while true; do
    file_count=$(ls -1 "$folderPath/ai_2" | wc -l)
    echo "AI pass 2 current file count: $file_count"
    if [ "$file_count" -ge "$j" ]; then
        echo "All AI pass-2 tasks finished, continuing..."
        break
    else
        echo "Fewer than $j files yet; waiting 10 seconds..."
        sleep 10
    fi
done
echo "Merging the final results"
cat "$folderPath"/ai_2/* > "$folderPath"/ai_2_total/final_output.jsonl
echo "Done. When merging data, remember the subject fields and the later ID reset!"

25
readme.md

@@ -0,0 +1,25 @@
# Batch checking of AI-generated questions
Automatically processes the raw data, filters it, runs a first and a second AI check, and merges the final results into ai_2_total.
Tune the split size yourself so that one input file is split into roughly 100 chunks that run in parallel.
# Usage
1. Create a directory on the server, e.g. ./test_01 (an absolute path also works), and put the raw jsonl file inside it, e.g. ./test_01/test.jsonl
2. Run sh start.sh and enter the parameters when prompted, for example:
    Put the raw file in its own directory. Enter the folder path:
    ./test_01
    Enter the file name:
    test.jsonl
    Enter the subject to process, e.g. 物理 (physics):
    测试
    Enter the chunk size for splitting the file for parallel AI querying, e.g. 10000:
    10
3. On success, several directories are created under ./test_01:
    transformed: data saved here after the initial validity filtering
    spilited_ai1: the split chunks used for the first AI check
    ai_1: outputs of the first AI check
    ai_2: outputs of the second AI check
    ai_2_total: the final data merged after both checks
Once the run has started, progress can be followed in process.log.

30
replace.py

@@ -0,0 +1,30 @@
import logging
import sys

from components.ai_check import process_jsonl_file

# Invocation:
# python3 replace.py parent_path file_path file_index
parent_path = sys.argv[1]
file_path = sys.argv[2]
file_index = sys.argv[3]

# Configure the per-shard log file.
logging.basicConfig(
    level=logging.DEBUG,  # log everything, DEBUG and up
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=parent_path + "/log/ai1_log_" + str(file_index) + '.log',
    filemode='a'  # 'w' overwrites, 'a' appends
)
logger = logging.getLogger(__name__)

if __name__ == '__main__':
    print("Start")
    process_jsonl_file(file_path, parent_path + "/ai_1", logger=logger)

64
replace_answer_detail.py

@@ -0,0 +1,64 @@
import json
import sys


def load_b_file(filepath):
    """Read file B and return a dict of answer_detail values keyed by id."""
    b_data = {}
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                data = json.loads(line)
                # Keep only the field needed for the update.
                if 'id' in data and 'answer_detail' in data:
                    b_data[data['id']] = data['answer_detail']
                else:
                    print(f"Warning: file B record is missing a required field: {line}")
            except json.JSONDecodeError as e:
                print(f"Parse failure (file B): {e} -> {line[:50]}...")
    return b_data


def update_a_file(a_filepath, b_dict, output_filepath):
    """Read file A, replace answer_detail from b_dict, and write the output file."""
    with open(a_filepath, 'r', encoding='utf-8') as fin, \
         open(output_filepath, 'w', encoding='utf-8') as fout:
        for line_num, line in enumerate(fin, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                data = json.loads(line)
                record_id = data.get('id')
                # If B has this id, swap in its answer_detail.
                if record_id in b_dict:
                    data['answer_detail'] = b_dict[record_id]
                # Write the (possibly updated) record back out.
                fout.write(json.dumps(data, ensure_ascii=False) + '\n')
            except json.JSONDecodeError as e:
                print(f"Line {line_num} failed to parse: {e} -> {line[:50]}...")


def main():
    if len(sys.argv) != 4:
        print("Usage: python replace_answer_detail.py <file A path> <file B path> <output path>")
        sys.exit(1)
    a_file = sys.argv[1]
    b_file = sys.argv[2]
    output_file = sys.argv[3]
    print("Loading file B ...")
    b_dict = load_b_file(b_file)
    print(f"Loaded {len(b_dict)} records. Processing file A ...")
    update_a_file(a_file, b_dict, output_file)
    print("Done! Result saved to:", output_file)


if __name__ == '__main__':
    main()
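
A toy end-to-end run of the merge (temporary paths invented):

import json
from replace_answer_detail import load_b_file, update_a_file

# File A holds the full records; file B holds rewritten answer_detail values keyed by id.
with open('/tmp/a.jsonl', 'w', encoding='utf-8') as f:
    f.write(json.dumps({'id': 1, 'q_main': 'q', 'answer_detail': 'old'}) + '\n')
with open('/tmp/b.jsonl', 'w', encoding='utf-8') as f:
    f.write(json.dumps({'id': 1, 'answer_detail': 'Step 1: ...'}) + '\n')

update_a_file('/tmp/a.jsonl', load_b_file('/tmp/b.jsonl'), '/tmp/merged.jsonl')
# /tmp/merged.jsonl now holds {"id": 1, "q_main": "q", "answer_detail": "Step 1: ..."}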

1
requirements.txt

@@ -0,0 +1 @@
wheel @ file:///opt/homebrew/Cellar/python%403.13/3.13.2/libexec/wheel-0.45.1-py3-none-any.whl#sha256=b9235939e2096903717cb6bfc132267f8a7e46deb2ec3ef9c5e234ea301795d0

100
reset_id.py

@@ -0,0 +1,100 @@
# Rebuild the id sequence before delivery.
# Also intended to strip the leading question number from each record.
import argparse
import re

import jsonlines
from tqdm import tqdm

html_tag_pattern = re.compile(r'<[^>]+>')  # detects HTML tags
unicode_pattern = r'\\u[0-9a-fA-F]{4,}'


# Input and output file paths.
def parse_args():
    parser = argparse.ArgumentParser(description="Reset IDs in a JSONL file")
    parser.add_argument("--input", required=True, help="input JSONL file path")
    parser.add_argument("--output", required=True, help="output JSONL file path")
    parser.add_argument("--major", required=True, help="subject name")
    parser.add_argument("--start_id", type=int, default=1, help="start ID (default: 1)")
    return parser.parse_args()


# Count the input lines (for the progress bar).
def count_lines(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        return sum(1 for _ in f)


def process_file(input_file, output_file, start_id, major):
    total_lines = count_lines(input_file)
    # Read the input line by line and write the output line by line.
    with jsonlines.open(input_file, mode="r") as reader, jsonlines.open(output_file, mode="w") as writer:
        new_id = start_id
        # Wrap the reader in tqdm to show a progress bar.
        for line in tqdm(reader, total=total_lines, desc="Processing", unit="line"):
            q_main = line.get("q_main", "")
            answer_detail = line.get("answer_detail", None)
            std_ans = line.get("std_ans", None)
            keypoint = line.get("keypoint", None)
            major_2 = line.get("major_2", None)
            # Skip conditions (currently disabled):
            # 1. q_main starts with a digit
            # 2. answer_detail is missing or not a string
            # 3. std_ans is missing or not a string
            # if (
            #     re.match(r'^\s*\d(?!\d)', q_main) or
            #     not isinstance(answer_detail, str) or
            #     not isinstance(std_ans, str) or
            #     not isinstance(keypoint, str) or
            #     not isinstance(major_2, str) or
            #     html_tag_pattern.search(q_main) or
            #     html_tag_pattern.search(answer_detail) or
            #     html_tag_pattern.search(std_ans) or
            #     re.search(unicode_pattern, major_2)
            # ):
            #     continue
            # Overwrite the record's id field.
            line["id"] = new_id
            # line["grade"] = "研究生"
            # line["major"] = major
            # line["q_main"] = full_to_half(remove_prefix(line["q_main"]))
            # line["answer_detail"] = full_to_half(line["answer_detail"])
            # line["std_ans"] = full_to_half(line["std_ans"])
            # Write the modified record to the output file.
            writer.write(line)
            # Advance the ID.
            new_id += 1


def remove_prefix(question):
    """
    Strip a leading question number from the question field, e.g.
    - "1.", "2.", "1. ", "2. " (ASCII or full-width punctuation)
    - "1、"
    - "1)"
    - "2 题目:"
    - "1题:", "2题:"
    - escaped forms such as "2\\.", "3\\."
    """
    # Regular expression covering the prefix variants above.
    pattern = r'^\s*\d+[\..\\))、]|\d+\s*题目:|\d+题:'
    result = re.sub(pattern, '', question).lstrip()
    return result


def full_to_half(text):
    """Convert full-width characters in the text to half-width."""
    res = ''
    for char in text:
        code = ord(char)
        if code == 12288:  # full-width space
            res += chr(32)
        elif 65281 <= code <= 65374:  # full-width punctuation range
            res += chr(code - 65248)
        else:
            res += char
    return res


if __name__ == "__main__":
    args = parse_args()
    process_file(args.input, args.output, args.start_id, args.major)
    print("ID reset complete; saved to the new file:", args.output)

19
start.sh

@@ -0,0 +1,19 @@
#!/bin/bash
folderPath=$1
fileName=$2
major=$3
split_size=$4
echo "Preprocessing the input file, please wait..."
python3 step1_pre.py "$folderPath/$fileName" "$major" "$split_size"
echo "Preprocessing done; see the contents of the spilited_ai1 folder"
echo "Starting AI answer-correctness checking... pass 1"
i=0
for file in $(ls "$folderPath"/spilited_ai1 | sort -V)
do
    echo "Launching task, file path: $folderPath/spilited_ai1/$file"
    nohup python3 replace.py "$folderPath" "$folderPath/spilited_ai1/$file" "$i" > /dev/null 2>&1 &
    i=$((i+1))
done

69
step1_pre.py

@@ -0,0 +1,69 @@
import logging
import os
from pathlib import Path
import sys

from components.ai_check import get_spilit_file_list, process_jsonl_file
from components.spilit import split_jsonl
from components.step1 import generate_processed_data, load_ori_file, ori_data_validate
from step4_major import start_process_major2

# Inputs: full path to the source file, subject name, split size.
start_file_path = sys.argv[1]
major = sys.argv[2]
split_size = sys.argv[3]

# Derive the file name, directory, and stem.
p = Path(start_file_path)
ori_file_name = str(p.name)
ori_file_path = str(p.parent)
ori_file_pure_name = str(p.stem)
question_type_list = ['填空题', '解答题']  # fill-in-the-blank and free-response questions

# Create the working directories.
os.makedirs(ori_file_path + "/transformed", exist_ok=True)
os.makedirs(ori_file_path + "/major2_processed", exist_ok=True)
os.makedirs(ori_file_path + "/spilited_ai1", exist_ok=True)
os.makedirs(ori_file_path + "/ai_1", exist_ok=True)
os.makedirs(ori_file_path + "/ai_2", exist_ok=True)
os.makedirs(ori_file_path + "/ai_2_total", exist_ok=True)
os.makedirs(ori_file_path + "/log", exist_ok=True)

# Configure the run log (note the "/log/" separator, so the file lands inside the log directory).
logging.basicConfig(
    level=logging.DEBUG,  # log everything, DEBUG and up
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=ori_file_path + "/log/" + ori_file_pure_name + '.log',
    filemode='a'  # 'w' overwrites, 'a' appends
)
logger = logging.getLogger(__name__)


def process_wrapper(args):
    ai1_file_path, output_dir, logger = args
    process_jsonl_file(ai1_file_path, output_dir, logger)


if __name__ == '__main__':
    # Pipeline stages (the earlier ones are currently disabled):
    # - drop non-JSON rows, filter by question type, drop "如图" (figure) questions,
    #   and drop trivially short answers
    # filtered_data_list = load_ori_file(start_file_path, question_type_list)
    # - normalize the record format
    # generate_processed_data(filtered_data_list, major, ori_file_path + "/transformed/" + ori_file_pure_name + ".processed")
    # - filter by major_2
    # start_process_major2(ori_file_path + "/transformed/" + ori_file_pure_name + ".processed", ori_file_path + "/major2_processed/" + ori_file_pure_name + ".processed", major)
    # Split the file into chunks for the parallel AI passes.
    split_jsonl(ori_file_path + "/major2_processed/" + ori_file_pure_name + ".processed",
                ori_file_path + "/spilited_ai1/" + ori_file_pure_name, chunk_size=int(split_size))
    # First AI pass (driven by process.sh / start.sh instead):
    # ai1_file_list = get_spilit_file_list(ori_file_path + "/spilited/")
    # for ai1_file_path in ai1_file_list:
    #     process_jsonl_file(ai1_file_path, ori_file_path + "/ai_1/", logger)
    # The second AI pass and the result/log consolidation also happen downstream.
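
For reference, how the Path fields above decompose a run path (sample path taken from the readme):

from pathlib import Path

p = Path('./test_01/test.jsonl')
print(p.name)         # test.jsonl
print(str(p.parent))  # test_01
print(p.stem)         # test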

30
step2_ai1.py

@@ -0,0 +1,30 @@
import logging
import sys

from components.ai_check import process_jsonl_file

# Invocation:
# python3 step2_ai1.py parent_path file_path file_index
parent_path = sys.argv[1]
file_path = sys.argv[2]
file_index = sys.argv[3]

# Configure the per-shard log file.
logging.basicConfig(
    level=logging.DEBUG,  # log everything, DEBUG and up
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=parent_path + "/log/ai1_log_" + str(file_index) + '.log',
    filemode='a'  # 'w' overwrites, 'a' appends
)
logger = logging.getLogger(__name__)

if __name__ == '__main__':
    print("Start")
    process_jsonl_file(file_path, parent_path + "/ai_1", logger=logger)

30
step3_ai2.py

@@ -0,0 +1,30 @@
import logging
import sys

from components.ai_check import process_jsonl_file

# Invocation:
# python3 step3_ai2.py parent_path file_path file_index
parent_path = sys.argv[1]
file_path = sys.argv[2]
file_index = sys.argv[3]

# Configure the per-shard log file.
logging.basicConfig(
    level=logging.DEBUG,  # log everything, DEBUG and up
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=parent_path + "/log/ai2_log_" + str(file_index) + '.log',
    filemode='a'  # 'w' overwrites, 'a' appends
)
logger = logging.getLogger(__name__)

if __name__ == '__main__':
    print("Start")
    process_jsonl_file(file_path, parent_path + "/ai_2", logger=logger)

68
step4_major.py

@@ -0,0 +1,68 @@
# Clean up the major_2 field before delivery.
import argparse
import re

import jsonlines
from tqdm import tqdm


# Input and output file paths.
def parse_args():
    parser = argparse.ArgumentParser(description="Process major_2")
    parser.add_argument("--input", required=True, help="input JSONL file path")
    parser.add_argument("--output", required=True, help="output JSONL file path")
    parser.add_argument("--major", required=True, help="subject name")
    return parser.parse_args()


# Count the input lines (for the progress bar).
def count_lines(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        return sum(1 for _ in f)


def process_file(input_file, output_file, major2_keywords):
    total_lines = count_lines(input_file)
    # Read the input line by line and write the output line by line.
    with jsonlines.open(input_file, mode="r") as reader, jsonlines.open(output_file, mode="w") as writer:
        # Wrap the reader in tqdm to show a progress bar.
        for line in tqdm(reader, total=total_lines, desc="Processing", unit="line"):
            # Check that major_2 is present.
            if line.get("major_2"):
                if any(keyword in line["major_2"] for keyword in major2_keywords):
                    # Trim leading whitespace from major_2.
                    line["major_2"] = line["major_2"].lstrip()
                    # Reject records whose major_2 contains Latin letters.
                    if any('a' <= char.lower() <= 'z' for char in line["major_2"]):
                        print("major_2 contains Latin characters, please check! Record: " + str(line))
                        continue
                    # Strip *, /, \, ! and space characters.
                    line["major_2"] = line["major_2"].replace('*', '').replace('/', '').replace('\\', '').replace('!', '').replace(' ', '')
                    # Strip bracket characters, ASCII and full-width/CJK variants
                    # (the brackets only, not their contents).
                    line["major_2"] = re.sub(r'[()()\[\]{}〈〉《》『』【】〖〗〘〙〚〛]', '', line["major_2"])
                    # Write the cleaned record to the output file.
                    writer.write(line)
                else:
                    print("major_2 is invalid; skipping this record: " + str(line))
                    continue


def start_process_major2(input, output, major):
    major2_keywords = []
    if major == "化学":  # chemistry: organic / inorganic / molecular / chemistry
        major2_keywords = ["有机", "无机", "分子", "化学"]
    elif major == "物理":  # physics: astronomy, mechanics, optics, EM, quantum, thermal, solid state, ...
        major2_keywords = ["天体", "宇宙", "行星", "黑洞", "原子", "核", "力", "动力", "力学", "流体",
                           "光", "光子", "电", "电子", "磁", "量子", "超导", "热", "纳米", "晶体",
                           "半导体", "能量", "相对", "波动", "振动", "物理"]
    else:
        print("Please enter a valid subject!")
        exit()
    process_file(input, output, major2_keywords)
    print("major_2 cleanup complete; saved to the new file:", output)


if __name__ == "__main__":
    args = parse_args()
    start_process_major2(args.input, args.output, args.major)
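
A toy run of the major_2 cleanup (temporary paths invented; record 1 matches the physics keywords, record 2 does not):

import json
from step4_major import start_process_major2

with open('/tmp/major_in.jsonl', 'w', encoding='utf-8') as f:
    f.write(json.dumps({'id': 1, 'major_2': ' 量子(基础)物理! '}, ensure_ascii=False) + '\n')
    f.write(json.dumps({'id': 2, 'major_2': '历史'}, ensure_ascii=False) + '\n')

start_process_major2('/tmp/major_in.jsonl', '/tmp/major_out.jsonl', '物理')
# Record 1 survives with major_2 cleaned to "量子基础物理"; record 2 is rejected.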