# equipment_operations_service.py 9.16 KB
import asyncio
import json
import aiohttp
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
from pot_libs.common.components.responses import success_res
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.settings import SETTING
from unify_api.modules.common.dao.common_dao import meter_param_by_mid, \
    point_by_pid_mtid, monitor_by_mtid
from unify_api.modules.electric.procedures.electric_util import get_wiring_type
from unify_api.modules.zhiwei_u import config
from unify_api.modules.zhiwei_u.dao.data_operations_dao import \
    select_point_by_mtid, location_by_mtid
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import get_username
from unify_api.modules.zhiwei_u.dao.equipment_operations_dao import \
    equipment_operations_dao, total_equipment_operations_dao, \
    monitor_by_cid_sid
from unify_api.modules.zhiwei_u.components.equipment_operations_cps import \
    EquOptResp, EdResp, EdmResp
from unify_api.modules.zhiwei_u.procedures.dev_demolish import dev_demolish
from unify_api.modules.zhiwei_u.procedures.dev_replace import dev_replace
from unify_api.modules.zhiwei_u.procedures.equipment_operations_pds import \
    alarm_setting_pds, device_get_v1, device_get_config
from unify_api.utils.response_code import RET


def _quote_sql_string(value):
    """Render ``value`` as a single-quoted SQL string literal.

    Backslashes are doubled and single quotes are written as ``''`` so the
    value can never terminate the literal early, whether or not the server
    runs with ``NO_BACKSLASH_ESCAPES``.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def _build_equipment_queries(prod_id, cid, sid, pid, page_num, page_size):
    """Build the page query and the matching count query.

    Numeric filters and paging values are coerced with ``int()`` and ``sid``
    is escaped, so request values cannot inject SQL.

    :return: ``(sql, t_sql)`` - the row query (with ``limit``) and the
        total-count query; both share the same ``where`` clause.
    :raises ValueError: if a numeric filter or paging value is not an integer.
    """
    page_size = int(page_size)
    # Clamp so that page_num < 1 does not produce a negative LIMIT offset.
    offset = max(int(page_num) - 1, 0) * page_size

    conditions = ["demolished = 0"]
    if sid and not pid:
        # Searching by sid alone: the other filters are ignored.
        conditions.append(f"sid={_quote_sql_string(sid)}")
    else:
        # Otherwise sid is ignored and the optional id filters are combined.
        if prod_id:
            conditions.append(f"c.product={int(prod_id)}")
        if cid:
            conditions.append(f"c.cid={int(cid)}")
        if pid:
            conditions.append(f"p.pid={int(pid)}")
    where = " and ".join(conditions)

    joins = ("FROM `monitor` m LEFT JOIN company c on m.cid=c.cid "
             "left join point p on p.mtid=m.mtid")
    sql = (f"SELECT c.product,c.cid,c.shortname,p.pid,p.name,m.sid,p.mtid "
           f"{joins} where {where} limit {offset}, {page_size}")
    t_sql = f"SELECT count(c.cid) total {joins} where {where}"
    return sql, t_sql


async def equipment_operations_service(userid, prod_id, cid, sid, pid,
                                       page_num, page_size):
    """Equipment management: return one page of non-demolished monitors.

    :param userid: id of the requesting user; used to look up the role.
    :param prod_id: optional product filter (integer).
    :param cid: optional company filter (integer).
    :param sid: optional device serial filter; only applied when ``pid`` is
        empty, in which case it is the sole filter.
    :param pid: optional point filter (integer).
    :param page_num: 1-based page number.
    :param page_size: rows per page.
    :return: ``EquOptResp`` with ``total``, ``is_mod`` (1 when the user's
        role is 2, else 0) and ``rows``.
    :raises ValueError: if a numeric filter or paging value is not an integer.
    """
    # NOTE: the DAO helpers used here accept a finished SQL string only, so
    # values are validated/escaped while building the statements. Prefer
    # bound parameters if the DAO layer gains support for them.
    sql, t_sql = _build_equipment_queries(prod_id, cid, sid, pid,
                                          page_num, page_size)
    user = await get_username(userid)
    # An unknown user is treated as having no modify permission.
    is_mod = 1 if user and user["role"] == 2 else 0
    datas = await equipment_operations_dao(sql)
    total = await total_equipment_operations_dao(t_sql)
    for data in datas:
        data["productname"] = config.PRODUCT[data["product"]]
    return EquOptResp(total=total["total"], is_mod=is_mod, rows=datas)


def _dig(mapping, *keys):
    """Follow ``keys`` through nested dicts; return None if a level is absent."""
    node = mapping
    for key in keys:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


async def _is_device_online(sid):
    """Ask the device-status HTTP API whether the client for ``sid`` is connected."""
    resp_str, status = await AioHttpUtils().get(
        url=f"http://{SETTING.device_status_host}:{SETTING.device_status_port}"
            f"/api/v4/clients/eems%2Ftd%2F{sid}",
        auth=aiohttp.BasicAuth(login=f"{SETTING.device_status_app_id}",
                               password=f"{SETTING.device_status_app_secret}")
    )
    # Parse the body only for a successful reply, so a failed status call
    # reports "offline" instead of raising on an unparsable body.
    if status != 200:
        return False
    clients = _dig(json.loads(resp_str), "data")
    return bool(clients) and bool(_dig(clients[0], "connected"))


async def equipment_details_mqtt_service(sid, mtid):
    """Equipment management - details/edit view, queried from the live device.

    :param sid: device serial id used for the status and config requests.
    :param mtid: id passed to ``monitor_by_mtid`` to resolve ``meter_no``.
    :return: ``EdResp`` with ``software_version``, ``device_status``
        ("在线"/"离线"), ``scope_status`` ("开启"/"关闭"), ``upload_freq`` and
        ``harm_count``. Fields that could not be fetched keep the default
        ``""``; a reply that lacks the scope section is reported as "关闭".
    """
    software_version, scope_status = "", ""
    upload_freq, harm_count = "", ""
    # Resolve meter_no from mtid.
    monitor_dic = await monitor_by_mtid(mtid)
    meter_no = monitor_dic["meter_no"]
    # 1. Current device status.
    online = await _is_device_online(sid)
    # 4. Upload frequency, harmonic count, waveform-recording status and
    #    software version - only obtainable while the device is online.
    if online:
        # 4.1 Try the 1.0 protocol first.
        res_v1 = await device_get_v1(sid)
        log.info(f"res_v1:{res_v1}")
        if _dig(res_v1, "status_code") == 200:
            res_data = res_v1.get("data")
            software_version = _dig(res_data, "software", "software_version")
            scope_enable = _dig(res_data, "scope", "electric", meter_no,
                                "threshold", "scopeEnable")
            scope_status = "开启" if scope_enable == 1 else "关闭"
            # Upload frequency, records per minute.
            upload_freq = 1
            # Harmonic count.
            harm_count = 17
        else:
            # 4.2 Fall back to the 2.0 protocol.
            res_elec = await device_get_config(sid, key="electric")
            log.info(f"res_elec:{res_elec}")
            if _dig(res_elec, "status_code") == 200:
                upload_freq = _dig(res_elec, "data", "electric", meter_no,
                                   "upload_freq")
                harm_count = _dig(res_elec, "data", "electric", meter_no,
                                  "harm_count")
            # The device is single-channel; back-to-back requests fail.
            await asyncio.sleep(2)
            res_scope = await device_get_config(sid, key="scope")
            log.info(f"res_scope:{res_scope}")
            if _dig(res_scope, "status_code") == 200:
                # NOTE(review): this reply is looked up by sid, whereas the
                # 1.0 reply is looked up by meter_no - confirm against the
                # 2.0 device protocol.
                scope_enable = _dig(res_scope, "data", "scope", sid,
                                    "threshold", "scopeEnable")
                scope_status = "开启" if scope_enable == 1 else "关闭"
                software_version = res_scope.get("software_version")
    return EdResp(
        software_version=software_version,
        device_status="在线" if online else "离线",
        scope_status=scope_status,
        upload_freq=upload_freq,
        harm_count=harm_count
    )


async def equipment_details_mysql_service(sid, mtid):
    """Equipment management - details/edit view, read from MySQL.

    :param sid: not referenced by this function.
    :param mtid: id passed to ``select_point_by_mtid`` and
        ``location_by_mtid`` to find the point and its locations.
    :return: ``EdmResp`` with ``ct``, ``pt``, ``rated_voltage``,
        ``rated_current`` (each ``""`` when no value is stored) and ``alarm``.
    """
    # 1. Alarms: settings come from the alarm_setting table.
    point_row = await select_point_by_mtid(mtid)
    point_id = point_row["pid"]
    location_rows = await location_by_mtid(mtid)
    location_ids = [row["lid"] for row in location_rows]
    alarm_list = await alarm_setting_pds(point_id, location_ids)

    # 2. Rated voltage / rated current (plus ctr and ptr).
    # Response field -> meter-parameter key it is read from.
    source_keys = (
        ("ct", "ctr"),
        ("pt", "ptr"),
        ("rated_voltage", "vc"),
        ("rated_current", "imax"),
    )
    values = {field: "" for field, _ in source_keys}
    # Only the second element of the wiring-type result is needed here.
    _, meter_id = await get_wiring_type(point_id)
    # "Device installed" logic: a meter id exists and its parameters are set.
    meter_params = await meter_param_by_mid(meter_id) if meter_id else None
    if meter_params:
        for field, key in source_keys:
            stored = meter_params.get(key)
            if stored is not None:
                values[field] = stored
    return EdmResp(
        ct=values["ct"],
        pt=values["pt"],
        rated_voltage=values["rated_voltage"],
        rated_current=values["rated_current"],
        alarm=alarm_list
    )


async def equipment_change_name_service(mtid, pid, new_name):
    """Equipment management - edit: rename an installation point.

    Updates ``name`` in both the ``point`` and ``monitor`` tables.

    :param mtid: id that must be paired with ``pid``.
    :param pid: id of the point to rename.
    :param new_name: the new name.
    :return: ``success_res`` result; carries ``RET.op_fail`` when the point
        is not found for this mtid/pid pair or when either update does not
        report exactly 1.
    """
    # 1. Confirm that mtid and pid belong together.
    if not await point_by_pid_mtid(mtid, pid):
        return success_res(code=RET.op_fail, msg="安装点不存在,或者mtid与pid不匹配")

    # 2. Rename the point and its monitor on the same connection.
    async with MysqlUtil() as conn:
        point_result = await conn.execute(
            "UPDATE point SET name=%s WHERE pid=%s and mtid=%s",
            args=(new_name, pid, mtid))
        monitor_result = await conn.execute(
            "UPDATE monitor SET name=%s WHERE mtid=%s",
            args=(new_name, mtid))

    # Presumably execute() returns the affected-row count - verify in MysqlUtil.
    if point_result != 1 or monitor_result != 1:
        return success_res(code=RET.op_fail, msg="数据库更新失败")
    return success_res(msg="操作成功")


async def equipment_replace_service(cid, sid, new_sid):
    """Equipment management - edit: replace a device.

    :param cid: company id the device must belong to.
    :param sid: serial id of the device being replaced.
    :param new_sid: serial id of the replacement device.
    :return: ``success_res`` result; carries ``RET.op_fail`` when no monitor
        matches cid/sid or when ``dev_replace`` reports failure (its message
        is passed through).
    """
    # 1. Confirm that cid and sid belong together.
    if not await monitor_by_cid_sid(cid, sid):
        return success_res(code=RET.op_fail, msg="装置不存在,或者cid与sid不匹配")

    # 2. Perform the replacement and report its outcome.
    replaced, reason = await dev_replace(cid, sid, new_sid)
    if replaced:
        return success_res(msg="操作成功")
    return success_res(code=RET.op_fail, msg=reason)


async def equipment_demolish_service(cid, sid):
    """Equipment management - edit: demolish (remove) a device.

    :param cid: company id the device must belong to.
    :param sid: serial id of the device being demolished.
    :return: ``success_res`` result; carries ``RET.op_fail`` when no monitor
        matches cid/sid or when ``dev_demolish`` reports failure (its message
        is passed through).
    """
    # 1. Confirm that cid and sid belong together.
    if not await monitor_by_cid_sid(cid, sid):
        return success_res(code=RET.op_fail, msg="装置不存在,或者cid与sid不匹配")

    # 2. Perform the demolish and check whether it succeeded.
    demolished, reason = await dev_demolish(cid, sid)
    if demolished:
        # 3. No demolish command is pushed to the device from here; the call
        #    to device_demolish_pds(sid) was disabled by the original author.
        #    Their notes: 1.0 devices need no demolish command (a timeout
        #    reply counts as success); 2.0 devices carry fields such as cid
        #    and mtid, so the meter has to be removed and a normal message is
        #    returned.
        return success_res(msg="操作成功")
    return success_res(code=RET.op_fail, msg=reason)