"""Split Chinese electricity-trading questions and extract structured fields.

Each raw question is broken into sub-questions, and for every sub-question
the years, months, province-level regions and intent labels mentioned in
the text are collected into a dictionary.
"""

import re
from pprint import pprint
from typing import Dict, List

# Province-level regions of China, written without any "省"/"市" suffix.
provinces = [
    '北京', '天津', '河北', '山西', '内蒙古', '辽宁', '吉林', '黑龙江',
    '上海', '江苏', '浙江', '安徽', '福建', '江西', '山东', '河南',
    '湖北', '湖南', '广东', '广西', '海南', '重庆', '四川', '贵州',
    '云南', '西藏', '陕西', '甘肃', '青海', '宁夏', '新疆'
]

# Regex alternation matching each region with an optional "省"/"市" suffix.
# The suffixed spellings are listed before the bare name so that the longest
# form is consumed when it is present.
province_pattern = '|'.join([f'{p}省|{p}市|{p}' for p in provinces])

# Intent keyword -> intent label.
# NOTE: keywords are matched as plain substrings, so a question containing
# '送出电量占比' also contains '送出电量' and receives both labels.
intent_keywords = {
    # Disabled entries, kept for reference:
    # '电量': '省间交易电量',
    # '同比': '同比变化',
    # '环比': '环比变化',
    # '均价': '均价',
    '送出电量': '送出电量',
    '送出电量占售电量的比': '送出电量占售电量的比',
    '送出电量占比': '送出电量占售电量的比',
    '送电占比': '送出电量占售电量的比',
    '送出均价': '送出均价',
    '送电均价': '送出均价',
    '送出平均价': '送出均价',
    '送电平均价': '送出均价',
    '送出的平均价': '送出均价',
    # Disabled entries, kept for reference:
    # '受入电量': '受入电量',
    # '受电': '受入电量',
    # '成交未结算': '未结算电量',
    # '占比': '电源结构占比',
    # '最多': '最大值统计',
    # '地图': '地图特征分析',
    # '多少笔': '交易数量统计',
    # '完成的交易': '交易数量统计',
    # '参与': '参与主体数量'
}

# Sentence terminators: ASCII "?", full-width "？" and the Chinese full stop.
_SENTENCE_END_RE = re.compile(r'[?？。]')
_BASE_TIME_RE = re.compile(r'\d{4}年\d{1,2}月')
_YEAR_RE = re.compile(r'\d{4}年')
_MONTH_RE = re.compile(r'(\d{1,2}月(?:至\d{1,2}月)?)')
_PROVINCE_RE = re.compile(province_pattern)


def _unique_in_order(items) -> List[str]:
    """Return *items* without duplicates, keeping first-occurrence order."""
    return list(dict.fromkeys(items))


def split_question(question: str) -> List[str]:
    """Split a question that holds several sub-questions into a list.

    The text is cut at "?", "？" and "。"; empty fragments are dropped.
    A fragment containing "同比" (or "环比") but not "相比" is replaced by a
    canonical year-over-year (or month-over-month) question built from the
    year-month found in that fragment. Every other fragment is returned
    with an ASCII "?" appended.

    Args:
        question: Raw question text.

    Returns:
        The sub-questions in their original order; empty for blank input.
    """
    fragments = [
        piece.strip()
        for piece in _SENTENCE_END_RE.split(question)
        if piece.strip()
    ]

    result = []
    for part in fragments:
        # Fragments that already spell out the comparison ("相比") are
        # passed through untouched.
        needs_rewrite = '相比' not in part
        if '同比' in part and needs_rewrite:
            base = extract_base_time(part)
            result.append(f"{base}与去年同期相比变化如何?")
        elif '环比' in part and needs_rewrite:
            base = extract_base_time(part)
            result.append(f"{base}与上月相比变化如何?")
        else:
            result.append(part + '?')
    return result


def extract_base_time(text: str) -> str:
    """Return the first "YYYY年M月" fragment in *text*.

    Falls back to the placeholder "当前月份" when no such fragment exists.
    """
    found = _BASE_TIME_RE.search(text)
    return found.group(0) if found else "当前月份"


def extract_info(question: str) -> Dict:
    """Extract structured fields from a single question.

    Args:
        question: One (already split) question.

    Returns:
        A dict with the keys:
            "question": the input text, unchanged;
            "year": distinct four-digit years without the "年" suffix;
            "month": every "M月" or "M月至N月" match, duplicates kept;
            "province": distinct regions with "省"/"市" removed;
            "intent": distinct labels whose keyword occurs in the text.
        The de-duplicated lists keep first-occurrence order, so the result
        is the same on every run.
    """
    years = _YEAR_RE.findall(question)
    months = _MONTH_RE.findall(question)
    province_hits = _PROVINCE_RE.findall(question)

    intents = [
        label
        for keyword, label in intent_keywords.items()
        if keyword in question
    ]

    return {
        "question": question,
        "year": _unique_in_order(y.replace("年", "") for y in years),
        "month": months,
        "province": _unique_in_order(
            p.replace('省', '').replace('市', '') for p in province_hits
        ),
        "intent": _unique_in_order(intents),
    }


def process_questions(questions: List[str]) -> List[Dict]:
    """Split every raw question and extract the fields of each sub-question.

    Args:
        questions: Raw question strings.

    Returns:
        One info dict (see ``extract_info``) per sub-question, in input order.
    """
    return [
        extract_info(sub_question)
        for raw in questions
        for sub_question in split_question(raw)
    ]


def load_questions_from_file(filepath: str) -> List[str]:
    """Read questions from a text file, one per line.

    Lines are stripped of surrounding whitespace and blank lines are skipped.

    Args:
        filepath: Path of a UTF-8 encoded text file.

    Returns:
        The non-empty lines of the file.

    Raises:
        OSError: If the file cannot be opened.
    """
    # "utf-8-sig" reads plain UTF-8 as usual and additionally drops a leading
    # byte-order mark, which would otherwise stick to the first question.
    with open(filepath, 'r', encoding='utf-8-sig') as f:
        return [line.strip() for line in f if line.strip()]


if __name__ == "__main__":
    # Script entry point: guarded so that importing this module does not
    # read the input file or print anything.
    questions = load_questions_from_file("question_test.txt")

    # Split and extract.
    results = process_questions(questions)

    # Print the results.
    pprint(results)