|
@@ -3,27 +3,133 @@ from sklearn.metrics.pairwise import cosine_similarity
|
|
|
import jieba
|
|
|
import os
|
|
|
import json
|
|
|
-
|
|
|
+from datetime import datetime
|
|
|
+from typing import Tuple, List, Dict
|
|
|
+import re
|
|
|
def jieba_tokenizer(text):
    """Split *text* into tokens with jieba and return them as a list."""
    tokens = jieba.cut(text)
    return [*tokens]
|
|
|
-
|
|
|
# Question templates, keyed by template id. Each id maps to the example
# questions that should be routed to that template.
template_dict = {
    # Inter-provincial trading volume, broken down by one dimension.
    "8": [
        "某年省间交易电量按交易周期划分的电量是多少?",
        "某年省间交易电量按交易类型划分的电量是多少?",
        "某年省间交易电量按发电类型划分的电量是多少?",
        "某年省间交易电量按交易方式划分的电量是多少?",
    ],
    # Inter-provincial trading counters (ongoing / monthly / yearly).
    "19": [
        "省间交易正在组织的交易有多少?",
        "省间交易当月完成的交易有多少?",
        "省间交易当年完成的交易有多少?",
        "省间交易当年达成的电量有多少?",
        "省间交易当年参与交易的家次有多少?",
    ],
    # Cumulative yearly volume.
    "1": [
        "某年全年累计省间交易电量是多少?",
    ],
    # Volume for a given year and month.
    "2": [
        "某年某月交易电量是多少?",
    ],
}
|
|
|
# Map a location name to its unit code.
def map_location_to_unit(location: str, mapping_path=None) -> str:
    """Return the unit code whose mapped name equals *location*.

    The mapping file is a JSON object of ``code -> name`` pairs, so the
    lookup is a reverse search over its items.

    Args:
        location: Location name to look up (compared for exact equality).
        mapping_path: Optional path of the JSON mapping file. When omitted,
            ``../省间关系映射.json`` relative to this module is used, which is
            the original behaviour.

    Returns:
        The first code whose name equals *location*, or ``'未知单位'`` when
        the mapping file is missing or no name matches.
    """
    if mapping_path is None:
        mapping_path = os.path.abspath(
            os.path.join(os.path.dirname(__file__), '../省间关系映射.json')
        )

    # EAFP: open directly instead of checking existence first, which avoids
    # a second filesystem lookup and the check-then-open race.
    try:
        with open(mapping_path, 'r', encoding='utf-8') as f:
            mapping = json.load(f)
    except FileNotFoundError:
        print(f"映射文件未找到: {mapping_path}")
        return '未知单位'

    return next(
        (code for code, name in mapping.items() if name == location),
        '未知单位',
    )
|
|
|
# Extract time expressions and locations from a question.
def extract_time_location(question: str, now=None) -> Tuple[List[Dict], List[str]]:
    """Extract time expressions and province names from *question*.

    Args:
        question: The natural-language question to scan.
        now: Optional ``datetime`` used as "the current moment" when
            resolving relative expressions. Defaults to ``datetime.now()``;
            passing it explicitly makes the result deterministic.

    Returns:
        A ``(time_results, locations)`` tuple.

        ``time_results`` is a list of dicts, most specific matches first:
        relative year + month (e.g. "去年12月"), absolute dates, bare relative
        years, "当前" / "上个月", then quarters / half-years. Every dict has
        ``'raw'`` (the matched text) and ``'year'``; depending on the kind
        of match it may also carry ``'month'``, ``'day'``, ``'label'``,
        ``'start_month'`` and ``'end_month'``.

        ``locations`` lists the province names found in *question*, in the
        order of the built-in province list.
    """
    current_date = datetime.now() if now is None else now
    current_year = current_date.year
    current_month = current_date.month

    # Ordered from most to least specific so that a full date claims its
    # span before the shorter year-month / year patterns are tried.
    absolute_patterns = [
        r'(?P<year>\d{4})年(?P<month>\d{1,2})月(?P<day>\d{1,2})日',
        r'(?P<year>\d{4})年(?P<month>\d{1,2})月',
        r'(?P<year>\d{4})年'
    ]

    relative_year_mapping = {
        '明年': current_year + 1,
        '今年': current_year,
        '去年': current_year - 1,
        '前年': current_year - 2
    }

    season_mapping = {
        '一季度': (1, 3),
        '二季度': (4, 6),
        '三季度': (7, 9),
        '四季度': (10, 12),
        '上半年': (1, 6),
        '下半年': (7, 12)
    }

    time_results = []
    # (start, end) spans of text already turned into a result. A later, less
    # specific match lying entirely inside one of these spans is a duplicate
    # view of the same expression and is skipped.
    consumed_spans = []

    def _is_consumed(span):
        start, end = span
        return any(s <= start and end <= e for s, e in consumed_spans)

    # Relative year + month, e.g. "去年12月". The alternatives are built from
    # relative_year_mapping so every supported relative year (including
    # "明年") can be combined with a month.
    relative_month_pattern = (
        r'(?P<relative>'
        + '|'.join(re.escape(term) for term in relative_year_mapping)
        + r')(?P<month>\d{1,2})月'
    )
    for match in re.finditer(relative_month_pattern, question):
        time_results.append({
            'year': relative_year_mapping[match.group('relative')],
            'month': int(match.group('month')),
            'raw': match.group(),
        })
        consumed_spans.append(match.span())

    # Absolute dates.
    for pattern in absolute_patterns:
        for match in re.finditer(pattern, question):
            if _is_consumed(match.span()):
                continue
            consumed_spans.append(match.span())
            time_info = {'raw': match.group()}
            time_info.update(
                (field, int(value))
                for field, value in match.groupdict().items()
                if value
            )
            if time_info not in time_results:
                time_results.append(time_info)

    # Bare relative years ("前年", "去年", "今年", "明年"). A term is reported
    # only if at least one occurrence is not already part of a
    # relative year + month match.
    for term, year in relative_year_mapping.items():
        occurrences = (m.span() for m in re.finditer(re.escape(term), question))
        if any(not _is_consumed(span) for span in occurrences):
            time_results.append({'year': year, 'label': term, 'raw': term})

    # Relative months.
    if '当前' in question:
        time_results.append({
            'year': current_year,
            'month': current_month,
            'label': '当前',
            'raw': '当前',
        })
    if '上个月' in question:
        # January rolls back to December of the previous year.
        if current_month == 1:
            year, month = current_year - 1, 12
        else:
            year, month = current_year, current_month - 1
        time_results.append({'year': year, 'month': month, 'label': '上个月', 'raw': '上个月'})

    # Quarters and half-years.
    # NOTE(review): these are always attached to the current year, even when
    # the question names another year — confirm this is intended.
    for term, (start_month, end_month) in season_mapping.items():
        if term in question:
            time_results.append({
                'year': current_year,
                'label': term,
                'start_month': start_month,
                'end_month': end_month,
                'raw': term
            })

    provinces = ['北京', '天津', '上海', '重庆', '河北', '山西', '辽宁', '吉林', '黑龙江',
                 '江苏', '浙江', '安徽', '福建', '江西', '山东', '河南', '湖北', '湖南',
                 '广东', '海南', '四川', '贵州', '云南', '陕西', '甘肃', '青海', '台湾',
                 '内蒙古', '广西', '西藏', '宁夏', '新疆', '香港', '澳门']

    locations = [p for p in provinces if p in question]

    return time_results, locations
|
|
|
# 找相似度最高的模板
|
|
|
def match_template(query, template_dict, tokenizer):
|
|
|
"""
|
|
@@ -88,23 +194,124 @@ def load_template_info(matched_key, json_folder):
|
|
|
data = json.load(f)
|
|
|
|
|
|
return data
|
|
|
-# 模板json文件存放的文件夹
|
|
|
def process_query(query, template_dict, json_folder, tokenizer=jieba_tokenizer):
    """Resolve *query* to a template and the filter conditions it implies.

    Steps, in order: extract time and location from the query, turn the
    first time entry and first location into filter conditions, pick the
    most similar template, then load that template's JSON description.

    Args:
        query: The user's question.
        template_dict: Mapping of template key to example questions.
        json_folder: Folder passed to ``load_template_info``.
        tokenizer: Tokenizer passed to ``match_template``.

    Returns:
        A dict with the matched key, matched sentence, similarity score,
        the template's ``type`` / ``keyword`` / ``target`` /
        ``dataJsonName`` / ``content`` fields, and the extracted conditions.
    """
    times, places = extract_time_location(query)

    # Only the first extracted time entry and the first location are used.
    conditions = {}
    if times:
        first_time = times[0]
        if first_time.get('year'):
            conditions['年'] = first_time['year']
        if 'month' in first_time:
            conditions['月'] = first_time['month']

    if places:
        unit = map_location_to_unit(places[0])
        # '未知单位' is the "not found" marker, not a usable filter value.
        if unit and unit != '未知单位':
            conditions['单位'] = unit

    # Pick the closest template, then load its JSON description.
    matched_key, best_sentence, score = match_template(query, template_dict, tokenizer)
    template_info = load_template_info(matched_key, json_folder)

    return {
        "matched_key": matched_key,
        "matched_template": best_sentence,
        "similarity_score": score,
        "type": template_info.get("type", ""),
        "keywords": template_info.get("keyword"),
        "target": template_info.get("target"),
        "name": template_info.get("dataJsonName", ""),
        "conditions": conditions,
        "content": template_info.get("content", ""),
    }
|
|
|
# Query helper.
def smart_find_value(folder_path, file_name, conditions: dict, target_key: str):
    """Look up *target_key* in ``<folder_path>/<file_name>.json``.

    Args:
        folder_path: Folder containing the JSON data file.
        file_name: File name without the ``.json`` extension.
        conditions: Key/value pairs a record must match exactly (via
            ``record.get(key) == value``) to be searched.
        target_key: Key whose values are collected recursively from every
            matching record.

    Returns:
        ``None`` when the file is missing, is not valid JSON, is neither a
        dict nor a list, or nothing matches; the single value when exactly
        one is found; otherwise the list of all values found.
    """
    file_path = os.path.join(folder_path, file_name + ".json")

    if not os.path.exists(file_path):
        print(f"文件 {file_path} 不存在")
        return None

    with open(file_path, 'r', encoding='utf-8') as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError as e:
            print(f"JSON 解析失败:{e}")
            return None

    def _satisfies(record):
        # True when every condition equals the record's top-level value.
        for key, expected in conditions.items():
            if not (record.get(key) == expected):
                return False
        return True

    def _collapse(found):
        # No hits -> None, one hit -> the value itself, several -> the list.
        if not found:
            return None
        return found[0] if len(found) == 1 else found

    # Case 1: the file holds a single dict.
    if isinstance(data, dict):
        if conditions and not _satisfies(data):
            return None
        return _collapse(find_key_recursively(data, target_key))

    # Case 2: the file holds a list of records.
    if isinstance(data, list):
        found = []
        for record in data:
            if isinstance(record, dict) and _satisfies(record):
                found += find_key_recursively(record, target_key)
        return _collapse(found)

    return None
|
|
|
# Helper for the query function.
def find_key_recursively(data, target_key):
    """Collect every value stored under *target_key* anywhere in *data*.

    Dicts and lists are walked depth-first in iteration order. A matching
    value is reported and then searched as well, so nested occurrences
    inside a match are also returned. Any other type yields nothing.

    Returns:
        A list of the matching values, in the order they were encountered.
    """
    def _walk(node):
        if isinstance(node, dict):
            for key, value in node.items():
                if key == target_key:
                    yield value
                yield from _walk(value)
        elif isinstance(node, list):
            for element in node:
                yield from _walk(element)

    return list(_walk(data))
|
|
|
# Demo driver: resolve one sample question end to end.
# Alternative sample query: "当月省间交易完成的交易是多少?"
query = "2024年全年累计省间交易电量是多少?"
# Folder that holds the template JSON files.
json_folder = "templatesJson"

result = process_query(query, template_dict, json_folder)

print("匹配的模板 key:", result["matched_key"])
print("最相似的模板句:", result["matched_template"])
print("相似度分数:", result["similarity_score"])
print("类型:", result["type"])
print("关键词:", result["keywords"])
print("查询字段:", result["target"])
print("模型名字", result["name"])
print("条件", result["conditions"])
content = result["content"]

# Build the path with os.path.join rather than a literal with backslashes:
# "\J" and "\j" are invalid escape sequences (a SyntaxWarning on Python
# 3.12+) and a backslash-separated path only works on Windows.
json_data_folder = os.path.join("..", "Json", "json_data")
# From here on ``result`` holds the looked-up value, not the template dict.
result = smart_find_value(
    json_data_folder, result["name"], result["conditions"], result["target"]
)
# The template content uses "?" as the placeholder for the answer.
final_content = content.replace("?", str(result))

print(final_content)
|