from pot_libs.sanic_api import summary
from unify_api.modules.alarm_manager.components.alarm_static_cps import (
    SduAlarmReq, SduAlarmResp, ContentName, RiskCount, SasReq, SassReq,
    SassResp, AppReq, AppResp, SisReq, SisResp, SiasReq, SiasResp, SebReq,
    SebResp, SiarResp, ZsResp, ZasReq, ZasResp,
)
from unify_api.modules.alarm_manager.service.alarm_static_service import (
    sdu_alarm_statistics_service, sdu_electric_behave_service,
    zdu_level_distribution_service,
    zdu_content_distribution_service, zdu_summary_service,
    zdu_alarm_sort_service_2, sdu_alarm_statistics_sort_service,
    sdu_app_statistics_sort_service, sdu_index_alarm_rank,
)
from unify_api.modules.home_page.components.security_info_cps import (
    SecurityCountResp, AlarmContentDistributionResp,
)
from unify_api.modules.home_page.procedures.count_info_pds import (
    electric_use_info_sdu,
)
from unify_api.utils.time_format import last30_day_range


@summary("报警统计-识电u")
async def post_sdu_alarm_statistics(req, body: SduAlarmReq) -> SduAlarmResp:
    """Return SDU alarm statistics for the single ``cid`` in the request.

    The statistics service takes a list of cids, so ``body.cid`` is wrapped
    in a one-element list before being forwarded together with the
    ``start``/``end`` range and the ``product`` from the request body.
    """
    return await sdu_alarm_statistics_service(
        [body.cid], body.start, body.end, body.product
    )


@summary("报警统计-wx-识电u")
async def post_sdu_alarm_statistics_wx(req, body: SasReq) -> SduAlarmResp:
    """Return SDU alarm statistics for the list of cids in the request.

    Same service call as ``post_sdu_alarm_statistics``, except that the
    request already carries a ``cids`` collection, which is passed through
    unchanged.
    """
    return await sdu_alarm_statistics_service(
        body.cids, body.start, body.end, body.product
    )


@summary("报警统计-报警记录-排名-识电u")
async def post_sdu_alarm_statistics_sort(req, body: SassReq) -> SassResp:
    """Return the paginated, sorted alarm-record ranking (SDU statistics).

    Forwards ``cid``, the ``start``/``end`` range, the paging parameters
    (``page_size``, ``page_num``) and the ``sort`` value to the sort service.
    """
    # The sort service is called without a product argument, so
    # ``body.product`` is intentionally not read here.
    return await sdu_alarm_statistics_sort_service(
        body.cid, body.start, body.end,
        body.page_size, body.page_num, body.sort,
    )


@summary("报警统计-电器识别-排名-识电u")
async def post_sdu_app_statistics_sort(req, body: AppReq) -> AppResp:
    """Return the appliance-recognition ranking (SDU statistics).

    Forwards ``cid`` and the ``start``/``end`` range to the service.
    """
    # The service is called without a product argument, so ``body.product``
    # is intentionally not read here.
    return await sdu_app_statistics_sort_service(
        body.cid, body.start, body.end
    )


@summary("首页-运行趋势-识电u")
async def post_sdu_index_statistics(req, body: SisReq) -> SisResp:
    """Return the home-page trend subset of the SDU alarm statistics.

    Calls the full alarm statistics service for ``[body.cid]`` and copies
    only ``ele_overload``, ``illegal_ele_app`` and ``power_quality`` from
    its result into the response.
    """
    stats = await sdu_alarm_statistics_service(
        [body.cid], body.start, body.end, body.product
    )
    return SisResp(
        ele_overload=stats.ele_overload,
        illegal_ele_app=stats.illegal_ele_app,
        power_quality=stats.power_quality,
    )


@summary("首页-报警统计-新版识电u")
async def post_sdu_index_alarm_statistics(req, body: SiasReq) -> SiasResp:
    """Return home-page alarm distributions plus the electric-use score.

    The time range is not taken from the request; it comes from
    ``last30_day_range()``. The response combines the risk/content
    distributions from the alarm statistics service with the rounded
    ``electric_use_score`` from ``electric_use_info_sdu``.
    """
    # Last 30 days, not including today (per the original author's note).
    start, end = last30_day_range()
    product = body.product
    cid = body.cid
    # Safety and alarm statistics.
    stats = await sdu_alarm_statistics_service([cid], start, end, product)
    # Safety index.
    use_info = await electric_use_info_sdu(cid)
    electric_use_score = round(use_info.electric_use_score)
    return SiasResp(
        risk_distribution=stats.risk_distribution,
        content_distribution=stats.content_distribution,
        electric_use_score=electric_use_score,
    )


@summary("用电行为-卡片数据-新版识电u")
async def post_sdu_electric_behave(req, body: SebReq) -> SebResp:
    """Return electricity-usage behaviour card data for the last 30 days.

    Forwards ``cid``, ``storeys`` and ``product`` from the request together
    with the range returned by ``last30_day_range()``.
    """
    cid = body.cid
    storeys = body.storeys
    product = body.product
    # Last 30 days, not including today (per the original author's note).
    start, end = last30_day_range()
    return await sdu_electric_behave_service(
        cid, start, end, storeys, product
    )


@summary("首页-报警违规排名-新版识电u")
async def post_sdu_index_alarm_ranking(req, body: SiasReq) -> SiarResp:
    """Return the home-page alarm ranking for the last 30 days.

    Forwards ``cid`` and ``product`` from the request together with the
    range returned by ``last30_day_range()`` to ``sdu_index_alarm_rank``.
    """
    cid = body.cid
    product = body.product
    # Last 30 days, not including today (per the original author's note).
    start, end = last30_day_range()
    return await sdu_index_alarm_rank(cid, start, end, product)


@summary("报警统计-报警等级-智电u")
async def post_zdu_level_distribution(
        req, body: SduAlarmReq) -> SecurityCountResp:
    """Return the alarm-level distribution (ZDU statistics).

    Forwards ``cid``, the ``start``/``end`` range and ``product`` to the
    level distribution service.
    """
    return await zdu_level_distribution_service(
        body.cid, body.start, body.end, body.product
    )


@summary("报警统计-报警内容-智电u")
async def post_zdu_content_distribution(
        req, body: SduAlarmReq) -> AlarmContentDistributionResp:
    """Return the alarm-content distribution (ZDU statistics).

    Forwards ``cid``, the ``start``/``end`` range and ``product`` to the
    content distribution service.
    """
    return await zdu_content_distribution_service(
        body.cid, body.start, body.end, body.product
    )
@summary("报警统计-统计概况信息-智电u")
async def post_zdu_summary(req, body: SduAlarmReq) -> ZsResp:
    """Return the alarm statistics overview (ZDU statistics).

    Forwards ``cid``, the ``start``/``end`` range and ``product`` from the
    request body to the summary service.
    """
    return await zdu_summary_service(
        body.cid, body.start, body.end, body.product
    )


@summary("报警统计-报警排名-智电u")
async def post_zdu_alarm_sort(req, body: ZasReq) -> ZasResp:
    """Return the paginated alarm ranking (ZDU statistics).

    Forwards ``cid``, the ``start``/``end`` range and the paging parameters
    (``page_size``, ``page_num``) to the sort service.
    """
    # The sort service is called without a product argument, so
    # ``body.product`` is intentionally not read here.
    return await zdu_alarm_sort_service_2(
        body.cid, body.start, body.end, body.page_size, body.page_num
    )