# scope_operations_service.py
import datetime
import json
import io
import pandas as pd

from pot_libs.qingstor_util.qs_client import QsClient
from unify_api.modules.zhiwei_u.procedures.scope_operations_pds import \
    get_event_info_by_event_id, get_scope_info_pds
from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin
from unify_api.modules.zhiwei_u import config
from unify_api.utils import time_format
from pot_libs.utils.exc_util import DBException
from dataclasses import fields
from unify_api.modules.zhiwei_u.dao.data_es_dao import get_scope_pids, \
    get_search_scope

from unify_api.modules.zhiwei_u.dao.order_operations_dao import \
    select_cname_by_cid
from unify_api.modules.zhiwei_u.dao.scope_operations_dao import \
    sid_to_order_dao, select_sid_by_pid, select_cids_by_prod_id, \
    select_cname_by_cids, select_sid_by_pids
from unify_api.modules.zhiwei_u.components.scope_operations_cps import \
    SearchScopeResq, Alarm
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import get_username
from pot_libs.common.components.responses import Success
from unify_api.modules.electric.procedures.electric_util import get_wiring_type
from pot_libs.logger import log
from unify_api.modules.zhiwei_u.components.scope_operations_cps import \
    ScopeDetailsResponse, ScopeContent
from unify_api.modules.zhiwei_u.fault_foreast.test import leakage_reg
from unify_api.modules.scope_analyse.service.scope_analyse_service import \
    get_threhold


async def search_scope_service(prod_id, cid, pid, sid, page_num, page_size,
                               start, end):
    """Return one page of recorded-wave events as ``Alarm`` rows.

    Query selection:

    * ``sid`` given without ``pid``: the sid is resolved to its pids through
      ``sid_to_order_dao`` and the events are fetched with ``get_scope_pids``.
    * otherwise ``get_search_scope`` is used; ``cid`` is expanded to every
      company of ``prod_id`` when only ``prod_id`` is given, or wrapped in a
      single-element list when a ``cid`` is given.

    Args:
        prod_id: product id used to expand the company filter.
        cid: company id filter.
        pid: point id filter.
        sid: device sid filter.
        page_num: 1-based page number.
        page_size: number of rows per page.
        start: lower time bound, forwarded unchanged to the DAO.
        end: upper time bound, forwarded unchanged to the DAO.

    Returns:
        ``SearchScopeResq`` with ``total`` and the ``Alarm`` rows of the page
        (``total=0, rows=[]`` when the query returns nothing).
    """
    start_num = (page_num - 1) * page_size
    if not pid and sid:
        pids_datas = await sid_to_order_dao(sid)
        pids = [data["pid"] for data in pids_datas]
        # TODO: the backing table should be point_1min_scope,
        # not point_1min_event.
        datas, total = await get_scope_pids(pids, start, end, page_size,
                                            start_num)
    else:
        if prod_id and not cid and not pid:
            cid_list = await select_cids_by_prod_id(prod_id)
            cid = [c["cid"] for c in cid_list]
        elif cid:
            cid = [cid]
        datas, total = await get_search_scope(cid, pid, start, end, page_size,
                                              start_num)

    if not datas:
        return SearchScopeResq(total=0, rows=[])

    unique_cids = list({info["cid"] for info in datas})
    unique_pids = list({info["pid"] for info in datas})
    companys = await select_cname_by_cids(unique_cids)
    points = await select_sid_by_pids(unique_pids)

    # Index the lookups once instead of scanning the lists for every row.
    # setdefault keeps the first record per key, like the former linear scan.
    company_by_cid = {}
    for record in companys or []:
        company_by_cid.setdefault(record["cid"], record)
    point_by_pid = {}
    for record in points or []:
        point_by_pid.setdefault(record["pid"], record)

    rows = []
    for info in datas:
        row_cid = info["cid"]
        row_pid = info["pid"]
        # Fall back to an empty mapping (not a list) so the .get() calls
        # below yield None instead of raising AttributeError when the
        # company or point record is missing.
        company = company_by_cid.get(row_cid) or {}
        point = point_by_pid.get(row_pid) or {}

        dt = time_format.convert_to_dt(info["event_datetime"])
        check_dt = time_format.convert_dt_to_timestr(dt)

        row_prod_id = company.get("product")
        # Only translate the product id when the company is known; an unknown
        # company has no product to look up.
        product = config.PRODUCT[row_prod_id] if company else None

        rows.append(Alarm(
            check_dt=check_dt,
            prod_id=row_prod_id,
            product=product,
            cid=row_cid,
            shortname=company.get("shortname"),
            pid=row_pid,
            point=point.get("name"),
            sid=point.get("sid"),
            message=info["message"],
            doc_id=str(info["id"]),
        ))
    return SearchScopeResq(total=total, rows=rows)


92 93 94 95 96 97 98
async def load_context(url):
    """Fetch the object stored under key ``url`` and parse it as UTF-8 JSON.

    The bytes are read through ``QsClient.get_file`` and decoded only after
    the client context has been released.
    """
    async with QsClient() as qs:
        raw = await qs.get_file(key=url)
    text = raw.decode("utf-8")
    return json.loads(text)


lcn's avatar
lcn committed
99 100 101
async def scope_detail_service(id, wave_range, download=None, user_id=None):
    """Return the detail view, or an Excel export, of one recorded-wave event.

    Args:
        id: event id handed to ``get_event_info_by_event_id``.
        wave_range: when equal to ``"100ms"`` every waveform is cut down to a
            window of at most 400 samples around the fault ``location``.
        download: when truthy, return the waveform as an Excel workbook
            instead of the detail response.
        user_id: user whose ``role`` decides whether the voltage/current
            export is allowed (role ``2``); not consulted for the
            leakage-current export.

    Returns:
        * ``ScopeDetailsResponse`` for the normal detail view (with a
          placeholder alarm when no waveform record exists, or
          ``db_error()`` when a query fails);
        * the ``(buffer, file_name)`` tuple of ``excl_download`` when
          ``download`` is truthy and the export is permitted;
        * ``Success(success=0, ...)`` when the export is not permitted.

    Raises:
        DBException: when the event row cannot be interpreted or the related
            company/point lookups fail.
    """
    # 1. Load the event row and the point attributes it refers to.
    try:
        event_row = await get_event_info_by_event_id(id)
    except Exception as e:
        log.error(f"query error {e}")
        return ScopeDetailsResponse().db_error()

    try:
        point_name = event_row.get("name")
        point_id = event_row.get("point_id")
        mtid = event_row.get("mtid")
        cid = event_row.get("cid")
        company = await select_cname_by_cid(cid)
        message = event_row.get("message")
        check_dt = time_format.convert_dt_to_timestr(
            time_format.convert_to_dt(event_row.get("datetime")))
        point = await select_sid_by_pid(point_id)
    except Exception as exc:
        log.warning(f"query err {id}")
        # Chain the cause so the original failure stays visible in tracebacks.
        raise DBException from exc

    def build_alarm(contin_time, result):
        # Built lazily: company/point are only dereferenced on the paths
        # that actually return an alarm.
        return Alarm(check_dt=check_dt, pid=point_id, cid=cid,
                     point=point_name, sid=point["sid"], message=message,
                     prod_id=company["product"],
                     product=config.PRODUCT[company["product"]],
                     shortname=company["shortname"],
                     contin_time=contin_time,
                     result=result)

    # 2. Wiring type (two-meter / three-meter); default to 3 when unknown.
    ctnum, _ = await get_wiring_type(point_id)
    ctnum = ctnum or 3

    try:
        scope_row = await get_scope_info_pds(mtid, check_dt)
    except Exception as e:
        log.error(f"es query error {e}")
        return ScopeDetailsResponse().db_error()

    if not scope_row:
        # No waveform record: answer with an empty detail payload.
        return ScopeDetailsResponse(
            alarm=build_alarm(0, "暂无数据"),
            ctnum=ctnum,
            group=point_name,
            item={},
            scope_type="",
            type="",
            date_time="",
            contents={}
        )

    scope_type = scope_row.get("fault_type")
    fault_type = config.EVENT_TYPE_MAP.get(scope_type)
    date_time = datetime.datetime.strftime(scope_row.get("datetime"),
                                           "%Y-%m-%d %H:%M:%S")

    # The stored result is a JSON object keyed by position; parse it once.
    res_dic = json.loads(scope_row.get("result"))
    position_labels = []
    for pos in res_dic:
        if len(pos) == 1:
            position_labels.append(pos + "相")
        elif len(pos) == 2:
            position_labels.append(pos + "线")
        # keys of any other length are left out of the label
    position_str = "|".join(position_labels)

    # 3. Waveform data.
    location = list(res_dic.values())[0].get("location")
    wave_data = await load_context(scope_row.get("context_url"))
    samples = wave_data.get("ia") or wave_data.get("ileak_rms") or []
    contin_time = len(samples)
    if wave_range == "100ms":
        # NOTE(review): the 200/1400/1200 bounds presumably assume waveforms
        # of 1600 samples - confirm against the recorder.
        if location <= 200:
            window = slice(None, 400)
        elif location >= 1400:
            window = slice(1200, None)
        else:
            window = slice(location - 200, location + 200)
        wave_data = {key: value[window] for key, value in wave_data.items()}

    content_fields = {field.name for field in fields(ScopeContent)}
    scope_content = ScopeContent(
        **{k: v for k, v in wave_data.items() if k in content_fields}
    )

    # Report download.
    if download:
        download_auth = None
        if user_id:
            user_info = await get_username(user_id)
            # An unknown user simply has no role (treated as not permitted).
            if user_info:
                download_auth = user_info.get("role", None)
        # Size the index column from the data actually exported, so that it
        # still matches the columns after the "100ms" truncation above.
        exported = wave_data.get("ia") or wave_data.get("ileak_rms") or []
        sequence = list(range(1, len(exported) + 1))
        if scope_type == "over_res_cur":
            export = {
                "序号": sequence,
                "ileak_rms": wave_data.get("ileak_rms"),
                "ileak": wave_data.get("ileak"),
            }
            table_name = f'{company["shortname"]}_{point_name}_' \
                         f'{check_dt[:10]}_漏电流录波'
            return await excl_download(export, table_name, "sheet1")
        elif download_auth == 2:
            export = {
                "序号": sequence,
                "ia": wave_data.get("ia"),
                "ib": wave_data.get("ib"),
                "ic": wave_data.get("ic"),
                "ua": wave_data.get("ua"),
                "ub": wave_data.get("ub"),
                "uc": wave_data.get("uc"),
            }
            table_name = f'{company["shortname"]}_{point_name}_' \
                         f'{check_dt[:10]}_电压录波/电流录波'
            return await excl_download(export, table_name, "sheet1")
        else:
            return Success(success=0, message="您的权限不足")

    # Conclusion analysis; best effort - a failure yields an empty result.
    try:
        log.warning(f"wave_data:{wave_data}")
        if scope_type == "over_res_cur":
            threhold = await get_threhold(cid, point["sid"])
            result = leakage_reg(ileak_rms=wave_data["ileak_rms"],
                                 leak_hold=threhold)
            log.info(f"actionFile 漏电流 sid:{point['sid']}结论:{result}")
        else:
            res = actionFilemin(wave_data, ctnum)
            log.warning(f"结论分析 res:{res}")
            if res == "nofault":
                result = "不存在故障"
            else:
                result = "".join(
                    "%s : %0.0f%%;" % (r[0], r[1] * 100) for r in res)
    except Exception as e:
        result = ""
        log.error(f"actionFile error:{e}")

    # NOTE(review): the 0.25 factor presumably converts a sample count to
    # milliseconds - confirm the sampling interval.
    alarm = build_alarm(int(contin_time * 0.25), result)

    return ScopeDetailsResponse(
        alarm=alarm,
        ctnum=ctnum,
        group=point_name,
        item=position_str,
        scope_type=scope_type,
        type=fault_type,
        date_time=date_time,
        location=location,
        contents=scope_content
    )


async def excl_download(dict_data, table_name, sheet_name):
    """Render ``dict_data`` as a single-sheet .xlsx workbook held in memory.

    Args:
        dict_data: mapping of column name to column values, passed to
            ``pandas.DataFrame``.
        table_name: file name without extension.
        sheet_name: name of the worksheet to write.

    Returns:
        ``(output, excel_name)``: a ``BytesIO`` rewound to position 0 that
        holds the workbook, and ``table_name`` with ``.xlsx`` appended.
    """
    output = io.BytesIO()
    excel_name = table_name + '.xlsx'
    # ExcelWriter.save() is deprecated and was removed in pandas 2.0; the
    # context manager finalises the workbook and also runs if to_excel raises.
    with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
        pd.DataFrame(dict_data).to_excel(writer, sheet_name=sheet_name,
                                         index=False)
    output.seek(0)
    return output, excel_name


async def dataframe_excl_download(data, table_name, sheet_name="sheet1"):
    """Write the DataFrame ``data`` to a single-sheet in-memory .xlsx workbook.

    Args:
        data: object exposing ``to_excel`` (a ``pandas.DataFrame``).
        table_name: file name without extension.
        sheet_name: name of the worksheet to write; defaults to ``"sheet1"``.

    Returns:
        ``(output, excel_name)``: a ``BytesIO`` rewound to position 0 that
        holds the workbook, and ``table_name`` with ``.xlsx`` appended.
    """
    output = io.BytesIO()
    excel_name = table_name + '.xlsx'
    # ExcelWriter.save() is deprecated and was removed in pandas 2.0; the
    # context manager finalises the workbook and also runs if to_excel raises.
    with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
        data.to_excel(writer, sheet_name=sheet_name, index=False)
    output.seek(0)
    return output, excel_name