123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- from sklearn.feature_extraction.text import TfidfVectorizer
- from sklearn.metrics.pairwise import cosine_similarity
- import jieba
- import os
- import json
- from final.ByRules.util import calculate_sum_by_time_range
def jieba_tokenizer(text):
    """Split ``text`` into tokens with jieba and return them as a list.

    This is the default ``tokenizer`` callable that ``process_query`` hands
    to the TF-IDF matching step.
    """
    tokens = jieba.cut(text)
    return list(tokens)
- # 定义问题模板
# Question templates, keyed by template id. Each id maps to a list of
# reference sentences; the same id is also used as the base name of the
# template's JSON description file (see ``load_template_info``).
template_dict = {
    "1": ["某年全年累计省间交易电量是多少?"],
    "2": ["某年某月交易电量是多少?"],
    "3": ["某年某月到某月累计交易电量是多少?"],
    # 8.x: yearly volume broken down by a dimension.
    "8.1": ["某年省间交易电量按交易周期划分的电量是多少?"],
    "8.2": ["某年省间交易电量按交易类型划分的电量是多少?"],
    "8.3": ["某年省间交易电量按发电类型划分的电量是多少?"],
    "8.4": ["某年省间交易电量按交易方式划分的电量是多少?"],
    # 9.x: yearly volume of one specific category.
    "9.1": ["某年省间交易电量年度交易电量是多少?"],
    "9.2": ["某年省间交易电量月度交易电量是多少?"],
    "9.3": ["某年省间交易电量现货交易电量是多少?"],
    "9.4": ["某年省间交易电量应急交易电量是多少?"],
    "9.5": ["某年省间交易电量月内交易电量是多少?"],
    "9.6": ["某年省间交易电量省间外送交易电量是多少?"],
    "9.7": ["某年省间交易电量电力直接交易电量是多少?"],
    "9.8": ["某年省间交易电量合同交易电量是多少?"],
    "9.9": ["某年省间交易电量绿电交易电量是多少?"],
    "9.10": ["某年省间交易电量非市场化交易电量是多少?"],
    "9.11": ["某年省间交易电量新能源交易电量是多少?"],
    "9.12": ["某年省间交易电量火电交易电量是多少?"],
    "9.13": ["某年省间交易电量水电交易电量是多少?"],
    "9.14": ["某年省间交易电量核电交易电量是多少?"],
    "9.15": ["某年省间交易电量双边交易电量是多少?"],
    "9.16": ["某年省间交易电量集中交易电量是多少?"],
    "9.17": ["某年省间交易电量挂牌交易电量是多少?"],
    # Fixed typo: the interrogative "哪个" (which) was written as "那个" (that),
    # which tokenizes differently and weakens matching against real questions.
    "17.1": ["哪个省送出电量最高?是多少?"],
    # 19-23: questions that carry no explicit time expression.
    "19": ["省间交易正在组织的交易有多少?"],
    "20": ["省间交易当月完成的交易有多少?"],
    "21": ["省间交易当年完成的交易有多少?"],
    "22": ["省间交易当年达成的电量有多少?"],
    "23": ["省间交易当年参与交易的家次有多少?"],
}
- # 将地点映射成相应的代码
def map_location_to_unit(location: str) -> str:
    """Translate a location name into its unit code.

    The mapping is read from ``../省间关系映射.json`` (resolved relative to
    this file) on every call; the JSON object maps code -> name.

    Args:
        location: Location name to look up among the mapping's values.

    Returns:
        The first code whose name equals ``location``, or ``'未知单位'`` when
        the mapping file is missing or no entry matches.
    """
    unknown_unit = '未知单位'
    base_dir = os.path.dirname(__file__)
    mapping_path = os.path.abspath(os.path.join(base_dir, '../省间关系映射.json'))

    if not os.path.exists(mapping_path):
        print(f"映射文件未找到: {mapping_path}")
        return unknown_unit

    with open(mapping_path, 'r', encoding='utf-8') as mapping_file:
        code_to_name = json.load(mapping_file)

    # Reverse lookup: the file is keyed by code, callers supply the name.
    matching_codes = (code for code, name in code_to_name.items() if name == location)
    return next(matching_codes, unknown_unit)
- # 提取时间和地点
- from typing import Tuple, List, Dict
- from datetime import datetime
- import re
def extract_time_location(question: str) -> Tuple[List[Dict], List[str]]:
    """Extract time expressions and province names from a question.

    Time expressions are recognised in this priority order:

    1. explicit ranges with a year on both sides ("2023年1月到2024年2月",
       "去年1月到今年2月");
    2. same-year ranges whose end omits the year ("2024年1月到2月");
    3. relative year plus month ("去年3月", "明年3月");
    4. absolute dates ("2024年1月5日", "2024年1月", "2024年");
    5. bare relative years ("今年", "去年", ...);
    6. "当前" and "上个月";
    7. quarters and half-years ("一季度", "上半年", ...).

    Text consumed by an earlier step is not reported again by a later one,
    so "2024年1月到2月" yields a single range entry rather than additional
    "2024年1月" and "2024年" entries.

    Args:
        question: The user's question.

    Returns:
        A ``(time_results, locations)`` tuple. Range entries carry
        ``start_year``/``start_month``/``end_year``/``end_month``/``label``/
        ``raw``; point entries carry ``year`` and optionally ``month``/``day``
        plus ``raw``; quarter/half-year entries carry ``year``,
        ``start_month``, ``end_month``, ``label`` and ``raw``. ``locations``
        lists the known province names found in the question, in the order
        of the internal province list.
    """
    current_date = datetime.now()
    current_year = current_date.year
    current_month = current_date.month

    # Most specific pattern first so a full date wins over its prefixes.
    absolute_patterns = [
        r'(?P<year>\d{4})年(?P<month>\d{1,2})月(?P<day>\d{1,2})日',
        r'(?P<year>\d{4})年(?P<month>\d{1,2})月',
        r'(?P<year>\d{4})年'
    ]
    relative_year_mapping = {
        '明年': current_year + 1,
        '今年': current_year,
        '去年': current_year - 1,
        '前年': current_year - 2
    }
    # Same mapping keyed by the leading character ("明", "今", "去", "前").
    relative_prefix_years = {term[0]: year for term, year in relative_year_mapping.items()}
    season_mapping = {
        '一季度': (1, 3),
        '二季度': (4, 6),
        '三季度': (7, 9),
        '四季度': (10, 12),
        '上半年': (1, 6),
        '下半年': (7, 12)
    }
    provinces = [
        '北京', '天津', '上海', '重庆', '河北', '山西', '辽宁', '吉林', '黑龙江',
        '江苏', '浙江', '安徽', '福建', '江西', '山东', '河南', '湖北', '湖南',
        '广东', '海南', '四川', '贵州', '云南', '陕西', '甘肃', '青海', '台湾',
        '内蒙古', '广西', '西藏', '宁夏', '新疆', '香港', '澳门'
    ]

    time_results = []
    used_keywords = set()
    # Character spans already turned into a result. Comparing only the
    # matched strings is not enough: "2024年1月" is a different string from
    # "2024年1月到2月" but lies entirely inside it.
    used_spans = []

    def overlaps_consumed(match):
        start, end = match.span()
        return any(start < used_end and used_start < end
                   for used_start, used_end in used_spans)

    def already_used(match):
        return match.group() in used_keywords or overlaps_consumed(match)

    def consume(match):
        used_keywords.add(match.group())
        used_spans.append(match.span())

    def parse_endpoint(text):
        """Return ``(year, month)`` for one side of an explicit range."""
        if text[:1] in relative_prefix_years:
            month_match = re.search(r'(\d{1,2})月', text)
            month = int(month_match.group(1)) if month_match else None
            return relative_prefix_years[text[0]], month
        absolute_match = re.match(r'(?P<year>\d{4})年(?P<month>\d{1,2})?月?', text)
        if not absolute_match:
            return None, None
        month_text = absolute_match.group('month')
        month = int(month_text) if month_text else None
        return int(absolute_match.group('year')), month

    # 1. Ranges with a year on both sides.
    range_pattern = r'(?P<start>(\d{4}|今|去|前|明)年(\d{1,2})?月?)到(?P<end>(\d{4}|今|去|前|明)年(\d{1,2})?月?)'
    for match in re.finditer(range_pattern, question):
        start_raw, end_raw = match.group('start'), match.group('end')
        start_y, start_m = parse_endpoint(start_raw)
        end_y, end_m = parse_endpoint(end_raw)
        time_results.append({
            'start_year': start_y, 'start_month': start_m,
            'end_year': end_y, 'end_month': end_m,
            'label': f'{start_raw}到{end_raw}',
            'raw': match.group()
        })
        consume(match)

    # 2. "2024年1月到2月": the end has no year, so it shares the start's year.
    partial_range_pattern = r'(?P<year>\d{4})年(?P<start_month>\d{1,2})月到(?P<end_month>\d{1,2})月'
    for match in re.finditer(partial_range_pattern, question):
        if already_used(match):
            continue
        year = int(match.group('year'))
        time_results.append({
            'start_year': year,
            'start_month': int(match.group('start_month')),
            'end_year': year,
            'end_month': int(match.group('end_month')),
            'label': match.group(),
            'raw': match.group()
        })
        consume(match)

    # 3. Relative year + explicit month. "明" is included so that "明年3月"
    #    is handled like the other relative years.
    relative_absolute_pattern = r'(?P<relative>今|去|前|明)年(?P<month>\d{1,2})月'
    for match in re.finditer(relative_absolute_pattern, question):
        if already_used(match):
            continue
        year = relative_prefix_years.get(match.group('relative'), current_year)
        time_results.append({'year': year, 'month': int(match.group('month')), 'raw': match.group()})
        consume(match)

    # 4. Absolute dates.
    for pattern in absolute_patterns:
        for match in re.finditer(pattern, question):
            if already_used(match):
                continue
            time_info = {'raw': match.group()}
            for field, value in match.groupdict().items():
                if value:
                    time_info[field] = int(value)
            time_results.append(time_info)
            consume(match)

    # 5. Bare relative-year keywords, reported at most once per keyword and
    #    only for an occurrence not already covered by an earlier result.
    for term, year in relative_year_mapping.items():
        if term in used_keywords:
            continue
        for match in re.finditer(re.escape(term), question):
            if overlaps_consumed(match):
                continue
            time_results.append({'year': year, 'label': term, 'raw': term})
            consume(match)
            break

    # 6. "当前" (current month) and "上个月" (previous month).
    if '当前' in question and '当前' not in used_keywords:
        time_results.append({'year': current_year, 'month': current_month, 'label': '当前', 'raw': '当前'})
        used_keywords.add('当前')
    if '上个月' in question and '上个月' not in used_keywords:
        prev_year = current_year if current_month > 1 else current_year - 1
        prev_month = current_month - 1 if current_month > 1 else 12
        time_results.append({'year': prev_year, 'month': prev_month, 'label': '上个月', 'raw': '上个月'})
        used_keywords.add('上个月')

    # 7. Quarters and half-years.
    # NOTE(review): these always use the current year, even when the question
    # also names a year (that year is reported as a separate entry) — confirm
    # whether they should inherit it.
    for term, (start_month, end_month) in season_mapping.items():
        if term in question and term not in used_keywords:
            time_results.append({
                'year': current_year,
                'label': term,
                'start_month': start_month,
                'end_month': end_month,
                'raw': term
            })
            used_keywords.add(term)

    locations = [p for p in provinces if p in question]
    return time_results, locations
- # 先用 extract_time_location 判断问句包含哪类时间信息,然后只对结构匹配的模板子集做余弦匹配。
- # def classify_by_time_type(query, time_info):
- # if any('start_year' in t and 'end_year' in t for t in time_info):
- # return ['3'] # 时间段
- # return list(template_dict.keys()) # fallback 所有模板
def classify_by_time_type(query, time_info):
    """Choose the candidate template keys from the first time expression.

    Args:
        query: The user's question (not used by the current rules).
        time_info: Time entries as produced by ``extract_time_location``;
            only the first entry is inspected.

    Returns:
        * ``['19', ..., '23']`` when there is no time information;
        * ``['3']`` for a range (entry has ``start_year`` and ``end_year``);
        * ``['2']`` for a year + month entry;
        * template ``'1'`` plus all ``8.x`` and ``9.x`` keys for a year-only
          entry;
        * ``None`` when the first entry fits none of the shapes above.
    """
    if not time_info:
        # No time expression at all: only the time-less templates apply.
        return ['19', '20', '21', '22', '23']

    first = time_info[0]
    has_year = 'year' in first

    if 'start_year' in first and 'end_year' in first:
        return ['3']
    if has_year and 'month' in first:
        return ['2']
    if has_year:
        yearly_keys = ['1']
        yearly_keys += [f'8.{i}' for i in range(1, 5)]
        yearly_keys += [f'9.{i}' for i in range(1, 18)]
        return yearly_keys
    return None
def match_template_with_time_filter(query, template_dict, tokenizer, extract_time_location_func):
    """Narrow the templates by time structure, then run TF-IDF matching.

    Args:
        query: The user's question.
        template_dict: Mapping of template key -> list of template sentences.
        tokenizer: Tokenizer callable forwarded to ``match_template``.
        extract_time_location_func: Callable returning a
            ``(time_info, locations)`` pair for ``query``.

    Returns:
        Whatever ``match_template`` returns for the reduced template set.
    """
    time_info, _locations = extract_time_location_func(query)

    # Keep only the templates whose time shape fits the question.
    candidates = {}
    for key in classify_by_time_type(query, time_info):
        candidates[key] = template_dict[key]

    return match_template(query, candidates, tokenizer)
- # 找相似度最高的模板
def match_template(query, template_dict, tokenizer):
    """Find the template sentence most similar to ``query``.

    The query and all template sentences are vectorized together with
    TF-IDF, and the template with the highest cosine similarity to the query
    is selected.

    Args:
        query (str): The user's question.
        template_dict (dict): ``{key: [sentence1, sentence2, ...]}``.
        tokenizer (callable): Tokenizer function, e.g. ``jieba.lcut``.

    Returns:
        tuple: ``(matched_key, best_match_sentence, similarity_score)`` where
        ``similarity_score`` is a plain Python ``float``.

    Raises:
        ValueError: If ``template_dict`` contains no template sentences.
    """
    # Flatten the templates, remembering which key each sentence belongs to.
    templates = []
    key_map = []
    for key, sentences in template_dict.items():
        for sentence in sentences:
            templates.append(sentence)
            key_map.append(key)

    if not templates:
        # Fail early with a clear message instead of an error from deep
        # inside the similarity computation.
        raise ValueError("模板字典为空,无法进行匹配")

    # token_pattern is ignored whenever a tokenizer is supplied; passing None
    # makes that explicit and silences scikit-learn's unused-parameter warning.
    vectorizer = TfidfVectorizer(tokenizer=tokenizer, token_pattern=None)
    tfidf_matrix = vectorizer.fit_transform([query] + templates)

    # Row 0 is the query; the remaining rows are the templates.
    similarities = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:])[0]
    best_index = int(similarities.argmax())

    # Convert the numpy scalar so the score serializes like a normal float.
    return key_map[best_index], templates[best_index], float(similarities[best_index])
- # 根据模板去对应的json文件中找数据
def load_template_info(matched_key, json_folder):
    """Load the JSON description that belongs to a matched template.

    Args:
        matched_key (str): Template key; used as the file name without the
            ``.json`` extension.
        json_folder (str): Directory that holds the template JSON files.

    Returns:
        The parsed content of ``<json_folder>/<matched_key>.json``.

    Raises:
        FileNotFoundError: If that file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    file_path = os.path.join(json_folder, "{}.json".format(matched_key))

    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as handle:
            return json.load(handle)

    raise FileNotFoundError(f"未找到 JSON 文件:{file_path}")
def process_query(query, template_dict, json_folder, tokenizer=jieba_tokenizer):
    """Match a question to a template and assemble everything needed to answer it.

    Steps: extract time/location from the question, pick the best template
    among those compatible with the time structure, build the filter
    conditions, and load the template's JSON description.

    Args:
        query: The user's question.
        template_dict: Mapping of template key -> list of template sentences.
        json_folder: Directory holding one ``<key>.json`` file per template.
        tokenizer: Tokenizer callable used for TF-IDF matching.

    Returns:
        A dict with the keys ``matched_key``, ``matched_template``,
        ``similarity_score``, ``type``, ``keywords``, ``target``,
        ``dataJsonName``, ``name``, ``conditions``, ``content``, ``query``,
        ``play``, ``find_max``, ``value_key`` and ``name_key``. When the best
        similarity is below the threshold, the template-derived fields are
        ``None``, ``content`` is a fixed "not understood" message and
        ``play`` is ``"疑问"``.

    Raises:
        FileNotFoundError: If the matched template has no JSON file.
    """
    time_info, location_info = extract_time_location(query)
    conditions = {}

    # Reuse the extraction above instead of parsing the question a second
    # time, so the template filter and the conditions see the same result.
    matched_key, best_sentence, score = match_template_with_time_filter(
        query,
        template_dict,
        tokenizer,
        extract_time_location_func=lambda _query: (time_info, location_info),
    )

    similarity_threshold = 0.3
    if score < similarity_threshold:
        # Same key set as the normal result so callers can index either one.
        return {
            "matched_key": None,
            "matched_template": None,
            "similarity_score": score,
            "type": None,
            "keywords": None,
            "target": None,
            "dataJsonName": None,
            "name": None,
            "conditions": conditions,
            "content": "您提问的问题目前我还没有掌握",
            "query": query,
            "play": "疑问",
            "find_max": None,
            "value_key": None,
            "name_key": None,
        }

    if time_info:
        # Only the first (highest-priority) time expression is used.
        ti = time_info[0]
        if 'start_year' in ti and 'end_year' in ti:
            # Time range.
            for field in ('start_year', 'start_month', 'end_year', 'end_month'):
                conditions[field] = ti.get(field)
        else:
            # Single point in time.
            if 'year' in ti:
                conditions['年'] = ti['year']
            if 'month' in ti:
                conditions['月'] = ti['month']

    if location_info:
        unit = map_location_to_unit(location_info[0])
        if unit and unit != '未知单位':
            conditions['单位'] = unit

    template_info = load_template_info(matched_key, json_folder)

    return {
        "matched_key": matched_key,
        "matched_template": best_sentence,
        "similarity_score": score,
        "type": template_info.get("type", ""),
        # The template file stores this under the singular key "keyword".
        "keywords": template_info.get("keyword"),
        "target": template_info.get("target"),
        "dataJsonName": template_info.get("dataJsonName", ""),
        "name": template_info.get("name", ""),
        "conditions": conditions,
        "content": template_info.get("content", ""),
        "query": query,
        "play": template_info.get("play", ""),
        # Fields used by comparison-type templates.
        "find_max": template_info.get("find_max"),
        "value_key": template_info.get("value_key", ""),
        "name_key": template_info.get("name_key", ""),
    }
- # 查询类
def smart_find_value(folder_path, file_name, conditions: dict, target_key: str):
    """Look up ``target_key`` in a JSON data file, filtered by ``conditions``.

    Args:
        folder_path: Directory containing the data file.
        file_name: File name without the ``.json`` extension.
        conditions: Field -> value pairs a record must match exactly.
        target_key: Key to search for (at any depth) in matching records.

    Returns:
        ``None`` if the file is missing, is not valid JSON, holds neither a
        dict nor a list, or nothing is found; the single value if exactly one
        is found; otherwise the list of all found values.
    """
    file_path = os.path.join(folder_path, file_name + ".json")
    if not os.path.exists(file_path):
        print(f"文件 {file_path} 不存在")
        return None

    with open(file_path, 'r', encoding='utf-8') as handle:
        try:
            data = json.load(handle)
        except json.JSONDecodeError as e:
            print(f"JSON 解析失败:{e}")
            return None

    def satisfies(record):
        for field, expected in conditions.items():
            if record.get(field) != expected:
                return False
        return True

    def collapse(found):
        # No hit -> None, one hit -> the value itself, several -> the list.
        if not found:
            return None
        return found[0] if len(found) == 1 else found

    # Case 1: the file holds a single record.
    if isinstance(data, dict):
        if conditions and not satisfies(data):
            return None
        return collapse(find_key_recursively(data, target_key))

    # Case 2: the file holds a list of records; non-dict items are skipped.
    if isinstance(data, list):
        found = []
        for record in data:
            if isinstance(record, dict) and satisfies(record):
                found.extend(find_key_recursively(record, target_key))
        return collapse(found)

    return None
- # 查询类的辅助函数
def find_key_recursively(data, target_key):
    """Collect every value stored under ``target_key`` anywhere in ``data``.

    Dicts and lists are walked depth-first in iteration order. A value found
    under ``target_key`` is recorded and then searched itself, so nested
    occurrences are reported too. Other types are ignored.

    Returns:
        A list of the matching values in the order they were encountered.
    """
    def _walk(node):
        if isinstance(node, dict):
            for key, value in node.items():
                if key == target_key:
                    yield value
                yield from _walk(value)
        elif isinstance(node, list):
            for element in node:
                yield from _walk(element)

    return list(_walk(data))
# Manual demo: run one sample question through the pipeline and print the
# outcome. Guarded so that importing this module does not execute it.
if __name__ == "__main__":
    # Alternative sample questions:
    # query = "当月省间交易完成的交易是多少?"
    # query = "2024年1月到2月累计交易电量是多少?"
    # query = "但同样阿贾克斯大口径的话我可合金外壳设计文件突然发?"
    query = "2023年省间交易电量新能源交易电量是多少??"
    json_folder = "templatesJson"

    result = process_query(query, template_dict, json_folder)
    print("匹配的模板 key:", result["matched_key"])
    print("最相似的模板句:", result["matched_template"])
    print("相似度分数:", result["similarity_score"])
    print("类型:", result["type"])
    print("关键词:", result["keywords"])
    print("查询字段:", result["target"])
    print("模型名字", result["name"])
    print("条件", result["conditions"])
    print("返回的内容是:", result["content"])
    print("问句是:", result["query"])
    print("动作是:", result["play"])

    # Named result_type so the builtin ``type`` is not shadowed.
    result_type = result["type"]
    content = result["content"]
    # Built with os.path.join instead of a backslash literal: "\J" and "\j"
    # are invalid escape sequences, and the literal only worked on Windows.
    json_data_folder = os.path.join("..", "Json", "json_data")

    if result_type == "query":
        answer = smart_find_value(
            json_data_folder,
            result["dataJsonName"],
            result["conditions"],
            result["target"],
        )
        print(answer)
    elif result_type == "calculate":
        conditions = result["conditions"]
        # Turn start_year/start_month and end_year/end_month into the
        # '年'/'月' keys passed to calculate_sum_by_time_range.
        start_conditions = {('年' if 'year' in k else '月'): v for k, v in conditions.items() if k.startswith('start_')}
        end_conditions = {('年' if 'year' in k else '月'): v for k, v in conditions.items() if k.startswith('end_')}
        print(start_conditions)
        print(end_conditions)
        data_file = result["dataJsonName"] + ".json"
        answer = calculate_sum_by_time_range(
            json_data_folder, data_file, result["target"], start_conditions, end_conditions
        )
        print(answer)

    # Final answer text (not enabled yet):
    # final_content = content.replace("?", str(answer))
    # print(final_content)
|