123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- import uuid
- from flask import Flask, request, jsonify, render_template
- import json
- import re
- import requests
- import os
app = Flask(__name__, template_folder='templates')

# Service endpoints and model settings. Each one can be overridden with an
# environment variable of the same name; the literal shown is the default used
# when the variable is unset.
OLLAMA_URL = os.environ.get('OLLAMA_URL', 'http://192.168.31.210:11434/api/generate')
MATCH_URL = os.environ.get('MATCH_URL', 'http://192.168.31.19:8352/api/kg/match')
QUERY_URL = os.environ.get('QUERY_URL', 'http://localhost:5000/query')
MODEL_NAME = os.environ.get('MODEL_NAME', "deepseek-r1:32b")

# Province code -> name mapping file. Resolved next to this script (like
# template_folder above) so it does not depend on the working directory;
# falls back to the bare, CWD-relative file name if no file exists there.
_DEFAULT_MAPPING_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), '省间关系映射.json'
)
if not os.path.exists(_DEFAULT_MAPPING_FILE):
    _DEFAULT_MAPPING_FILE = '省间关系映射.json'
MAPPING_FILE = os.environ.get('MAPPING_FILE', _DEFAULT_MAPPING_FILE)

# Java backend: task-creation API and log-writing API.
JAVA_TASK_URL = os.environ.get('JAVA_TASK_URL', "http://localhost:8080/business/task")
JAVA_LOG_API = os.environ.get('JAVA_LOG_API', "http://localhost:8080/business/log")
def log_to_java(
    task_guid: str,
    type_guid: str,
    content: str,
    proname: str,
    proid: str,
    level: str,
    timeout: float = 10,
) -> dict:
    """Write one log record to the Java backend's log API (JAVA_LOG_API).

    Logging is best-effort: a network error, timeout or non-2xx status is
    printed and reported in the return value instead of being raised, so
    pipeline code can log without wrapping every call in try/except.

    Args:
        task_guid: Task identifier returned by ``call_java_add_task``
            (callers pass "" when no task could be created).
        type_guid: Log type identifier (the visible callers pass "").
        content: Free-text log detail.
        proname: Name of the pipeline step producing the log.
        proid: Identifier of the pipeline step.
        level: Log level such as "INFO" or "ERROR"; upper-cased before sending.
        timeout: Seconds to wait for the Java service before giving up.

    Returns:
        ``{"success": True}`` on success, otherwise
        ``{"success": False, "error": <message>}``.
    """
    log_data = {
        "taskGuid": task_guid,
        "typeGuid": type_guid,
        "content": content,
        "proname": proname,
        "proid": proid,
        "level": level.upper(),
    }
    headers = {'Content-Type': 'application/json'}
    print(f"发送日志请求到 {JAVA_LOG_API},请求数据为:{log_data}")
    try:
        # Without a timeout an unresponsive Java service would block the
        # calling request handler indefinitely.
        response = requests.post(JAVA_LOG_API, json=log_data, headers=headers, timeout=timeout)
        print("响应状态码:", response.status_code)
        print("响应内容:", response.text)
        response.raise_for_status()
        return {"success": True}
    except Exception as e:
        # Deliberately broad: a logging failure must never break the pipeline.
        print(f"[日志记录失败] {e}")
        return {"success": False, "error": str(e)}
def call_java_add_task(data, timeout=10):
    """Create a task in the Java backend (JAVA_TASK_URL).

    The task GUID is generated here (uuid4) and sent along with the content,
    so the caller knows it even before the backend answers.

    Args:
        data: Task content, sent as ``taskContent`` (web_extract passes the
            question string).
        timeout: Seconds to wait for the Java service before giving up.

    Returns:
        ``{"taskGuid": <uuid4 string>, "javaResponse": <decoded JSON body>}``.

    Raises:
        requests.RequestException: on network failure, timeout, or a non-2xx
            status (previously a failed creation was treated as success).
        ValueError: if the response body is not valid JSON.
    """
    task_guid = str(uuid.uuid4())
    task_payload = {
        "taskContent": data,
        "taskGuid": task_guid,
    }
    headers = {
        "Content-Type": "application/json"
    }
    response = requests.post(JAVA_TASK_URL, json=task_payload, headers=headers, timeout=timeout)
    response.raise_for_status()
    return {
        "taskGuid": task_guid,
        "javaResponse": response.json(),
    }
def replace_province_with_code(response_data, mapping_file_path):
    """Replace the province name in ``response_data['条件']['单位']`` with its code.

    The mapping file is a UTF-8 JSON object of ``{code: name}``. The unit name
    is trimmed and has at most one trailing administrative suffix
    (省 / 市 / 自治区 / 特别行政区) removed before lookup. When no code is found,
    or ``条件`` / ``单位`` is missing or of an unexpected type, the data is
    left unchanged and a warning is printed.

    Args:
        response_data: Dict parsed from the model output; modified in place.
        mapping_file_path: Path to the JSON mapping file.

    Returns:
        The same ``response_data`` object.
    """
    with open(mapping_file_path, 'r', encoding='utf-8') as f:
        code_to_name = json.load(f)
    name_to_code = {name: code for code, name in code_to_name.items()}

    def normalize_name(name):
        # Strip only the trailing suffix; str.replace would also delete any
        # occurrence of the suffix in the middle of the name.
        for suffix in ('省', '市', '自治区', '特别行政区'):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    conditions = response_data.get('条件')
    unit_name = conditions.get('单位', '') if isinstance(conditions, dict) else ''
    if not isinstance(unit_name, str):
        # e.g. a list of provinces: no single code to substitute.
        print(f"⚠️ “单位”不是字符串({unit_name!r}),保持原值。")
        return response_data

    normalized_name = normalize_name(unit_name.strip())
    if isinstance(conditions, dict) and normalized_name in name_to_code:
        conditions['单位'] = name_to_code[normalized_name]
    else:
        print(f"⚠️ 未找到“{normalized_name}”对应的代码,保持原值。")
    return response_data
def extract_valid_json_outside_think(text):
    """Return the first JSON object found in *text* outside ``<think>`` blocks.

    ``<think>...</think>`` spans (the model's reasoning) are removed first.
    If a closing ``</think>`` is still present without its opening tag,
    everything up to and including the last one is discarded as well
    (presumably reasoning whose opening tag the model template omitted).

    The remaining text is scanned for top-level ``{`` characters. At each one,
    a JSON object is decoded in place with ``json.JSONDecoder.raw_decode``,
    which understands string literals, so braces inside values such as
    ``"a}b"`` no longer break extraction. Candidates that fail to decode are
    skipped by brace matching and the scan continues after them.

    Raises:
        ValueError: if no candidate decodes as a JSON object.
    """
    cleaned = re.sub(r'<think>[\s\S]*?</think>', '', text)
    if '</think>' in cleaned:
        cleaned = cleaned.rsplit('</think>', 1)[1]

    decoder = json.JSONDecoder()
    depth = 0
    for i, char in enumerate(cleaned):
        if char == '{':
            if depth == 0:
                try:
                    obj, _ = decoder.raw_decode(cleaned, i)
                    return obj
                except json.JSONDecodeError:
                    pass  # not valid JSON; skip this candidate by brace matching
            depth += 1
        elif char == '}' and depth:
            depth -= 1
    raise ValueError("❌ 无法从 think 外部提取合法 JSON")
@app.route("/")
def index():
    """Serve the front-end page (index.html from the app's templates folder)."""
    page = "index.html"
    return render_template(page)
def _build_extraction_prompt(question):
    """Return the entity/condition extraction prompt for *question*.

    NOTE(review): the entity list names "送出电量占售电量的比" twice; the second
    occurrence was presumably meant to be the 受入 (received) counterpart.
    Confirm with the domain owner before changing it, since it alters the
    model input.
    """
    return f"""
现在我要让你在一个或者多个问句中抽取出三元组,其中条件包含:年份、月份、日、省份(问句中有哪些条件就写哪些条件,没有的无需添加到结果中),实体包含:送出电量、送出电量占售电量的比、送出均价、受入电量、送出电量占售电量的比、受入均价。注意:实体在问句中的表示可能不那么准确,请你判断出和我给你实体意思最相近的问句中的实体。
问句如下:
{question}
输出格式为:
{{
"问句": 问句,
"实体": ["送出电量"],
"条件": {{
"单位":"山东省",
"年":"2024",
"月":"1",
"日":"1"
}}
}}
输出结果只显示上述内容,不输出其他任何东西。
"""


@app.route("/web_extract", methods=["POST"])
def web_extract():
    """Run the question-answering pipeline for one question.

    Expects a JSON body like ``{"question": "..."}`` and runs these steps,
    logging each one to the Java backend via ``log_to_java``:

    1. create a task in the Java backend (``call_java_add_task``);
    2. ask the Ollama model (OLLAMA_URL / MODEL_NAME) to extract entities
       and conditions from the question;
    3. pull the JSON object out of the model reply and map the province name
       to its code (MAPPING_FILE);
    4. send the result to the keyword-matching service (MATCH_URL);
    5. forward the matcher's ``data`` field to the query service (QUERY_URL).

    Returns:
        The query service's JSON with status 200, or ``{"error": ...}`` with
        status 400 (missing/invalid question) or 500 (a failed step).
    """
    try:
        # silent=True: a missing or non-JSON body yields None instead of
        # raising, so it is reported as a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        question = data.get("question", "") if isinstance(data, dict) else ""
        if not isinstance(question, str) or not question.strip():
            return jsonify({"error": "缺少问句参数"}), 400
        question = question.strip()

        # Step 1: create the task in the Java backend and log the submission.
        try:
            java_result = call_java_add_task(question)
            task_id = java_result['taskGuid']
            log_to_java(
                task_guid=task_id,
                type_guid="",
                content=f"成功调用模型,传递参为:{question}\n",
                proname="提交问句",
                proid="1",
                level="INFO"
            )
        except Exception as e:
            # No task id is available, so the error is logged with an empty GUID.
            log_to_java(
                task_guid="",
                type_guid="",
                content=f"调用任务创建接口失败或写日志失败,问句为:{question}\n错误信息:{str(e)}",
                proname="提交问句",
                proid="1",
                level="ERROR"
            )
            return jsonify({"error": "任务创建失败", "detail": str(e)}), 500

        # Step 2: semantic analysis of the question with the LLM.
        payload = {
            "model": MODEL_NAME,
            "prompt": _build_extraction_prompt(question),
            "stream": False
        }
        try:
            response = requests.post(OLLAMA_URL, json=payload)
            response.raise_for_status()
            result = response.json()
            log_to_java(
                task_guid=task_id,
                type_guid="",
                content=f"成功调用模型,入参为:{json.dumps(payload, ensure_ascii=False)}\n返回结果为:{json.dumps(result, ensure_ascii=False)}",
                proname="DeepSeek语义识别",
                proid="2",
                level="INFO"
            )
        except Exception as e:
            log_to_java(
                task_guid=task_id,
                type_guid="",
                content=f"调用模型失败,入参为:{json.dumps(payload, ensure_ascii=False)}\n,错误信息:{str(e)}",
                proname="DeepSeek语义识别",
                proid="2",
                level="ERROR"
            )
            return jsonify({"error": "调用大模型失败", "detail": str(e)}), 500

        # Step 3: parse the model reply and map the province to its code.
        # These failures used to escape to the generic handler below without
        # any log entry for the task.
        response_str = result.get("response", "")
        try:
            response_data = extract_valid_json_outside_think(response_str)
            updated_data = replace_province_with_code(response_data, MAPPING_FILE)
        except Exception as e:
            log_to_java(
                task_guid=task_id,
                type_guid="",
                content=f"解析模型输出失败,模型返回为:{response_str}\n错误信息:{str(e)}",
                proname="DeepSeek语义识别",
                proid="2",
                level="ERROR"
            )
            return jsonify({"error": "解析模型输出失败", "detail": str(e)}), 500

        # Step 4: keyword matching. A non-2xx status or a reply without "data"
        # is treated as a matching failure rather than logged as success.
        try:
            match_response = requests.post(MATCH_URL, json=updated_data)
            match_response.raise_for_status()
            match_result = match_response.json()
            query_payload = match_result["data"]
            log_to_java(
                task_guid=task_id,
                type_guid="",
                content=f"关键词匹配成功,入参为:{json.dumps(updated_data, ensure_ascii=False)}\n响应内容:{match_result}",
                proname="关键词匹配",
                proid="3",
                level="INFO"
            )
        except Exception as e:
            log_to_java(
                task_guid=task_id,
                type_guid="",
                content=f"关键词匹配失败,入参为:{json.dumps(updated_data, ensure_ascii=False)}\n响应内容:{str(e)}",
                proname="关键词匹配",
                proid="3",
                level="ERROR"
            )
            return jsonify({"error": "请求匹配服务时发生异常", "detail": str(e)}), 500

        # Step 5: question answering.
        try:
            query_response = requests.post(QUERY_URL, json=query_payload)
            query_response.raise_for_status()
            query_result = query_response.json()
            log_to_java(
                task_guid=task_id,
                type_guid="",
                content=f"问答成功,入参为:{json.dumps(query_payload, ensure_ascii=False)}\n响应内容:{query_result}",
                proname="问答程序",
                proid="4",
                level="INFO"
            )
        except requests.exceptions.RequestException as e:
            log_to_java(
                task_guid=task_id,
                type_guid="",
                content=f"问答失败:入参为:{json.dumps(query_payload, ensure_ascii=False)}\n响应内容:{str(e)}",
                proname="问答程序",
                proid="4",
                level="ERROR"
            )
            return jsonify({"error": "查询接口请求失败", "details": str(e)}), 500
        return jsonify(query_result), 200
    except Exception as e:
        # Last-resort guard for anything not handled by a step above.
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # The Werkzeug debugger lets anyone who can reach it execute arbitrary
    # Python code, and host="0.0.0.0" exposes it to the whole network, so
    # debug mode is opt-in: set FLASK_DEBUG=1 (or true/yes) during development.
    _debug = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=8866, debug=_debug)
|