import logging

from pot_libs.sanic_api import summary
from unify_api.modules.tsp_water.script.stateweath import crawlpm, POSISTATE
from unify_api.modules.tsp_water.components.tsp_map_cps import \
    IndexMapDateResp, IndexDataResp, IndexMapallDateResp, IndexTopDataResp
from unify_api.modules.tsp_water.script.stateweath import downscal
from unify_api.modules.tsp_water.service.tsp_map_service import \
    tsp_map_service, index_top_data_service
from pot_libs.common.components.responses import success_res
from unify_api.utils.response_code import RET

logger = logging.getLogger(__name__)


@summary("扬尘生态环境管理平台-首页地图国控站数据")
async def get_index_map_data(req) -> IndexMapDateResp:
    """Return PM2.5 / PM10 readings for every station listed in POSISTATE.

    Calls ``crawlpm()`` once, then builds one entry per ``POSISTATE`` item
    with the station name, its position and the two readings. A station
    whose readings cannot be extracted is still returned, with ``pm25`` and
    ``pm10`` set to ``None``.

    Args:
        req: Incoming request object (not read by this handler).

    Returns:
        ``IndexMapDateResp`` wrapping the list of station dicts, or the
        ``success_res`` error payload (``RET.not_data``) when ``crawlpm()``
        raises.
    """
    try:
        pm_data = crawlpm()
    except Exception:
        # Keep the traceback; the client only receives a generic message.
        logger.exception("crawlpm() failed while loading station data")
        return success_res(code=RET.not_data, msg="获取国控站数据失败")

    data = []
    for key, value in POSISTATE.items():
        try:
            # NOTE(review): presumably pm_data maps station name to a
            # sequence whose items 1 and 2 are PM2.5 and PM10 -- confirm
            # against crawlpm().
            reading = pm_data.get(key)
            pm25, pm10 = reading[1], reading[2]
        except Exception:
            # Best effort per station: a missing or malformed reading must
            # not fail the whole response. `Exception` rather than a bare
            # `except` so KeyboardInterrupt / SystemExit still propagate.
            pm25, pm10 = None, None
        data.append({
            "name": f"{key}国控站",
            "position": value,
            "pm25": pm25,
            "pm10": pm10
        })
    return IndexMapDateResp(data)


@summary("扬尘生态环境管理平台-首页地图底层加载渐变数据")
async def get_index_map_all_data(req) -> IndexMapallDateResp:
    """Return the point data used for the gradient layer of the index map.

    Calls ``downscal()`` and converts every row of its ``.values`` into a
    dict holding a two-element ``position`` (row items 0 and 1) and a
    ``pm25`` value (row item 2, rounded to an integer).

    Args:
        req: Incoming request object (not read by this handler).

    Returns:
        ``IndexMapallDateResp`` wrapping the list of point dicts, or the
        ``success_res`` error payload (``RET.not_data``) when ``downscal()``
        raises.
    """
    try:
        res = downscal()
    except Exception:
        # Keep the traceback; the client only receives a generic message.
        logger.exception("downscal() failed while loading gradient data")
        return success_res(code=RET.not_data, msg="获取国控站数据失败")

    # NOTE(review): `res.values` looks like a pandas DataFrame accessor --
    # confirm the return type of downscal().
    map_data = [
        {"position": [each[0], each[1]], "pm25": round(each[2])}
        for each in res.values
    ]
    return IndexMapallDateResp(map_data)


@summary("扬尘生态环境管理平台-首页公司tsp数据")
async def get_index_data(req) -> IndexDataResp:
    """Return the index-page data produced by ``tsp_map_service``.

    Args:
        req: Incoming request; ``req.ctx.user_id`` identifies the caller.

    Returns:
        Whatever ``tsp_map_service(user_id)`` returns.
    """
    user_id = req.ctx.user_id
    return await tsp_map_service(user_id)


@summary("扬尘生态环境管理平台-首页最顶部统计数据")
async def get_index_top_data(req) -> IndexTopDataResp:
    """Return the index-page data produced by ``index_top_data_service``.

    Args:
        req: Incoming request; ``req.ctx.user_id`` identifies the caller.

    Returns:
        Whatever ``index_top_data_service(user_id)`` returns.
    """
    user_id = req.ctx.user_id
    return await index_top_data_service(user_id)