scope_operations_service.py 10.7 KB
Newer Older
1
import datetime
lcn's avatar
lcn committed
2 3 4
import json
import io
import pandas as pd
5 6

from pot_libs.qingstor_util.qs_client import QsClient
wang.wenrong's avatar
wang.wenrong committed
7
from unify_api.modules.zhiwei_u.procedures.scope_operations_pds import get_event_info_by_event_id, get_scope_info_pds
lcn's avatar
lcn committed
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin
from unify_api.modules.zhiwei_u import config
from unify_api.utils import time_format
from pot_libs.utils.exc_util import DBException
from dataclasses import fields
from unify_api.modules.zhiwei_u.dao.data_es_dao import get_scope_pids, \
    get_search_scope

from unify_api.modules.zhiwei_u.dao.order_operations_dao import \
    select_cname_by_cid
from unify_api.modules.zhiwei_u.dao.scope_operations_dao import \
    sid_to_order_dao, select_sid_by_pid, select_cids_by_prod_id, \
    select_cname_by_cids, select_sid_by_pids
from unify_api.modules.zhiwei_u.components.scope_operations_cps import \
    SearchScopeResq, Alarm
from pot_libs.common.components.query import (
    PageRequest,
    Equal,
    Filter
)
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import get_username
from pot_libs.common.components.responses import Success
from unify_api.modules.electric.procedures.electric_util import get_wiring_type
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.es_util.es_query import EsQuery
from pot_libs.logger import log
from unify_api.modules.zhiwei_u.components.scope_operations_cps import \
    ScopeDetailsResponse, ScopeContent
from unify_api.modules.zhiwei_u.fault_foreast.test import leakage_reg
from unify_api.modules.scope_analyse.service.scope_analyse_service import \
    get_threhold


async def search_scope_service(prod_id, cid, pid, sid, page_num, page_size,
                               start, end):
    """Return one page of scope (waveform) events as ``Alarm`` rows.

    Events are selected either by device ``sid`` (when ``sid`` is given and
    ``pid`` is not) or by company/point filters. The full result list is
    fetched and then sliced in memory to the requested page.

    Args:
        prod_id: Product id; used to resolve company ids when neither ``cid``
            nor ``pid`` is given.
        cid: Single company id filter (wrapped into a list for the query).
        pid: Point id filter.
        sid: Device id; only used when ``pid`` is falsy.
        page_num: 1-based page number.
        page_size: Number of rows per page.
        start: Start of the time range, passed through to the query.
        end: End of the time range, passed through to the query.

    Returns:
        ``SearchScopeResq`` with the total number of matching events and the
        rows of the requested page. An empty page yields ``total=0``.
    """
    start_num = (page_num - 1) * page_size
    if not pid and sid:
        pids_datas = await sid_to_order_dao(sid)
        pids = [data["pid"] for data in pids_datas]
        # todo databases should be point_1min_scope, not point_1min_event
        scope_datas = await get_scope_pids(pids, start, end)
    else:
        if prod_id and not cid and not pid:
            cid_list = await select_cids_by_prod_id(prod_id)
            cid = [c["cid"] for c in cid_list]
        elif cid:
            cid = [cid]
        scope_datas = await get_search_scope(cid, pid, start, end)

    datas = scope_datas[start_num: start_num + page_size]
    total = len(scope_datas)
    if not datas:
        return SearchScopeResq(total=0, rows=[])

    cids = {info["cid"] for info in datas}
    pids = {info["pid"] for info in datas}
    companys = await select_cname_by_cids(list(cids))
    points = await select_sid_by_pids(list(pids))

    # Index the lookup rows once instead of scanning them for every event.
    # setdefault keeps the first row per key, matching a first-match scan.
    company_by_cid = {}
    for c in companys or []:
        company_by_cid.setdefault(c["cid"], c)
    point_by_pid = {}
    for p in points or []:
        point_by_pid.setdefault(p["pid"], p)

    rows = []
    for info in datas:
        cid = info["cid"]
        pid = info["pid"]
        # Fall back to an empty dict (not a list) so the .get() calls below
        # yield None for an unknown company/point instead of raising.
        company = company_by_cid.get(cid, {})
        point = point_by_pid.get(pid, {})

        dt = time_format.convert_to_dt(info["event_datetime"])
        check_dt = time_format.convert_dt_to_timestr(dt)

        product_id = company.get("product")
        # Only look the product name up when a company row was found; an
        # unknown product id on an existing company still raises as before.
        product = config.PRODUCT[product_id] if company else None

        rows.append(Alarm(
            check_dt=check_dt, prod_id=product_id, product=product,
            cid=cid, shortname=company.get("shortname"),
            pid=pid, point=point.get("name"), sid=point.get("sid"),
            message=info["message"], doc_id=str(info["id"]),
        ))
    return SearchScopeResq(total=total, rows=rows)


103 104 105 106 107 108 109
async def load_context(url):
    """Fetch the object stored under key ``url`` and parse it as JSON.

    The object is read through ``QsClient.get_file``, decoded as UTF-8 and
    passed to ``json.loads``; the parsed value is returned.
    """
    async with QsClient() as qs_client:
        raw_payload = await qs_client.get_file(key=url)
    payload_text = raw_payload.decode("utf-8")
    return json.loads(payload_text)


lcn's avatar
lcn committed
110 111 112 113
async def scope_detail_service(id, wave_range, download=None, user_id=None):
    """Return the detail view, or an Excel export, for one waveform event.

    Args:
        id: Event id passed to ``get_event_info_by_event_id``.
        wave_range: When equal to ``"100ms"`` every waveform series is cut to
            a window around the fault location (see the slicing below); any
            other value keeps the full series.
        download: When truthy, return the waveform as an Excel export
            instead of the detail response.
        user_id: Only used for downloads, to look up the caller's role.

    Returns:
        A ``ScopeDetailsResponse`` (its ``db_error()`` variant when a query
        raises). For downloads, the ``(buffer, filename)`` tuple produced by
        ``excl_download``, or ``Success(success=0, ...)`` when the caller is
        not allowed to export.

    Raises:
        DBException: if the event row cannot be read or the related
            company/point lookups fail.
    """
    # 1. Load the event row for this id.
    try:
        mysql_results = await get_event_info_by_event_id(id)
    except Exception as e:
        log.error(f"query error {e}")
        return ScopeDetailsResponse().db_error()

    try:
        source = mysql_results
        point_name = source.get("name")
        point_id = source.get("point_id")
        mtid = source.get("mtid")
        cid = source.get("cid")
        company = await select_cname_by_cid(cid)
        message = source.get("message")
        check_dt = source.get("datetime")
        dt = time_format.convert_to_dt(check_dt)
        check_dt = time_format.convert_dt_to_timestr(dt)
        point = await select_sid_by_pid(point_id)
    except Exception as exc:
        log.warning(f"query err {id}")
        # Chain the cause so the original failure is not lost.
        raise DBException from exc

    def _make_alarm(contin_time, result):
        # Built lazily: the lookups on point/company run only when an alarm
        # is actually needed, not on the download path.
        return Alarm(check_dt=check_dt, pid=point_id, cid=cid,
                     point=point_name, sid=point["sid"], message=message,
                     prod_id=company["product"],
                     product=config.PRODUCT[company["product"]],
                     shortname=company["shortname"],
                     contin_time=contin_time,
                     result=result)

    # Wiring method (two-meter / three-meter); defaults to 3 when unknown.
    ctnum, _ = await get_wiring_type(point_id)
    if not ctnum:
        ctnum = 3

    try:
        mysql_results = await get_scope_info_pds(mtid, check_dt)
    except Exception as e:
        log.error(f"es query error {e}")
        return ScopeDetailsResponse().db_error()

    if not mysql_results:
        # No recorded waveform for this event: return a placeholder detail.
        alarm = _make_alarm(0, "暂无数据")
        return ScopeDetailsResponse(
            alarm=alarm,
            ctnum=ctnum,
            group=point_name,
            item={},
            scope_type="",
            type="",
            date_time="",
            contents={}
        )

    source = mysql_results
    scope_type = source.get("fault_type")
    fault_type = config.EVENT_TYPE_MAP.get(scope_type)
    date_time = datetime.datetime.strftime(source.get("datetime"),
                                           "%Y-%m-%d %H:%M:%S")

    # Parse the stored result once; its keys name the affected positions.
    res_dic = json.loads(source.get("result"))
    position_labels = []
    for pos in res_dic:
        if len(pos) == 1:
            position_labels.append(pos + "相")
        elif len(pos) == 2:
            position_labels.append(pos + "线")
        # keys of any other length are dropped
    position_str = "|".join(position_labels)

    # 3. Waveform (curve) data.
    location = list(res_dic.values())[0].get("location")
    wave_data = await load_context(source.get("context_url"))
    length = wave_data.get("ia") or wave_data.get("ileak_rms")
    # Sample count of the full recording, taken before any trimming.
    contin_time = len(length)
    if wave_range == "100ms":
        # Keep 200 samples either side of the fault location, clamped at the
        # edges. NOTE(review): the 1200/1400 bounds look like they assume
        # 1600-sample recordings -- confirm.
        if location <= 200:
            window = slice(None, 400)
        elif location >= 1400:
            window = slice(1200, None)
        else:
            window = slice(location - 200, location + 200)
        for key, value in wave_data.items():
            wave_data[key] = value[window]

    content_fields = {field.name for field in fields(ScopeContent)}
    scope_content = ScopeContent(
        **{k: v for k, v in wave_data.items() if k in content_fields}
    )

    # Download report.
    if download:
        download_auth = None
        if user_id:
            user_info = await get_username(user_id)
            download_auth = user_info.get("role") if user_info else None
        # Number the rows from the series as it is now (possibly trimmed);
        # using the pre-trim length would give columns of unequal length.
        exported = (wave_data.get("ia") or wave_data.get("ileak_rms")
                    or [])
        row_numbers = list(range(1, len(exported) + 1))
        if scope_type == "over_res_cur":
            export = {
                "序号": row_numbers,
                "ileak_rms": wave_data.get("ileak_rms"),
                "ileak": wave_data.get("ileak"),
            }
            table_name = f'{company["shortname"]}_{point_name}_' \
                         f'{check_dt[:10]}_漏电流录波'
            return await excl_download(export, table_name, "sheet1")
        elif download_auth == 2:
            export = {
                "序号": row_numbers,
                "ia": wave_data.get("ia"),
                "ib": wave_data.get("ib"),
                "ic": wave_data.get("ic"),
                "ua": wave_data.get("ua"),
                "ub": wave_data.get("ub"),
                "uc": wave_data.get("uc"),
            }
            table_name = f'{company["shortname"]}_{point_name}_' \
                         f'{check_dt[:10]}_电压录波/电流录波'
            return await excl_download(export, table_name, "sheet1")
        else:
            return Success(success=0, message="您的权限不足")

    # Conclusion analysis; best effort, a failure yields an empty result.
    try:
        log.warning(f"wave_data:{wave_data}")
        if scope_type == "over_res_cur":
            threhold = await get_threhold(cid, point["sid"])
            result = leakage_reg(ileak_rms=wave_data["ileak_rms"],
                                 leak_hold=threhold)
            log.info(f"actionFile 漏电流 sid:{point['sid']}结论:{result}")
        else:
            res = actionFilemin(wave_data, ctnum)
            log.warning(f"结论分析 res:{res}")
            if res == "nofault":
                result = "不存在故障"
            else:
                result = "".join(
                    "%s : %0.0f%%;" % (r[0], r[1] * 100) for r in res)
    except Exception as e:
        result = ""
        log.error(f"actionFile error:{e}")

    # NOTE(review): 0.25 is presumably the duration of one sample -- confirm.
    alarm = _make_alarm(int(contin_time * 0.25), result)

    return ScopeDetailsResponse(
        alarm=alarm,
        ctnum=ctnum,
        group=point_name,
        item=position_str,
        scope_type=scope_type,
        type=fault_type,
        date_time=date_time,
        location=location,
        contents=scope_content
    )


async def excl_download(dict_data, table_name, sheet_name):
    """Render ``dict_data`` as a single-sheet ``.xlsx`` file in memory.

    Args:
        dict_data: Mapping of column name to column values, passed to
            ``pd.DataFrame``.
        table_name: File name without extension.
        sheet_name: Name of the worksheet.

    Returns:
        ``(output, excel_name)``: a ``BytesIO`` rewound to the start and
        holding the workbook, and ``table_name`` with ``.xlsx`` appended.
    """
    output = io.BytesIO()
    excel_name = table_name + '.xlsx'
    df = pd.DataFrame(dict_data)
    # The context manager finalizes the workbook on exit. ExcelWriter.save()
    # was removed in pandas 2.0, so it must not be called directly.
    with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name=sheet_name, index=False)
    output.seek(0)
    return output, excel_name


async def dataframe_excl_download(data, table_name, sheet_name="sheet1"):
    """Render an existing DataFrame as a single-sheet ``.xlsx`` in memory.

    Args:
        data: Object exposing ``to_excel`` (a pandas ``DataFrame``).
        table_name: File name without extension.
        sheet_name: Name of the worksheet; defaults to ``"sheet1"``.

    Returns:
        ``(output, excel_name)``: a ``BytesIO`` rewound to the start and
        holding the workbook, and ``table_name`` with ``.xlsx`` appended.
    """
    output = io.BytesIO()
    excel_name = table_name + '.xlsx'
    # The context manager finalizes the workbook on exit. ExcelWriter.save()
    # was removed in pandas 2.0, so it must not be called directly.
    with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
        data.to_excel(writer, sheet_name=sheet_name, index=False)
    output.seek(0)
    return output, excel_name