import asyncio import json import aiohttp from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils from pot_libs.common.components.responses import success_res from pot_libs.logger import log from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.settings import SETTING from unify_api.modules.common.dao.common_dao import meter_param_by_mid, \ point_by_pid_mtid, monitor_by_mtid from unify_api.modules.electric.procedures.electric_util import get_wiring_type from unify_api.modules.zhiwei_u import config from unify_api.modules.zhiwei_u.dao.data_operations_dao import \ select_point_by_mtid, location_by_mtid from unify_api.modules.zhiwei_u.dao.warning_operations_dao import get_username from unify_api.modules.zhiwei_u.dao.equipment_operations_dao import \ equipment_operations_dao, total_equipment_operations_dao, \ monitor_by_cid_sid from unify_api.modules.zhiwei_u.components.equipment_operations_cps import \ EquOptResp, EdResp, EdmResp from unify_api.modules.zhiwei_u.procedures.dev_demolish import dev_demolish from unify_api.modules.zhiwei_u.procedures.dev_replace import dev_replace from unify_api.modules.zhiwei_u.procedures.equipment_operations_pds import \ alarm_setting_pds, device_get_v1, device_get_config from unify_api.utils.response_code import RET async def equipment_operations_service(userid, prod_id, cid, sid, pid, page_num, page_size): """设备管理""" current_page = (page_num - 1) * page_size if sid and not pid: sql = f"SELECT c.product,c.cid,c.shortname,p.pid,p.name,m.sid,p.mtid" \ f" FROM `monitor` m LEFT JOIN company c on m.cid=c.cid " \ f"left join point p on p.mtid=m.mtid where demolished = 0 and " \ f"sid='{sid}' " \ f"limit {current_page}, {page_size}" t_sql = f"SELECT count(c.cid) total FROM `monitor` m " \ f"LEFT JOIN company c " \ f"on m.cid=c.cid left join point p on p.mtid=m.mtid " \ f"where demolished = 0 and sid='{sid}'" else: conn_sql = "where demolished = 0 " if prod_id: conn_sql += f" and c.product={prod_id}" if cid: conn_sql += f" and c.cid={cid}" if pid: conn_sql += f" and p.pid={pid}" sql = f"SELECT c.product,c.cid,c.shortname,p.pid,p.name,m.sid,p.mtid" \ f" FROM `monitor` m LEFT JOIN company c on m.cid=c.cid " \ f"left join point p on p.mtid=m.mtid {conn_sql} " \ f"limit {current_page}, {page_size}" t_sql = f"SELECT count(c.cid) total FROM `monitor` m " \ f"LEFT JOIN company c " \ f"on m.cid=c.cid left join point p on p.mtid=m.mtid " \ f"{conn_sql}" user = await get_username(userid) is_mod = 1 if user["role"] == 2 else 0 datas = await equipment_operations_dao(sql) total = await total_equipment_operations_dao(t_sql) for data in datas: data["productname"] = config.PRODUCT[data["product"]] return EquOptResp(total=total["total"], is_mod=is_mod, rows=datas) async def equipment_details_mqtt_service(sid, mtid): """设备管理-详情和修改""" software_version, scope_status = "", "" upload_freq, harm_count = "", "" # 根据mtid查询meter_no monitor_dic = await monitor_by_mtid(mtid) meter_no = monitor_dic["meter_no"] # 1. 求设备当前状态22 status_ = "离线" resp_str, status = await AioHttpUtils().get( url=f"http://{SETTING.device_status_host}:{SETTING.device_status_port}" f"/api/v4/clients/eems%2Ftd%2F{sid}", auth=aiohttp.BasicAuth(login=f"{SETTING.device_status_app_id}", password=f"{SETTING.device_status_app_secret}") ) res_http = json.loads(resp_str) if status == 200 and res_http["data"] and res_http["data"][0]["connected"]: status_ = "在线" # 4. 上传频率,谐波次数,录波当前状态,代码版本 # 4.1先根据1.0协议查询数据 if status_ == "在线": res_v1 = await device_get_v1(sid) log.info(f"res_v1:{res_v1}") if res_v1 and res_v1["status_code"] == 200: res_data = res_v1.get("data") software_version = res_data.get("software").get("software_version") scope_status = "开启" if res_data.get("scope").get("electric").get( meter_no).get("threshold").get("scopeEnable") == 1 else "关闭" # 上传频率, 条/分钟 upload_freq = 1 # 谐波次数 harm_count = 17 else: # 4.2根据2.0协议查询数据 res_elec = await device_get_config(sid, key="electric") log.info(f"res_elec:{res_elec}") if res_elec and res_elec["status_code"] == 200: elec_data = res_elec.get("data") upload_freq = elec_data.get("electric").get(meter_no).get( "upload_freq") harm_count = elec_data.get("electric").get(meter_no).get( "harm_count") await asyncio.sleep(2) # 装置单通道, 连着发会失败 res_scope = await device_get_config(sid, key="scope") log.info(f"res_scope:{res_scope}") if res_scope and res_scope["status_code"] == 200: scope_data = res_scope.get("data") scope_status = "开启" if scope_data.get("scope").get( sid).get("threshold").get( "scopeEnable") == 1 else "关闭" software_version = res_scope.get("software_version") return EdResp( software_version=software_version, device_status=status_, scope_status=scope_status, upload_freq=upload_freq, harm_count=harm_count ) async def equipment_details_mysql_service(sid, mtid): """设备管理-详情和修改-从mysql获取数据""" # 1. 报警 p_dic = await select_point_by_mtid(mtid) point_id = p_dic["pid"] location_info_list = await location_by_mtid(mtid) locations = [i["lid"] for i in location_info_list] # 从alarm_setting表获取数据 alarm_list = await alarm_setting_pds(point_id, locations) # 2. 额定电压, 额定电流 ct, pt, rated_voltage, rated_current = "", "", "", "" ctnum, mtid = await get_wiring_type(point_id) # 装置已安装逻辑:有mid且ptr、ctr有值 if mtid: meter_param_dic = await meter_param_by_mid(mtid) if meter_param_dic: ct = meter_param_dic.get("ctr") ct = ct if ct is not None else "" pt = meter_param_dic.get("ptr") pt = pt if pt is not None else "" rated_voltage = meter_param_dic.get("vc") rated_voltage = rated_voltage if rated_voltage is not None else "" rated_current = meter_param_dic.get("imax") rated_current = rated_current if rated_current is not None else "" return EdmResp( ct=ct, pt=pt, rated_voltage=rated_voltage, rated_current=rated_current, alarm=alarm_list ) async def equipment_change_name_service(mtid, pid, new_name): """设备管理-修改-安装点更名""" # 1. 确认mtid和pid对应关系 point_dic = await point_by_pid_mtid(mtid, pid) if not point_dic: return success_res(code=RET.op_fail, msg="安装点不存在,或者mtid与pid不匹配") # 2. 更改安装点名称 sql_point = "UPDATE point SET name=%s WHERE pid=%s and mtid=%s" sql_monitor = "UPDATE monitor SET name=%s WHERE mtid=%s" async with MysqlUtil() as conn: res_p = await conn.execute(sql_point, args=(new_name, pid, mtid)) res_m = await conn.execute(sql_monitor, args=(new_name, mtid)) if res_p == 1 and res_m == 1: return success_res(msg="操作成功") else: return success_res(code=RET.op_fail, msg="数据库更新失败") async def equipment_replace_service(cid, sid, new_sid): """设备管理-修改-装置更换""" # 1. 确认cid和sid对应关系 monitor_dic = await monitor_by_cid_sid(cid, sid) if not monitor_dic: return success_res(code=RET.op_fail, msg="装置不存在,或者cid与sid不匹配") # 2. 调用接口,确定更换是否成功 flag, msg = await dev_replace(cid, sid, new_sid) if not flag: return success_res(code=RET.op_fail, msg=msg) return success_res(msg="操作成功") async def equipment_demolish_service(cid, sid): """设备管理-修改-装置拆除""" # 1. 确认cid和sid对应关系 monitor_dic = await monitor_by_cid_sid(cid, sid) if not monitor_dic: return success_res(code=RET.op_fail, msg="装置不存在,或者cid与sid不匹配") # 2. 调用接口,确定更换是否成功 flag, msg = await dev_demolish(cid, sid) if not flag: return success_res(code=RET.op_fail, msg=msg) # 3. 下发拆除装置命令 # 1.0装置不需要下发拆除,回复timeout认为成功 # 2.0因为有cid,mtid等字段,所以需要拆表,会返回正常报文 # dem_flag = await device_demolish_pds(sid) # if not dem_flag: # return Success(success=0, message="下发拆除装置命令失败") return success_res(msg="操作成功")