import random from pot_libs.sanic_api import summary from unify_api.modules.shidianu.components.open_data_cps import ( BasicInfoReq, BasicInfoResp, StbDataReq, StbDataResp, SupplementReq, HomeDataResp, HomeLstAlarmReq, HomeAlarmStatsReq, HomeAlarmStatsResp ) from unify_api.modules.alarm_manager.components.list_alarm import \ ListAlarmResponse from unify_api.modules.shidianu.service.open_data_service import ( basic_info_longgang_service, stb_data_longgang_service, supplement_data_service, result_longgang_service ) from pot_libs.settings import SETTING from unify_api.utils.time_format import last30_day_range from unify_api.modules.common.dao.common_dao import monitor_by_cid from unify_api.modules.home_page.service.count_info_service import safe_run_sdu from unify_api.modules.common.procedures.power_cps import load_cmpy_power from unify_api.modules.common.procedures.alarm_cps import load_alarm_cnt_sdu from unify_api.modules.shidianu.service.open_data_service import get_power from unify_api.modules.common.components.select_company_cps import CmReq from pot_libs.common.components.responses import success_res from unify_api.modules.alarm_manager.service.alarm_static_service import ( sdu_alarm_statistics_service ) from unify_api.modules.home_page.procedures.count_info_pds import ( electric_use_info_sdu ) from unify_api.modules.common.dao.common_dao import storey_by_cid from unify_api.modules.alarm_manager.service.list_alarm_service import \ new_list_alarm_service # 数据对外开放接口 @summary("获取装置列表") async def post_basic_info_longgang(req, body: BasicInfoReq) -> BasicInfoResp: user_id = req.ctx.user_id # user_id = 10086 page_size = body.page_size or 10 page_num = body.page_num or 1 return await basic_info_longgang_service(user_id, page_size, page_num) @summary("查询数据") async def post_stb_data_longgang(req, body: StbDataReq) -> StbDataResp: user_id = req.ctx.user_id # user_id = 10086 # cid = body.cid type = body.type return await stb_data_longgang_service(user_id, type) @summary("获取告警结果") async def post_alarm_result_longgang(req, body: BasicInfoReq) -> \ ListAlarmResponse: user_id = req.ctx.user_id # user_id = 10086 pg_size = body.page_size or 10 pg_num = body.page_num or 1 importance = [2, 3] return await result_longgang_service(user_id, importance, pg_size, pg_num) @summary("获取分析结果") async def post_analyse_result_longgang(req, body: BasicInfoReq) -> \ ListAlarmResponse: user_id = req.ctx.user_id # user_id = 10086 pg_size = body.page_size or 10 pg_num = body.page_num or 1 importance = [1] return await result_longgang_service(user_id, importance, pg_size, pg_num) @summary("补充数据") async def post_supplement_data(req, body: SupplementReq) -> StbDataResp: user_id = req.ctx.user_id # user_id = 10086 cid = body.cid start = body.start end = body.end type = body.type return await supplement_data_service(user_id, cid, start, end, type) @summary("首页信息") async def post_home_page_data(req, body: CmReq) -> HomeDataResp: user_id = req.ctx.user_id cid = 223 cids = [223] is_auth = await get_power(user_id, cids) if not is_auth and not SETTING.debug_mode: return success_res(code=4001, msg="您没有权限访问") start, end = last30_day_range() product = 4 # 安全和报警统计 res = await sdu_alarm_statistics_service([cid], start, end, product) # 安全指数 alarm_res = await electric_use_info_sdu(cid) electric_use_score = round(alarm_res.electric_use_score) # 1. 接入住户,从monitor表取,解决拆除逻辑 monitor_list = await monitor_by_cid(cid) total_tenant = len(monitor_list) # 2. 安全运行天数: 以天计,当工厂某天I级、II级报警总数小于总户数*5%时,即为安全运行, # 展示自接入累加安全运行天数 safe_day = await safe_run_sdu(cid, total_tenant) # 3. 在线率 online_rate = 88 + random.choice([1, 1.5, 2, 2.5, 3, 3.5, 4]) # 4.累计用电 total_power = await load_cmpy_power(cids) # 5. 累计报警 total_alarm = await load_alarm_cnt_sdu(cids) return HomeDataResp( risk_distribution=res.risk_distribution, content_distribution=res.content_distribution, electric_use_score=electric_use_score, total_tenant=total_tenant, online_rate=online_rate, safe_day=safe_day, total_power=total_power, total_alarm=total_alarm, ) @summary("首页-最近报警") async def post_home_page_lst_alarm(req, body: HomeLstAlarmReq) -> \ ListAlarmResponse: user_id = req.ctx.user_id # cid = body.cid cid = 223 is_auth = await get_power(user_id, [cid]) if not is_auth and not SETTING.debug_mode: return success_res(code=4001, msg="您没有权限访问") importance = body.importance page_size = body.page_size page_num = body.page_num start = body.start end = body.end product = 4 point_ids = None storeys = await storey_by_cid(cid) storey_ids = [item["storey_id"] for item in storeys] return await new_list_alarm_service(cid, storey_ids, page_num, page_size, start, end, importance, point_ids, + product) @summary("首页-运行趋势") async def post_home_page_alarm_stats(req, body: HomeAlarmStatsReq) -> \ HomeAlarmStatsResp: user_id = req.ctx.user_id # cid = body.cid cid = 223 is_auth = await get_power(user_id, [cid]) if not is_auth and not SETTING.debug_mode: return success_res(code=4001, msg="您没有权限访问") # 1. 获取参数 start = body.start end = body.end product = 4 sr = await sdu_alarm_statistics_service([cid], start, end, product) return HomeAlarmStatsResp( ele_overload=sr.ele_overload, illegal_ele_app=sr.illegal_ele_app, power_quality=sr.power_quality )