import ast
from datetime import datetime

from unify_api.constants import CO2_N, SDU_ONE_TWO_GRADE_ALARM
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.carbon_neutral.service.carbon_reduce_service import \
    carbon_emission_index_service
from unify_api.modules.common.dao.common_dao import monitor_by_cid, \
    tsp_by_cid, water_by_cid, storey_wp_by_cid, company_extend_by_cid
from unify_api.modules.common.procedures import health_score
from unify_api.modules.common.procedures.points import proxy_points, \
    list_point
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
    query_charge_aggs_points, point_kwh_charge
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
    load_proxy_power
from unify_api.modules.home_page.components.count_info_proxy_cps import \
    IycResp, IycmResp, RtrResp, AlarmLevelCnt, AlarmContentCnt, CmResp, \
    ApcResp, AsiResp, HsiResp, AiiResp
from unify_api.modules.home_page.dao.count_info_dao import \
    alarm_aggs_point_location
from unify_api.modules.home_page.procedures.count_info_pds import \
    other_info, electric_use_info, cid_alarm_importance_count, \
    alarm_importance_count_total, get_company_charge_price, \
    health_status_res, carbon_status_res_web, optimization_count_info, \
    economic_index_desc, cal_power_factor, real_time_load, safety_ratio_res
from unify_api.modules.home_page.procedures.count_info_proxy_pds import \
    alarm_percentage_count, alarm_safe_power
from unify_api.modules.tsp_water.dao.drop_dust_dao import \
    dust_water_run_day_sum_water
from unify_api.modules.tsp_water.dao.tsp_dao import \
    tsp_histogram_day_tsp_id, range_max_value
from unify_api.utils.common_utils import round_2
from unify_api.utils.time_format import last30_day_range, start_end_date, \
    last_15min_range, yesterday_range, last7_day_range, last_month_start_end


async def post_zd_info_factory_service(cid_list):
    """Monitoring points, safe-run days, cumulative power and alarm totals.

    :param cid_list: company ids; the first one is used to look up the
        platform creation time.
    :return: tuple ``(total_monitor, safe_operation_days, total_power,
        total_alarm, co2)``.
    :raises IndexError: if ``cid_list`` is empty (``cid_list[0]`` is read).
    """
    # 1. Monitoring points
    total_monitor = await proxy_points(cid_list)

    # 2. Safe-run days and cumulative alarm count, summed over all companies
    safe_operation_days = 0
    total_alarm = 0
    for cid in cid_list:
        # The first element (today's alarm count) is not needed here.
        _, safe_run_days, alarm_count = await other_info(cid)
        safe_operation_days += safe_run_days
        total_alarm += alarm_count

    # 4. Cumulative monitored power
    total_power = await load_proxy_power(cid_list)

    # 5. Daily average carbon emission = total power * CO2_N / run days.
    #    Run days are counted from the create_time of the first company.
    sql = "select create_time from company where cid = %s"
    async with MysqlUtil() as conn:
        result = await conn.fetchone(sql, (cid_list[0],))
    now_time = datetime.now()
    create_time = datetime.fromtimestamp(result["create_time"])
    # +1 so that the creation day itself counts as a run day.
    run_days = (now_time - create_time).days + 1
    co2 = round_2(total_power * CO2_N / run_days)
    return total_monitor, safe_operation_days, total_power, total_alarm, co2


async def risk_cost_service(cid):
    """Zhidian-U home page, run statistics: risk and cost rankings.

    For the range returned by ``last30_day_range()``, builds two lists of
    ``{"name": ..., "value": ...}`` dicts, one per monitoring point:

    * risk: alarm count of the point plus the alarm counts of its locations
    * cost: average price ``charge / kwh`` (0 when it cannot be computed)

    :param cid: company id.
    :return: ``(risk_top10, cost_top10)``, each sorted by ``value``
        descending and truncated to 10 entries.
    """
    # 1. Last 30 days
    start, end = last30_day_range()

    # 2. All monitoring points of the company
    p_res = await list_point(cid)
    point_list = p_res.get("points")

    # 3. Alarm counts aggregated per location and per point.
    #    ``or []`` guards against a None aggregation result.
    location_res, point_res = await alarm_aggs_point_location(start, end, cid)
    location_dic = {i["key"]: i["doc_count"] for i in (location_res or [])}
    point_dic = {i["key"]: i["doc_count"] for i in (point_res or [])}

    # 4. Charge / kwh aggregation per point, used for the average price
    points = [i.get("point_id") for i in point_list]
    es_res = await query_charge_aggs_points(start, end, points)
    es_res = {i["key"]: i for i in (es_res or [])}

    # 5. Build the response lists
    risk_list = []
    cost_list = []
    for info in point_list:
        point_id = info.get("point_id")
        locations = info.get("locations")
        name = info.get("name")

        # 5.1 risk: alarms on the point itself plus alarms on its locations
        risk_dic = {"name": name, "value": 0}
        if point_id in point_dic:
            risk_dic["value"] += point_dic[point_id]
        if locations:
            for lo in locations:
                if lo["location_id"] in location_dic:
                    risk_dic["value"] += location_dic[lo["location_id"]]
        risk_list.append(risk_dic)

        # 5.2 cost: average price of the point
        cost_dic = {"name": name, "value": 0}
        if point_id in es_res:
            charge = es_res[point_id]["charge"]["value"]
            kwh = es_res[point_id]["kwh"]["value"]
            try:
                price = round_2(charge / kwh)
            except (ZeroDivisionError, TypeError):
                # kwh is 0, or one of the aggregated values is None
                price = 0
            cost_dic["value"] = price
        cost_list.append(cost_dic)

    risk_list_st = sorted(
        risk_list, key=lambda i: i['value'], reverse=True)[:10]
    cost_list_st = sorted(
        cost_list, key=lambda i: i['value'], reverse=True)[:10]
    return risk_list_st, cost_list_st


async def safe_run_sdu(cid, total_tenant):
    """Count the safe-run days of a company.

    Starts from the number of days since the company's ``create_time`` and
    subtracts one day for every date whose count of
    ``SDU_ONE_TWO_GRADE_ALARM`` events exceeds 5% of ``total_tenant``.

    :param cid: company id.
    :param total_tenant: tenant count the 5% threshold is derived from.
    :return: number of safe-run days.
    """
    # Creation time of the company
    company_sql = "select create_time from company where cid = %s"
    # NOTE(review): cid and the event types are interpolated into the SQL
    # text instead of being bound as parameters; confirm cid is always a
    # trusted value, and that SDU_ONE_TWO_GRADE_ALARM has more than one
    # element (a 1-tuple renders as "('x',)", which is not valid SQL).
    sql = f"select DATE_FORMAT(event_datetime, '%Y-%m-%d') dt, " \
          f"count(1) doc_count " \
          f"from point_1min_event where cid={cid} and " \
          f"event_type in {tuple(SDU_ONE_TWO_GRADE_ALARM)} " \
          f"GROUP BY dt"
    async with MysqlUtil() as conn:
        company = await conn.fetchone(company_sql, (cid,))
        res_datas = await conn.fetchall(sql)

    create_time = datetime.fromtimestamp(company["create_time"])
    now_time = datetime.now()
    # +2 so that the start date is included (per the original comment)
    total_days = (now_time - create_time).days + 2
    for res in res_datas:
        if res.get("doc_count") > total_tenant * 0.05:
            total_days -= 1
    return total_days


async def info_yang_chen_service(cid):
    """Factory edition home page statistics (dust monitoring).

    :param cid: company id.
    :return: ``IycResp`` with the monitored-object count, good-air days,
        safe-run days, cumulative dust-suppression water and electricity.
    """
    # 1. Monitored objects: electricity points + TSP + sprinkler (water)
    monitor_list = await monitor_by_cid(cid)
    tsp_list = await tsp_by_cid(cid)
    water_list = await water_by_cid(cid)
    total_point = len(monitor_list) + len(tsp_list) + len(water_list)

    # 2. Good-air days: for each day bucket take the PM2.5 value of every
    #    TSP bucket; the day counts when the largest of them is below 35.
    air_quality = 0
    air_res = await tsp_histogram_day_tsp_id(interval="day")
    for bucket in air_res:
        pm25_values = [
            inner["pm25"]["value"]
            for inner in bucket["tsps"]["buckets"]
            # falsy values (None, 0) are ignored, as in the original
            if inner["pm25"]["value"]
        ]
        if pm25_values and max(pm25_values) < 35:
            air_quality += 1

    # 3. Safe-run days: a day counts when it has no level-1 alarm.
    # NOTE: the alarm aggregation (sdu_alarm_aggs_date_importance(cid)) is
    # currently disabled, so alarm_es is empty and the result is always 0.
    alarm_es = []
    safe_operation_days = 0
    for alarm in alarm_es:
        in_bucket = alarm["importance"]["buckets"]
        inner_dic = {info["key"]: info for info in in_bucket}
        if 1 not in inner_dic:
            safe_operation_days += 1

    # 4. Cumulative dust-suppression water
    sum_water = await dust_water_run_day_sum_water()
    total_water = sum_water.get("sum(water_total)")

    # 5. Cumulative dust-suppression electricity over all fog-cannon points
    storey_list = await storey_wp_by_cid(cid)
    point_list = [storey["point_id"] for storey in storey_list]
    kwh_res = await point_kwh_charge(point_list)
    total_kwh = kwh_res.get("kwh") or 0

    return IycResp(
        total_point=total_point,
        air_quality=air_quality,
        safe_operation_days=safe_operation_days,
        total_water=round_2(total_water),
        total_kwh=round_2(total_kwh)
    )


async def info_yang_chen_map_service(cid):
    """Factory edition home page map information (dust monitoring).

    :param cid: company id.
    :return: ``IycmResp`` with the safety index, today's alarm count,
        today's maximum PM2.5 and the map centre / range coordinates.
    """
    # 1. Today's alarm count (the other two values are not used here)
    today_alarm_count, _, _ = await other_info(cid)

    # 2. Safety index
    safe_index = await electric_use_info(cid)

    # 3. Today's maximum PM2.5
    today_start, today_end, _, _ = start_end_date()
    es_pm25 = await range_max_value(today_start, today_end)

    # 4. Map coordinates, stored as Python literals in the company
    #    extension rows and parsed with ast.literal_eval
    company_list = await company_extend_by_cid(cid)
    center_address = []
    range_address = []
    for company in company_list:
        key = company["key"]
        if key == "map_center_yangchen":
            center_address = ast.literal_eval(company["value"])
        elif key == "map_range_yangchen":
            range_address = ast.literal_eval(company["value"])

    return IycmResp(
        safety_index=safe_index.electric_use_score,
        today_alarm=today_alarm_count,
        total_max_pm25=round_2(es_pm25["aggregations"]["pm25"]["value"]),
        center_address=center_address,
        range_address=range_address
    )


async def rank_type_ranking_service(cid):
    """Home page alarm statistics: level, type and ranking (Zhidian-U).

    :param cid: company id.
    :return: ``RtrResp`` with alarm counts by level, alarm counts by
        content type, and the ranking from ``cid_alarm_importance_count``
        over the ``last30_day_range()`` window.
    """
    alarm_percentage_map = await alarm_percentage_count([cid])
    start, end = last30_day_range()
    res_list_sort = await cid_alarm_importance_count(cid, start, end)
    return RtrResp(
        alarm_level_cnt=AlarmLevelCnt(
            first_alarm_cnt=alarm_percentage_map["first_alarm_cnt"],
            second_alarm_cnt=alarm_percentage_map["second_alarm_cnt"],
            third_alarm_cnt=alarm_percentage_map["third_alarm_cnt"],
        ),
        alarm_content_cnt=AlarmContentCnt(
            temperature_cnt=alarm_percentage_map["temperature_cnt"],
            residual_current_cnt=alarm_percentage_map[
                "residual_current_cnt"],
            electric_param_cnt=alarm_percentage_map["electric_param_cnt"],
        ),
        ranking=res_list_sort
    )


async def condition_monitor_service(cid):
    """Home page condition monitoring (Zhidian-U).

    :param cid: company id.
    :return: ``CmResp`` with safety-related and power-related alarm counts
        for the window returned by ``last_15min_range()``.
    """
    # 1. Most recent time window
    start, end = last_15min_range()
    # 2. Alarm counts per alarm type
    alarm_map = await alarm_safe_power(cid, start, end)
    return CmResp(
        safe={
            "temperature_cnt": alarm_map["temperature_cnt"],
            "residual_current_cnt": alarm_map["residual_current_cnt"]
        },
        power={
            "lr_cnt": alarm_map["lr_cnt"],
            "power_factor_cnt": alarm_map["power_factor_cnt"],
            "under_u_cnt": alarm_map["under_u_cnt"],
            "over_u_cnt": alarm_map["over_u_cnt"],
            "over_i_cnt": alarm_map["over_i_cnt"]
        }
    )


def _to_es_time(time_str):
    """Reformat a ``%Y-%m-%d %H:%M:%S`` string as ``%Y-%m-%dT%H:%M:%S+08:00``."""
    return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S").strftime(
        "%Y-%m-%dT%H:%M:%S+08:00")


async def _charge_price_in_range(cid, start, end):
    """Return ``get_company_charge_price`` for a plain-string time range."""
    return await get_company_charge_price(
        cid, _to_es_time(start), _to_es_time(end)
    )


async def alarm_price_costtl_service(cid):
    """Home page: today's alarms, average prices, power factor and load.

    :param cid: company id.
    :return: ``ApcResp`` with today's alarm counts by level, the average
        price for yesterday / last 7 days / last 30 days, the real-time
        power factor, the real-time load and today's average price.
    """
    today_start, today_end, _, _ = start_end_date()

    # 1. Today's alarms
    imp_dic = await alarm_importance_count_total(cid, today_start, today_end)

    # 2. Real-time power factor (last month's value is not used here)
    cos_ttl, _ = await cal_power_factor(cid)

    # 3. Real-time load
    cur_load = await real_time_load(cid)

    # 4. Average price: yesterday, last 7 days, last 30 days
    yesterday_price = await _charge_price_in_range(cid, *yesterday_range())
    last7_price = await _charge_price_in_range(cid, *last7_day_range())
    last30_price = await _charge_price_in_range(cid, *last30_day_range())

    # 5. Today's average price
    today_price = await _charge_price_in_range(cid, today_start, today_end)

    return ApcResp(
        today_alarm={
            "first_cnt": imp_dic["first_cnt"],
            "second_cnt": imp_dic["second_cnt"],
            "third_cnt": imp_dic["third_cnt"],
            "total_cnt": imp_dic["total_cnt"]
        },
        avg_price={
            "yesterday": round_2(yesterday_price),
            "last_7_day": round_2(last7_price),
            "last_30_day": round_2(last30_price)
        },
        real_costtl=cos_ttl,
        real_total_p=cur_load,
        today_avg_price=round_2(today_price)
    )


async def alarm_safe_index_service(cid):
    """Home page: electricity safety index and alarm counts.

    :param cid: company id.
    :return: ``AsiResp`` built from ``electric_use_info(cid)`` plus the
        safety ratio derived from its ``electric_use_score``.
    """
    elec_info = await electric_use_info(cid)
    # Safety risk ratio for the "wechat" client
    safety_ratio = safety_ratio_res(elec_info.electric_use_score, "wechat")
    return AsiResp(
        first_alarm_cnt=elec_info.first_alarm_cnt,
        second_alarm_cnt=elec_info.second_alarm_cnt,
        third_alarm_cnt=elec_info.third_alarm_cnt,
        alarm_score=round_2(elec_info.alarm_score),
        electric_use_score=elec_info.electric_use_score,  # safety index
        safety_ratio=safety_ratio
    )


async def economic_index_res(cid):
    """Economic index derived from the company's ``save_percent``.

    The index is a piecewise-linear function of ``save_percent``: 100 when
    it is the empty string, decreasing segments on [0, 0.1), and 0 for any
    other value (including negatives and values >= 0.1).

    :param cid: company id.
    :return: the index rounded to an integer.
    """
    count_info_map = await optimization_count_info(cid)
    save_percent = count_info_map["save_percent"]
    if save_percent == "":
        # No saving figure available: treated as the best score
        economic_power_index = 100
    elif 0 <= save_percent < 0.005:
        economic_power_index = -2000 * save_percent + 100
    elif 0.005 <= save_percent < 0.02:
        economic_power_index = -1000 * save_percent + 95
    elif 0.02 <= save_percent < 0.05:
        economic_power_index = -500 * save_percent + 85
    elif 0.05 <= save_percent < 0.1:
        economic_power_index = -1200 * save_percent + 120
    else:
        economic_power_index = 0
    return round(economic_power_index)


async def all_index_info_service(cid):
    """Home page: health, safety, carbon and economic indexes.

    :param cid: company id.
    :return: ``AiiResp`` where each index is a ``{"value", "desc"}`` dict.
    """
    # 1. Health index
    health_index = round(await health_score.load_health_index(cid))
    health_status = health_status_res(health_index, "web")

    # 2. Safety index
    elec_info = await electric_use_info(cid)
    safety_index = elec_info.electric_use_score
    safety_status = safety_ratio_res(safety_index, "web")

    # 3. Carbon index, over last month's start/end
    start, end = last_month_start_end()
    carbon_info = await carbon_emission_index_service(cid, start, end)
    carbon_index = carbon_info.carbon_index
    carbon_status = carbon_status_res_web(carbon_index)

    # 4. Economic index
    economic_index = await economic_index_res(cid)
    economic_status = economic_index_desc(economic_index, "web")

    return AiiResp(
        health_index={"value": health_index, "desc": health_status},
        safe_index={"value": safety_index, "desc": safety_status},
        carbon_index={"value": carbon_index, "desc": carbon_status},
        economic_index={"value": economic_index, "desc": economic_status}
    )