"""Match dictionary entries between a "gphd" file and a "dyhdc" file.

Each entry may start with a headword written as 【...】.  Entries are paired in
two rounds:

1. Exact round: entries whose extracted Chinese characters are identical are
   marked with ★ in both files.
2. Overlap round: among the remaining entries that share the same headword,
   each gphd entry is paired with the dyhdc entry whose body shares the most
   text with it; both are marked with ☆ plus an overlap percentage.
"""
import os
import re
import time
from collections import defaultdict, deque

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable=None, *args, **kwargs):
        """Stand-in for tqdm.tqdm: iterate without showing a progress bar."""
        return iterable


# Default locations used when process() is called without arguments.
DEFAULT_SPECIAL_CHARS_PATH = r"E:\lab_text\汉字字符补集.txt"
DEFAULT_GPHD_PATH = r"E:\lab_text\gphd05.txt"
DEFAULT_DYHDC_PATH = r"E:\lab_text\dyhdc05.txt"
DEFAULT_GPHD_OUTPUT_PATH = r"E:\lab_text\gphd05_matched.txt"
DEFAULT_DYHDC_OUTPUT_PATH = r"E:\lab_text\dyhdc05_matched.txt"


class TextMatcher:
    """Pair up entries of two dictionary text files by their Chinese text."""

    # Inclusive code point ranges that count as Chinese characters.
    _CJK_RANGES = (
        (0x4E00, 0x9FFF),
        (0x3400, 0x4DBF),    # Extension A
        (0x20000, 0x2A6DF),  # Extension B
        (0x2A700, 0x2B73F),  # Extension C
        (0x2B740, 0x2B81F),  # Extension D
        (0x2B820, 0x2CEAF),  # Extension E
        (0x2CEB0, 0x2EBEF),  # Extension F
    )

    # A headword is the first 【...】 group in an entry.
    _TERM_PATTERN = re.compile(r'【[^】]*】')

    def __init__(self):
        # Extra strings (one per line of the supplement file) that are also
        # accepted as Chinese characters.
        self.special_chars = set()

    def load_special_chars(self, file_path):
        """Load the supplementary character set from *file_path*.

        Every non-blank line (stripped of surrounding whitespace) is added to
        ``self.special_chars``.  A missing file is reported on stdout and
        otherwise ignored, so matching still works with the built-in ranges.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        self.special_chars.add(line)
        except FileNotFoundError:
            print(f"汉字字符补集文件 {file_path} 未找到")

    def is_chinese_char(self, char):
        """Return True if *char* is in a known CJK range or the supplement set.

        *char* must be a single character (``ord`` is applied to it).
        """
        code_point = ord(char)
        if any(low <= code_point <= high for low, high in self._CJK_RANGES):
            return True
        return char in self.special_chars

    def extract_chinese_chars(self, text):
        """Return *text* with every non-Chinese character removed."""
        return ''.join(char for char in text if self.is_chinese_char(char))

    def extract_term_and_body(self, text):
        """Split an entry into its headword and body.

        Returns a 5-tuple ``(term_chinese, body_chinese, full_chinese, term,
        body)`` where *term* is the first ``【...】`` group including the
        brackets, *body* is *text* with that group removed, and the
        ``*_chinese`` values keep only Chinese characters.  When no headword
        is present the whole text is the body and both term values are ``""``.
        """
        term_match = self._TERM_PATTERN.search(text)
        if term_match is None:
            body_chinese = self.extract_chinese_chars(text)
            return "", body_chinese, body_chinese, "", text

        term = term_match.group(0)
        # Cut the matched headword out of the text (first occurrence only).
        body = text[:term_match.start()] + text[term_match.end():]
        term_chinese = self.extract_chinese_chars(term[1:-1])
        body_chinese = self.extract_chinese_chars(body)
        full_chinese = self.extract_chinese_chars(text)
        return term_chinese, body_chinese, full_chinese, term, body

    def find_all_common_substrings(self, str1, str2):
        """Return the common substrings of length >= 2 found by the DP scan.

        For every pair of positions where the strings agree, the longest
        common run ending there is recorded if it has at least two
        characters.  The result is a duplicate-free list ordered longest
        first, ties broken lexicographically, so the output is deterministic.
        """
        if not str1 or not str2:
            return []

        found = set()
        width = len(str2) + 1
        # Only the previous DP row is needed, so keep two rows instead of a
        # full (len(str1) + 1) x (len(str2) + 1) table.
        previous = [0] * width
        for i, ch1 in enumerate(str1, start=1):
            current = [0] * width
            for j, ch2 in enumerate(str2, start=1):
                if ch1 == ch2:
                    length = previous[j - 1] + 1
                    current[j] = length
                    if length >= 2:
                        found.add(str1[i - length:i])
            previous = current
        return sorted(found, key=lambda s: (-len(s), s))

    @staticmethod
    def _first_free_span(text, substring, covered):
        """Return the index range of the first occurrence of *substring* in
        *text* that does not touch any index in *covered*, or None."""
        size = len(substring)
        start = text.find(substring)
        while start != -1:
            span = range(start, start + size)
            if covered.isdisjoint(span):
                return span
            start = text.find(substring, start + 1)
        return None

    def calculate_overlap_ratio(self, common_substrings, str1, str2):
        """Compute cross-assigned overlap percentages for two strings.

        Substrings are placed greedily, longest first, each at most once, and
        only where they overlap no previously placed substring in either
        string.  The summed length of the placed substrings is the common
        length.

        Returns ``(gphd_ratio, dyhdc_ratio)`` where ``gphd_ratio`` is the
        common length as a percentage of ``len(str2)`` and ``dyhdc_ratio`` is
        the common length as a percentage of ``len(str1)``.  Returns
        ``(0, 0)`` when *common_substrings* is empty.
        """
        if not common_substrings:
            return 0, 0

        # Break length ties lexicographically so the greedy choice does not
        # depend on the iteration order of the caller's collection.
        ordered = sorted(common_substrings, key=lambda s: (-len(s), s))

        covered_str1 = set()
        covered_str2 = set()
        total_common_len = 0
        for substring in ordered:
            span1 = self._first_free_span(str1, substring, covered_str1)
            if span1 is None:
                continue
            span2 = self._first_free_span(str2, substring, covered_str2)
            if span2 is None:
                continue
            covered_str1.update(span1)
            covered_str2.update(span2)
            total_common_len += len(substring)

        # Cross assignment: each side is measured against the other's length.
        gphd_ratio = (total_common_len / len(str2)) * 100 if len(str2) > 0 else 0
        dyhdc_ratio = (total_common_len / len(str1)) * 100 if len(str1) > 0 else 0
        return gphd_ratio, dyhdc_ratio

    def read_file(self, file_path):
        """Return the lines of a UTF-8 file without trailing line breaks."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return [line.rstrip('\n\r') for line in f]

    def write_file(self, file_path, lines):
        """Write *lines* to a UTF-8 file, one per line."""
        with open(file_path, 'w', encoding='utf-8') as f:
            f.writelines(line + '\n' for line in lines)

    def _parse_gphd_line(self, line):
        """Build the record for one gphd line (``number<TAB>text[...]``)."""
        parts = line.split('\t', 2)
        if len(parts) >= 2:
            number, text = parts[0], parts[1]
            term, body, full, term_raw, body_raw = self.extract_term_and_body(text)
        else:
            # Line without a tab: no number, no headword, whole line is body.
            number, text = '', line
            body = full = self.extract_chinese_chars(line)
            term = term_raw = ''
            body_raw = line
        return {
            'original_line': line,
            'number': number,
            'text': text,
            'term': term,
            'body': body,
            'full': full,
            'term_with_brackets': term_raw,
            'body_raw': body_raw,
            'marked': False,
            'result_line': line,
        }

    def _parse_dyhdc_line(self, line):
        """Build the record for one dyhdc line (the whole line is the text)."""
        term, body, full, term_raw, body_raw = self.extract_term_and_body(line)
        return {
            'original_line': line,
            'text': line,
            'term': term,
            'body': body,
            'full': full,
            'term_with_brackets': term_raw,
            'body_raw': body_raw,
            'marked': False,
            'result_line': line,
        }

    def _match_exact(self, gphd_data, dyhdc_data):
        """Round one: mark pairs whose Chinese text is identical with ★.

        Each gphd record takes the earliest still-unmatched dyhdc record with
        the same text.  Returns the number of pairs.
        """
        # Text -> queue of dyhdc records in file order, so each lookup is O(1)
        # instead of a scan over the whole dyhdc file.
        candidates = defaultdict(deque)
        for d_item in dyhdc_data:
            candidates[d_item['full']].append(d_item)

        matched_count = 0
        for g_item in tqdm(gphd_data, desc="第一轮对比", unit="行"):
            queue = candidates.get(g_item['full'])
            if not queue:
                continue
            d_item = queue.popleft()
            # Matched rows carry their padding so every row ends up with the
            # same number of added columns (3 for gphd, 2 for dyhdc).
            g_item['result_line'] = g_item['original_line'] + '\t★' + '\t\t'
            g_item['marked'] = True
            d_item['result_line'] = d_item['original_line'] + '\t★' + '\t'
            d_item['marked'] = True
            matched_count += 1
        return matched_count

    def _match_by_term(self, gphd_data, dyhdc_data):
        """Round two: pair unmarked records that share a headword, by overlap.

        For each gphd record the unmarked dyhdc record with the highest
        gphd ratio (first one wins on ties) is chosen and both are marked
        with ☆.  Returns the number of pairs.
        """
        print("正在按词头分组...")
        gphd_by_term = defaultdict(list)
        dyhdc_by_term = defaultdict(list)
        for item in gphd_data:
            if not item['marked']:
                gphd_by_term[item['term']].append(item)
        for item in dyhdc_data:
            if not item['marked']:
                dyhdc_by_term[item['term']].append(item)

        # Groups are independent of each other; sorting only fixes the order
        # in which they are visited.
        shared_terms = sorted(gphd_by_term.keys() & dyhdc_by_term.keys())
        print(f"需要处理的相同词头数量: {len(shared_terms)}")

        total_matches = 0
        for term in tqdm(shared_terms, desc="第二轮处理词头", unit="词头"):
            for g_item in gphd_by_term[term]:
                best_match = None
                best_ratio = 0
                for d_item in dyhdc_by_term[term]:
                    if d_item['marked']:
                        continue
                    common_substrings = self.find_all_common_substrings(
                        g_item['body'], d_item['body']
                    )
                    if not common_substrings:
                        continue
                    gphd_ratio, dyhdc_ratio = self.calculate_overlap_ratio(
                        common_substrings, g_item['body'], d_item['body']
                    )
                    # The gphd ratio (common length / dyhdc body length) is
                    # the score used to rank candidates.
                    if gphd_ratio > best_ratio:
                        best_ratio = gphd_ratio
                        best_match = (d_item, gphd_ratio, dyhdc_ratio)

                if best_match is None:
                    continue
                d_item, gphd_ratio, dyhdc_ratio = best_match
                # gphd row: mark, its ratio, and the matched dyhdc text.
                g_item['result_line'] = (
                    g_item['original_line'] + '\t☆\t' +
                    f"{gphd_ratio:.2f}%\t" + d_item['text']
                )
                g_item['marked'] = True
                # dyhdc row: mark and its ratio.
                d_item['result_line'] = (
                    d_item['original_line'] + '\t☆\t' + f"{dyhdc_ratio:.2f}%"
                )
                d_item['marked'] = True
                total_matches += 1
        return total_matches

    def process(self,
                special_chars_path=DEFAULT_SPECIAL_CHARS_PATH,
                gphd_path=DEFAULT_GPHD_PATH,
                dyhdc_path=DEFAULT_DYHDC_PATH,
                gphd_output_path=DEFAULT_GPHD_OUTPUT_PATH,
                dyhdc_output_path=DEFAULT_DYHDC_OUTPUT_PATH):
        """Run both matching rounds and write the annotated files.

        Args:
            special_chars_path: supplementary character file (may be missing).
            gphd_path: input file with ``number<TAB>text`` lines.
            dyhdc_path: input file with one entry per line.
            gphd_output_path: output; every line gains three columns
                (mark, ratio, matched dyhdc text), empty where not applicable.
            dyhdc_output_path: output; every line gains two columns
                (mark, ratio), empty where not applicable.

        Raises:
            OSError: if an input file cannot be read or an output file cannot
                be written (a missing supplement file is tolerated).
        """
        print("正在加载汉字字符补集...")
        self.load_special_chars(special_chars_path)

        print("正在读取文件...")
        gphd_lines = self.read_file(gphd_path)
        dyhdc_lines = self.read_file(dyhdc_path)

        print(f"正在解析{os.path.basename(gphd_path)}文件...")
        gphd_data = [
            self._parse_gphd_line(line)
            for line in tqdm(gphd_lines, desc="解析gphd数据", unit="行")
        ]

        print(f"正在解析{os.path.basename(dyhdc_path)}文件...")
        dyhdc_data = [
            self._parse_dyhdc_line(line)
            for line in tqdm(dyhdc_lines, desc="解析dyhdc数据", unit="行")
        ]

        print("\n开始第一轮对比 - 完全匹配词条文节...")
        matched_count = self._match_exact(gphd_data, dyhdc_data)
        print(f"第一轮完成,匹配了 {matched_count} 对词条")

        print("\n开始第二轮对比 - 词身重合度分析...")
        total_matches = self._match_by_term(gphd_data, dyhdc_data)
        print(f"\n第二轮完成,匹配了 {total_matches} 对词条")

        # Pad unmatched rows by match state rather than by counting tabs, so
        # input rows that already contain extra tabs are still padded right.
        print("正在整理输出格式...")
        final_gphd_lines = [
            item['result_line'] if item['marked'] else item['result_line'] + '\t\t\t'
            for item in tqdm(gphd_data, desc="整理gphd格式", unit="行")
        ]
        final_dyhdc_lines = [
            item['result_line'] if item['marked'] else item['result_line'] + '\t\t'
            for item in tqdm(dyhdc_data, desc="整理dyhdc格式", unit="行")
        ]

        print("正在保存结果文件...")
        self.write_file(gphd_output_path, final_gphd_lines)
        self.write_file(dyhdc_output_path, final_dyhdc_lines)

        print("\n对比完成!")
        print(f"{os.path.basename(gphd_output_path)} 已保存")
        print(f"{os.path.basename(dyhdc_output_path)} 已保存")
        print(f"总计匹配: 第一轮 {matched_count} 对,第二轮 {total_matches} 对")


def main():
    """Run the matcher with the default file locations."""
    matcher = TextMatcher()
    matcher.process()


if __name__ == "__main__":
    # tqdm is optional: the import at the top of the file falls back to a
    # plain pass-through when it is not installed.
    main()