123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- import sys
- from flask import Flask, request, jsonify
- from commonUtil import fill_template, fill_template_auto
- from responseUtil import *
- from similarity_answer_json import *
- from util import *
- import os
# WSGI application object; the route handlers in this module register on it.
app = Flask(__name__)

# Legacy resource lookup, kept for reference only. It resolved resources
# either from the PyInstaller bundle directory or from the project root.
#
# TEMPLATE_FOLDER = "templatesJson"
# DATA_FOLDER = "..\Json\json_data"
# def resource_path(relative_path):
#     """Work in both the dev environment and PyInstaller; resolve resources from the project root."""
#     if getattr(sys, 'frozen', False):
#         base_path = sys._MEIPASS
#     else:
#         # Walk up two levels to the project root (matches the ByRules sub-directory layout)
#         base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
#     return os.path.join(base_path, relative_path)
#
# MAPPING_FILE = resource_path("final/Json/sjgxys.json")
# TEMPLATE_FOLDER = resource_path("final/ByRules/templatesJson")
# DATA_FOLDER = resource_path("final/Json/json_data")
# Absolute directory of this module. Every resource path below is anchored
# here, so lookups do not depend on the process working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Sibling "Json" directory, reached through the parent of BASE_DIR. The path
# is deliberately left un-normalised (it still contains the ".." segment).
_JSON_ROOT = os.path.join(BASE_DIR, os.pardir, "Json")

# Folder passed to process_query() as the template location.
TEMPLATE_FOLDER = os.path.join(BASE_DIR, "templatesJson")
# Folder handed to the data lookup helpers as their data directory.
DATA_FOLDER = os.path.join(_JSON_ROOT, "json_data")
# Folder and file of the mapping passed as `mapping_file` to the ranking helpers.
MAPPING_FOLDER = os.path.join(_JSON_ROOT, "sjgxys")
MAPPING_FILE = os.path.join(MAPPING_FOLDER, "sjgxys.json")
def _common_fields(result, raw_result):
    """Return the response fields shared by every data-backed answer type."""
    return {
        "content_text": result["content"],
        "raw_result": raw_result,
        "conditions": result["conditions"],
        "name": result["name"],
        "play": result["play"],
        "unit": result["unit"],
    }


@app.route('/process_query', methods=['POST'])
def process_query_route():
    """Answer a natural-language query posted as JSON ``{"query": ...}``.

    The query is matched against the templates via ``process_query``; the
    resulting ``type`` selects how the answer value is looked up
    (``query``, ``calculate``, ``compare_max_min``, ``topN`` or ``rank``).

    Returns:
        A JSON response built from ``success_response(data=...)`` on success
        or ``error_response(...)`` on failure. No explicit HTTP status code
        is set on either path.
    """
    # silent=True: a missing or malformed JSON body yields None instead of
    # raising, so it is reported through the same error envelope below.
    payload = request.get_json(silent=True)
    query = payload.get("query") if isinstance(payload, dict) else None
    if not query:
        return jsonify(error_response("Query cannot be empty"))

    try:
        # Template-matching approach.
        result = process_query(query, template_dict, TEMPLATE_FOLDER)

        # No template matched the question: echo the clarification text.
        if result['play'] == '疑问':
            response = {
                "content_text": result["content"],
                "name": result["name"],
                "play": result["play"],
            }
            return jsonify(success_response(data=response))

        answer_type = result['type']

        # Plain lookup questions.
        if answer_type == 'query':
            take_first = result["flag"]
            final_value = smart_find_value(DATA_FOLDER, result["dataJsonName"],
                                           result["conditions"], result["target"])
            if take_first:
                # When the flag is set only the first element of the lookup
                # result is reported.
                final_value = final_value[0]
            # "&" is the placeholder for the value in the template text.
            final_content = result["content"].replace("&", str(final_value))
            response = {"content": final_content, **_common_fields(result, final_value)}
            return jsonify(success_response(data=response))

        # Aggregation over a time range.
        if answer_type == 'calculate':
            conditions = result["conditions"]
            # Split the conditions into start/end bounds, re-keyed to the
            # year ('年') / month ('月') names passed on to the helper.
            start_conditions = {('年' if 'year' in k else '月'): v
                                for k, v in conditions.items() if k.startswith('start_')}
            end_conditions = {('年' if 'year' in k else '月'): v
                              for k, v in conditions.items() if k.startswith('end_')}
            fileName = result["dataJsonName"] + ".json"
            final_value = calculate_sum_by_time_range(DATA_FOLDER, fileName, result["target"],
                                                      start_conditions, end_conditions)
            # This answer type carries no filled-in "content" field.
            return jsonify(success_response(data=_common_fields(result, final_value)))

        # Maximum / minimum questions.
        if answer_type == 'compare_max_min':
            # 'find_max' may arrive as a bool or as a string such as "True".
            find_max = str(result['find_max']).lower() == 'true'
            final_value = find_max_or_min_value(folder_path=DATA_FOLDER,
                                                file_name=result["dataJsonName"],
                                                value_key=result['value_key'],
                                                name_key=result['name_key'],
                                                mapping_file=MAPPING_FILE,
                                                conditions=result["conditions"],
                                                find_max=find_max)
            keys = [result['name_key'], result['value_key']]
            final_content = fill_template_auto(result['content'], final_value, keys)
            response = {"content": final_content, **_common_fields(result, final_value)}
            return jsonify(success_response(data=response))

        # Top-N questions.
        if answer_type == 'topN':
            # 'rank' is the requested N; it is removed so that it is neither
            # used as a data filter nor echoed back in "conditions".
            topN = result["conditions"].pop('rank')
            final_value = find_top_n_by_value(folder_path=DATA_FOLDER,
                                              file_name=result["dataJsonName"],
                                              value_key=result['value_key'],
                                              name_key=result['name_key'],
                                              mapping_file=MAPPING_FILE,
                                              conditions=result["conditions"],
                                              top_n=topN,
                                              descending=True)
            final_content = fill_template_auto(result['content'], str(final_value))
            response = {"content": final_content, **_common_fields(result, final_value)}
            return jsonify(success_response(data=response))

        # "Who is at position K" questions.
        if answer_type == 'rank':
            rank = result["conditions"].pop('rank2')
            # Fetch the top `rank` entries; the last one is the entry asked for.
            final_value = find_top_n_by_value(folder_path=DATA_FOLDER,
                                              file_name=result["dataJsonName"],
                                              value_key=result['value_key'],
                                              name_key=result['name_key'],
                                              mapping_file=MAPPING_FILE,
                                              conditions=result["conditions"],
                                              top_n=rank,
                                              descending=True)
            # Fewer than `rank` results raises IndexError here, which is
            # reported through the generic handler below.
            contentResult = final_value[rank - 1]
            # Values used to fill the template: the position and the entry name.
            fillResult = [rank, contentResult[result['name_key']]]
            final_content = fill_template_auto(result['content'], fillResult)
            response = {
                "rank": rank,
                "content": final_content,
                **_common_fields(result, final_value),
                "qcode": result["qcode"],
            }
            return jsonify(success_response(data=response))

        # Unknown type: return an explicit error instead of falling off the
        # end of the view (Flask rejects a None return value).
        return jsonify(error_response(f"Unsupported question type: {answer_type}"))
    except Exception as e:
        # Top-level boundary: keep the traceback in the server log and report
        # the message to the client.
        app.logger.exception("Failed to process query")
        # NOTE(review): this passes the message as `data=` while the
        # empty-query path passes it positionally — confirm against
        # error_response's signature.
        return jsonify(error_response(data=str(e)))
if __name__ == "__main__":
    # Debug mode turns on the Werkzeug interactive debugger, which allows
    # arbitrary code execution and must not be exposed outside development.
    # It stays enabled by default (previous behaviour) but can now be turned
    # off without a code change, e.g. FLASK_DEBUG=0.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").strip().lower() not in ("0", "false", "no", "off")
    app.run(debug=debug_enabled)
|