import random

from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
from unify_api.constants import Product, PRODUCT
from unify_api.modules.common.dao.common_dao import monitor_by_cid
from unify_api.modules.common.procedures.alarm_cps import load_alarm_cnt_sdu
from unify_api.modules.common.procedures.cids import get_cids, get_cid_info, \
    get_proxy_cids
from unify_api.modules.common.procedures.points import proxy_points
from unify_api.modules.common.procedures.power_cps import load_cmpy_power
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
    load_proxy_power
from unify_api.modules.home_page.components.count_info_proxy_cps import (
    CountInfoProxyResp,
    ProxySecurityLevelCntResp,
    ProxySecurityLevelCntReq,
    ProxyAlarmPercentageCntResp,
    AlarmLevelCnt,
    AlarmContentCnt,
    ProxyIndexMapResp,
    ProxyInfoItem,
    CidInfoItem,
    CountInfoSduResp,
    AlarmDistributionResp,
    AlarmItem,
    AlarmRankingResp,
    RegAlarmCnt,
    AlarmTimeDistribution,
    AlarmRankingReq,
    AipResp,
    CisResp,
    CisReq,
)
from unify_api.modules.home_page.procedures.count_info_pds import other_info
from unify_api.modules.home_page.procedures.count_info_proxy_pds import (
    security_level_count,
    alarm_percentage_count,
    proxy_map_info,
    reg_map_info,
    total_run_day_proxy,
)
from unify_api.modules.home_page.procedures.security_info_pds import \
    alarm_count_info
from unify_api.modules.home_page.service.count_info_service import \
    safe_run_sdu
from unify_api.modules.elec_charge.components.elec_charge_cps import \
    ProductProxyReq
from unify_api.modules.users.procedures.jwt_user import jwt_user


@summary("代理版首页统计信息-安电U")
async def post_count_info_proxy(req) -> CountInfoProxyResp:
    """Return the proxy home-page totals for the requesting user.

    The product is resolved from the request host through ``PRODUCT`` and
    the proxy id is read from the JSON body. When the user has no companies
    under that proxy, every counter in the response is zero.
    """
    # 1. Resolve the list of company ids (cids) visible to this user/proxy.
    host = req.host
    product = PRODUCT.get(host)
    user_id = jwt_user(req)
    proxy_id = req.json.get("proxy_id")
    cid_list = await get_proxy_cids(user_id, product, proxy_id)
    if not cid_list:
        return CountInfoProxyResp(
            total_cid=0,
            total_monitor=0,
            safe_operation_days=0,
            total_power=0,
            total_alarm=0,
        )
    total_cid = len(cid_list)

    # 2. Number of monitoring points.
    total_monitor = await proxy_points(cid_list)

    # 3. Safe-operation days and cumulative alarm count, summed over cids.
    safe_operation_days = 0
    total_alarm = 0
    for cid in cid_list:
        # The first element (today's alarm count) is not used here.
        _, safe_run_days, alarm_count = await other_info(cid)
        safe_operation_days += safe_run_days
        total_alarm += alarm_count

    # 4. Cumulative monitored power consumption.
    total_power = await load_proxy_power(cid_list)

    return CountInfoProxyResp(
        total_cid=total_cid,
        total_monitor=total_monitor,
        safe_operation_days=safe_operation_days,
        total_power=total_power,
        total_alarm=total_alarm,
    )


@summary("代理版首页安全级别占比统计")
async def post_security_level_count(
        request, body: ProxySecurityLevelCntReq
) -> ProxySecurityLevelCntResp:
    """Return the security-level breakdown for the companies under a proxy.

    The product is fixed to ``Product.AndianUManage``; the mapping returned
    by ``security_level_count`` is expanded directly into the response.
    """
    user_id = request.ctx.user_id
    product = Product.AndianUManage.value
    proxy_id = body.proxy_id
    cids = await get_proxy_cids(user_id, product, proxy_id)
    security_level_map = await security_level_count(cids)
    return ProxySecurityLevelCntResp(**security_level_map)


@summary("代理版首页报警分布 占比统计")
async def post_alarm_percentage_count(
        request, body: ProxySecurityLevelCntReq
) -> ProxyAlarmPercentageCntResp:
    """Return alarm counts grouped by level, by content and by time of day.

    When ``body.cid`` is set only that company is counted; otherwise all
    companies of ``body.proxy_id`` visible to the user are counted.
    """
    user_id = jwt_user(request)
    product = body.product
    req_cid = body.cid
    if req_cid:
        cids = [req_cid]
    else:
        cids = await get_proxy_cids(user_id, product, body.proxy_id)

    counts = await alarm_percentage_count(cids)
    return ProxyAlarmPercentageCntResp(
        alarm_level_cnt=AlarmLevelCnt(
            first_alarm_cnt=counts["first_alarm_cnt"],
            second_alarm_cnt=counts["second_alarm_cnt"],
            third_alarm_cnt=counts["third_alarm_cnt"],
        ),
        alarm_content_cnt=AlarmContentCnt(
            temperature_cnt=counts["temperature_cnt"],
            residual_current_cnt=counts["residual_current_cnt"],
            electric_param_cnt=counts["electric_param_cnt"],
        ),
        alarm_time_cnt=AlarmTimeDistribution(
            day_alarm_cnt=counts["day_alarm_cnt"],
            night_alarm_cnt=counts["night_alarm_cnt"],
            morning_alarm_cnt=counts["morning_alarm_cnt"],
        ),
    )


@summary("代理版本首页地图数据")
async def post_proxy_map_info(request,
                              body: ProxySecurityLevelCntReq) -> ProxyIndexMapResp:
    """Return the map data (district plus per-company info) for the home page.

    When ``body.cid`` is set only that company is shown; otherwise all
    companies of ``body.proxy_id`` visible to the user are shown. With no
    companies, a single item with blank fields is returned.
    """
    user_id = jwt_user(request)
    product = body.product
    req_cid = body.cid
    if req_cid:
        cids = [req_cid]
    else:
        cids = await get_proxy_cids(user_id, product, body.proxy_id)

    if not cids:
        return ProxyIndexMapResp(
            proxy_info=[ProxyInfoItem(district_code="", cid_info_list=[],
                                      longitude="", latitude="")]
        )

    if product != Product.IntelligentU.value:
        async with MysqlUtil() as conn:
            auth_prod_sql = "select proxy from user_product_auth where user_id=%s and product=%s"
            auth_prod = await conn.fetchone(auth_prod_sql,
                                            args=(user_id, product))
            if auth_prod and auth_prod["proxy"] == 0:
                proxy_id = 0
            else:
                # Note: companies of the AnDian management edition and of
                # ShiDian-U must have a proxy configured.
                company_sql = "select cpm.proxy from company c " \
                              "left join company_proxy_map cpm " \
                              "on cpm.cid=c.cid where c.cid = %s"
                company = await conn.fetchone(company_sql, args=(cids[0],))
                # NOTE(review): fetchone presumably returns None when no row
                # matches (e.g. a client-supplied cid that does not exist).
                # Guard so the handler degrades to blank district fields
                # instead of raising TypeError on None["proxy"].
                proxy_id = company["proxy"] if company else None
            district_sql = "select adcode, longitude, latitude from district where proxy_id=%s"
            district = await conn.fetchone(district_sql, args=(proxy_id,))
    else:
        # No district lookup is performed for the IntelligentU product.
        district = {}

    if product == Product.RecognitionElectric.value:
        # ShiDian-U (RecognitionElectric)
        proxy_info = await reg_map_info(cids=cids)
    else:
        # AnDian-U management edition
        proxy_info = await proxy_map_info(cids)

    return ProxyIndexMapResp(
        proxy_info=[
            ProxyInfoItem(
                district_code=district["adcode"] if district else "",
                cid_info_list=[CidInfoItem(**cid_info)
                               for cid_info in proxy_info.values()],
                longitude=district["longitude"] if district else "",
                latitude=district["latitude"] if district else "",
            )
        ]
    )


@summary("识电U首页报警分布")
async def post_reg_alarm_distribution(request,
                                      body: AlarmRankingReq) -> AlarmDistributionResp:
    """Return alarm counts per alarm category for the ShiDian-U home page.

    Counts are only loaded when ``body.product`` is
    ``Product.RecognitionElectric``; for any other product, and for any
    category missing from the result, the count is 0.
    """
    product = body.product
    start = body.start
    end = body.end
    date_type = body.date_type

    type_alarm_cnt_map = {}
    if product == Product.RecognitionElectric.value:
        user_id = request.ctx.user_id
        cids = await get_cids(user_id, product)
        alarm_info_map = await alarm_count_info(cids, start, end, date_type)
        type_alarm_cnt_map = alarm_info_map["type_alarm_cnt_map"]

    return AlarmDistributionResp(
        alarm_categories=RegAlarmCnt(
            illegal_eleprod=type_alarm_cnt_map.get("illegal_ele_app", 0),
            high_p_eleprod=type_alarm_cnt_map.get("high_power_app", 0),
            overuse_eleprod=type_alarm_cnt_map.get("ele_overload", 0),
            electric_quantity=type_alarm_cnt_map.get("power_quality_low", 0),
            ele_car_battery=type_alarm_cnt_map.get("ele_car_battery", 0),
        )
    )


@summary("识电U首页报警排名")
async def post_reg_alarm_rank(request,
                              body: AlarmRankingReq) -> AlarmRankingResp:
    """Return companies ranked by alarm count, highest first.

    Counts are only loaded when ``body.product`` is
    ``Product.RecognitionElectric``; otherwise the ranking is empty.
    """
    product = body.product
    start = body.start
    end = body.end
    date_type = body.date_type

    cid_alarm_cnt_map = {}
    if product == Product.RecognitionElectric.value:
        user_id = request.ctx.user_id
        cids = await get_cids(user_id, product)
        alarm_info_map = await alarm_count_info(cids, start, end, date_type)
        cid_alarm_cnt_map = alarm_info_map["cid_alarm_cnt_map"]

    if not cid_alarm_cnt_map:
        # Nothing to rank: skip loading the info of every company.
        return AlarmRankingResp(position_alarm_info=[])

    cid_info_map = await get_cid_info(all=True)
    position_alarm_info = [
        AlarmItem(name=cid_info_map[cid]["shortname"], alarm_cnt=alarm_cnt)
        for cid, alarm_cnt in cid_alarm_cnt_map.items()
    ]
    position_alarm_info.sort(key=lambda x: x.alarm_cnt, reverse=True)
    return AlarmRankingResp(position_alarm_info=position_alarm_info)


@summary("代理版首页统计信息-知电U")
async def post_zhidian_info_proxy(req, body: ProductProxyReq) -> AipResp:
    """Return the proxy home-page totals for the ZhiDian-U product.

    The product is resolved from the request host through ``PRODUCT``.
    Without a ``proxy_id`` the company list is empty and the helpers are
    called with an empty tuple.
    """
    # 1. Resolve the list of company ids (cids).
    host = req.host
    product = PRODUCT.get(host)
    user_id = req.ctx.user_id
    proxy_id = body.proxy_id
    cid_list = await get_proxy_cids(user_id, product, proxy_id) \
        if proxy_id else ()
    total_cid = len(cid_list)

    # 2. Number of monitoring points.
    total_monitor = await proxy_points(cid_list)

    # 3. Cumulative monitored power consumption.
    total_power = await load_proxy_power(cid_list)

    # 4. Total connected time: the sum of every factory's connected time.
    total_run_day = await total_run_day_proxy(cid_list)

    return AipResp(
        total_cid=total_cid,
        total_monitor=total_monitor,
        total_power=total_power,
        total_run_day=total_run_day,
    )


@summary("首页统计信息-正式版sdu")
async def post_count_info_sdu_new(req, body: CisReq) -> CisResp:
    """Return the home-page totals of a single company (SDU edition)."""
    cid = body.cid
    cids = [cid]

    # 1. Connected tenants: counted from the monitor table so that
    #    dismantled installations are handled.
    monitor_list = await monitor_by_cid(cid)
    total_tenant = len(monitor_list)

    # 2. Safe-operation days, counted per day: a day is safe when the
    #    factory's level-I plus level-II alarms are fewer than 5% of the
    #    tenant count; the value shown accumulates since onboarding.
    safe_day = await safe_run_sdu(cid, total_tenant)

    # 3. Online rate.
    # NOTE(review): this is not measured; it is a random value between 89
    # and 92 and changes on every request.
    online_rate = 88 + random.choice([1, 1.5, 2, 2.5, 3, 3.5, 4])

    # 4. Cumulative power consumption.
    total_power = await load_cmpy_power(cids)

    # 5. Cumulative alarm count.
    total_alarm = await load_alarm_cnt_sdu(cids)

    return CisResp(
        total_tenant=total_tenant,
        online_rate=online_rate,
        safe_day=safe_day,
        total_power=total_power,
        total_alarm=total_alarm,
    )