electric_service.py 26.5 KB
Newer Older
1
import json
2 3
import math

lcn's avatar
lcn committed
4
import pandas as pd
5
import pendulum
lcn's avatar
lcn committed
6
from pot_libs.settings import SETTING
7
from pot_libs.logger import log
lcn's avatar
lcn committed
8
from pot_libs.utils.exc_util import BusinessException
9
from pot_libs.aredis_util.aredis_utils import RedisUtils
ZZH's avatar
ZZH committed
10 11 12 13
from unify_api.constants import (
    POINT_LEVEL_MAP, U_THRESHOLD, COSTTL_THRESHOLD, LF_THRESHOLD,
    THDU_THRESHOLD, BL_THRESHOLD, THDI_THRESHOLD
)
lcn's avatar
lcn committed
14
from unify_api.modules.common.procedures.points import points_by_storeys, \
ZZH's avatar
ZZH committed
15
    get_meter_by_point
lcn's avatar
lcn committed
16 17 18 19
from unify_api.modules.electric.dao.electric_dao import \
    monitor_point_join_by_points, get_electric_datas_dao
from unify_api.utils.common_utils import round_2, round_4, multiplication_two
from unify_api.modules.electric.procedures.electric_util import \
20
    load_point_ctnum
lcn's avatar
lcn committed
21
from datetime import datetime
22
from unify_api.constants import REAL_EXP_TIME
lcn's avatar
lcn committed
23
from unify_api.utils.time_format import CST, YMD_Hms, timestamp2dts
lcn's avatar
lcn committed
24
from unify_api.modules.common.procedures.location_temp_rcurrent import \
ZZH's avatar
ZZH committed
25
    location_stats_statics
lcn's avatar
lcn committed
26 27 28 29 30
from unify_api.modules.electric.components.electric import (
    ElecIndexResponse, ElecIndex, EscResp, QcsResp, EclResp, QclResp,
)


31
async def elec_current_storeys_service(cid, storeys):
    """Electricity monitoring - real-time readings grouped by storey.

    Resolves the points of the requested storeys with ``points_by_storeys``,
    loads their real-time records with ``batch_load_rt_ele`` and builds one
    dict per point. A point without a usable real-time record gets an empty
    string for every measurement field.

    Args:
        cid: forwarded unchanged to ``points_by_storeys``.
        storeys: forwarded unchanged to ``points_by_storeys``.

    Returns:
        EscResp whose ``elec_data`` is a list of
        ``{"name", "storey_id", "room_data"}`` dicts sorted by ``storey_id``;
        every ``room_data`` list is sorted by ``room_name``.
    """
    # 1. Resolve the points for the requested storeys.
    point_list = await points_by_storeys(cid, storeys)
    # Only query Redis for points that actually have a meter id, as the other
    # services in this module do; a falsy mtid can never match a record.
    mtids = [info.get("mtid") for info in point_list if info.get("mtid")]
    d_rt_ele = await batch_load_rt_ele(mtids)

    # Measurements that are rounded to two decimals in the response.
    rounded_fields = ("ua", "ia", "ub", "ib", "uc", "ic",
                      "pttl", "qttl", "freq", "costtl", "lf")
    elec_data = {}
    for info in point_list:
        # NOTE(review): unlike the sibling services, ctnum is not normalised
        # to 2/3 here before it is passed to get_sdu_i_and_u - confirm intent.
        ctnum = info.get("ctnum")
        res_dic = {
            "room_name": info.get("room_name"),
            "storey_id": info.get("storey_id"),
            "point_id": info.get("point_id"),
            "ctnum": ctnum,
        }
        rt_ele = d_rt_ele.get(info.get("mtid"))
        if rt_ele:
            # Single-phase meters only carry one phase; flag which fields.
            rt_ele = get_sdu_i_and_u(rt_ele, ctnum)
            res_dic["real_time"] = timestamp2dts(rt_ele["ts"], YMD_Hms)
            for field in rounded_fields:
                res_dic[field] = round_2(rt_ele.get(field))
            res_dic["sdu_i"] = rt_ele.get("sdu_i")
            res_dic["sdu_u"] = rt_ele.get("sdu_u")
        else:
            res_dic["real_time"] = ""
            for field in rounded_fields + ("sdu_i", "sdu_u"):
                res_dic[field] = ""
        elec_data.setdefault(info.get("storey_name"), []).append(res_dic)

    # 2. Convert to a list: rooms sorted inside each storey, storeys sorted
    # by storey_id. An empty mapping naturally yields an empty list.
    elec_list = [
        {"name": storey_name,
         "storey_id": rooms[0]["storey_id"],
         "room_data": sorted(rooms, key=lambda room: room["room_name"])}
        for storey_name, rooms in elec_data.items()
    ]
    elec_list.sort(key=lambda storey: storey["storey_id"])
    return EscResp(elec_data=elec_list)


98
async def qual_current_storeys_service(cid, storeys):
    """Power quality - real-time parameters grouped by storey.

    Resolves the points of the requested storeys, loads their real-time
    records (merged with the harmonic record) through
    ``batch_load_rt_ele_with_hr`` and returns one dict per point, grouped by
    storey name.

    Returns:
        QcsResp whose ``qual_data`` is a list of
        ``{"name", "storey_id", "room_data"}`` dicts sorted by ``storey_id``;
        every ``room_data`` list is sorted by ``room_name``.
    """
    # 1. Resolve the points for the requested storeys.
    points = await points_by_storeys(cid, storeys)
    meter_ids = [point["mtid"] for point in points if point["mtid"]]
    realtime_by_mtid = await batch_load_rt_ele_with_hr(meter_ids)

    # Harmonic distortion rates followed by voltage deviations; all of them
    # are rounded to four decimals.
    four_dp_fields = ("thdia", "thdib", "thdic",
                      "thdua", "thdub", "thduc",
                      "ua_dev", "ub_dev", "uc_dev")

    # 2. Build the per-point payloads, grouped by storey name.
    rooms_by_storey = {}
    for point in points:
        raw_ctnum = point.get("ctnum")
        ctnum = raw_ctnum if raw_ctnum == 2 else 3
        room = {
            "room_name": point.get("room_name"),
            "storey_id": point.get("storey_id"),
            "point_id": point.get("point_id"),
            "ctnum": ctnum,
        }
        record = realtime_by_mtid.get(point.get("mtid"))
        if record:
            record = get_sdu_i_and_u(record, ctnum)
            room["real_time"] = timestamp2dts(record["ts"], YMD_Hms)
            for field in four_dp_fields:
                room[field] = round_4(record.get(field))
            room["sdu_i"] = record.get("sdu_i")
            room["sdu_u"] = record.get("sdu_u")
        else:
            room["real_time"] = ""
            for field in four_dp_fields + ("sdu_i", "sdu_u"):
                room[field] = ""

        storey_name = point.get("storey_name")
        if storey_name not in rooms_by_storey:
            rooms_by_storey[storey_name] = []
        rooms_by_storey[storey_name].append(room)

    # 3. Rooms sorted within each storey, then storeys sorted by storey_id.
    lst_qual = []
    for storey_name, rooms in rooms_by_storey.items():
        lst_qual.append({
            "name": storey_name,
            "storey_id": rooms[0]["storey_id"],
            "room_data": sorted(rooms, key=lambda room: room["room_name"]),
        })
    lst_qual = sorted(lst_qual, key=lambda storey: storey["storey_id"])
    return QcsResp(qual_data=lst_qual)
lcn's avatar
lcn committed
163 164 165 166 167


async def elec_card_level_service(point_list):
    """Electricity monitoring - card info grouped by point level.

    Loads the point details with ``monitor_point_join_by_points`` and the
    real-time records with ``batch_load_rt_ele``, then builds one card per
    point and files it under the level that ``POINT_LEVEL_MAP`` assigns to
    the point's ``m_type``.

    Returns:
        EclResp with the ``inline``, ``transformer``, ``feeder``,
        ``power_dist`` and ``device`` card lists.
    """
    # 1. Details and real-time records for every requested point.
    point_infos = await monitor_point_join_by_points(point_list)
    meter_ids = [row["mtid"] for row in point_infos if row["mtid"]]
    realtime_by_mtid = await batch_load_rt_ele(meter_ids)

    two_dp_fields = ("ua", "ia", "ub", "ib", "uc", "ic",
                     "pttl", "qttl", "freq", "costtl", "lf")
    # Voltage deviations, used by the client to highlight threshold breaches.
    four_dp_fields = ("ua_dev", "ub_dev", "uc_dev", "uab_dev", "ucb_dev")

    # 2. One bucket per level, in the order the response expects them.
    cards_by_level = {
        level: []
        for level in ("inline", "transformer", "feeder", "power_dist",
                      "device")
    }
    for row in point_infos:
        level = POINT_LEVEL_MAP[row.get("m_type")]
        raw_ctnum = row.get("ctnum")
        ctnum = raw_ctnum if raw_ctnum == 2 else 3
        card = {
            "name": row.get("name"),
            "point_id": row.get("pid"),
            "ctnum": ctnum,
        }
        record = realtime_by_mtid.get(row.get("mtid"))
        if record:
            record = get_sdu_i_and_u(record, ctnum)
            card["real_time"] = timestamp2dts(record["ts"], YMD_Hms)
            for field in two_dp_fields:
                card[field] = round_2(record.get(field))
            card["sdu_i"] = record.get("sdu_i")
            card["sdu_u"] = record.get("sdu_u")
            for field in four_dp_fields:
                card[field] = round_4(record.get(field))
        else:
            card["real_time"] = ""
            for field in two_dp_fields + ("sdu_i", "sdu_u") + four_dp_fields:
                card[field] = ""
        # Thresholds are returned whether or not real-time data exists.
        card["u_threshold"] = U_THRESHOLD
        card["costtl_threshold"] = COSTTL_THRESHOLD
        card["lf_threshold"] = LF_THRESHOLD

        cards_by_level[level].append(card)
    return EclResp(**cards_by_level)
lcn's avatar
lcn committed
240 241 242 243 244


async def qual_current_level_service(point_list):
    """Power quality - card info grouped by point level.

    Loads the point details with ``monitor_point_join_by_points`` and the
    real-time records (merged with the harmonic record) with
    ``batch_load_rt_ele_with_hr``, then builds one card per point and files
    it under the level that ``POINT_LEVEL_MAP`` assigns to the point's
    ``m_type``.

    The threshold fields are included on every card, with or without
    real-time data, matching ``elec_card_level_service``.

    Returns:
        QclResp with the ``inline``, ``transformer``, ``feeder``,
        ``power_dist`` and ``device`` card lists.
    """
    # 1. Details and real-time records for every requested point.
    d_point_info = await monitor_point_join_by_points(point_list)
    mtids = [p_info["mtid"] for p_info in d_point_info if p_info["mtid"]]
    d_rt_ele = await batch_load_rt_ele_with_hr(mtids)

    current_thd = ("thdia", "thdib", "thdic")  # current THD
    voltage_thd = ("thdua", "thdub", "thduc", "thduab", "thducb")
    fundamental = ("fdia", "fdib", "fdic")  # fundamental current
    # Three-phase unbalance followed by voltage deviation.
    unbalance_and_dev = ("ubl", "ibl", "ua_dev", "ub_dev", "uc_dev",
                         "uab_dev", "ucb_dev")
    virtual = ("thdia_virtual", "thdib_virtual", "thdic_virtual")
    thresholds = {
        "thdu_threshold": THDU_THRESHOLD,  # voltage THD (web)
        "bl_threshold": BL_THRESHOLD,  # three-phase unbalance (web)
        "thdi_threshold": THDI_THRESHOLD,  # current THD (mini program)
        "u_threshold": U_THRESHOLD,  # voltage deviation (mini program)
    }

    ret_data = {
        "inline": [],
        "transformer": [],
        "feeder": [],
        "power_dist": [],
        "device": [],
    }
    for info in d_point_info:
        m_type = POINT_LEVEL_MAP[info.get("m_type")]
        raw_ctnum = info.get("ctnum")
        ctnum = raw_ctnum if raw_ctnum == 2 else 3
        res_dic = {
            "name": info.get("name"),
            "point_id": info.get("pid"),
            "ctnum": ctnum,
        }
        rt_ele = d_rt_ele.get(info.get("mtid"))
        if rt_ele:
            rt_ele = get_sdu_i_and_u(rt_ele, ctnum)
            res_dic["real_time"] = timestamp2dts(rt_ele["ts"], YMD_Hms)
            for field in current_thd + voltage_thd:
                res_dic[field] = round_4(rt_ele.get(field))
            for field in fundamental:
                res_dic[field] = round_2(rt_ele.get(field))
            for field in unbalance_and_dev:
                res_dic[field] = round_4(rt_ele.get(field))
            # Total harmonic current = fundamental current * current THD,
            # computed from the already-rounded values as before.
            for phase in ("a", "b", "c"):
                res_dic[f"thdi{phase}_virtual"] = round_2(
                    multiplication_two(res_dic[f"fdi{phase}"],
                                       res_dic[f"thdi{phase}"]))
        else:
            res_dic["real_time"] = ""
            for field in (current_thd + voltage_thd + fundamental
                          + unbalance_and_dev + virtual):
                res_dic[field] = ""
        # Previously only cards with data carried the thresholds.
        res_dic.update(thresholds)

        ret_data[m_type].append(res_dic)
    return QclResp(inline=ret_data["inline"],
                   transformer=ret_data["transformer"],
                   feeder=ret_data["feeder"],
                   power_dist=ret_data["power_dist"],
                   device=ret_data["device"],
                   )
lcn's avatar
lcn committed
343 344


ZZH's avatar
ZZH committed
345
async def elec_index_service(cid, point_id, start, end):
    """Build max/min/mean statistics for one point between start and end.

    Rows are read from ``point_15min_electric`` when the first 10 characters
    of both ``start`` and ``end`` equal today's date, otherwise from
    ``point_1day_electric``. Which statistics are reported depends on the
    point's CT count (2, or 3 for any other value).

    Args:
        cid: when truthy, the leakage-current/temperature statistics returned
            by ``location_stats_statics`` are appended to the common indexes.
        point_id: point whose CT count and rows are loaded.
        start: range start as a string whose first 10 characters are the date.
        end: range end, same format as ``start``.

    Returns:
        ElecIndexResponse with ``ctnum``, ``common_indexes`` (rounded to 2
        decimals) and ``elec_qual_indexes`` (rounded to 4 decimals). The
        indexes follow the order of the item lists below.
    """
    ctnum = await load_point_ctnum(point_id)
    ctnum = ctnum if ctnum == 2 else 3
    today = str(datetime.now())[:10]
    if start[:10] == today and end[:10] == today:
        table_name = "point_15min_electric"
        redi_table_name = "location_15min_aiao"
    else:
        table_name = "point_1day_electric"
        redi_table_name = "location_1day_aiao"

    # Ordered lists instead of a set: iterating a set of strings gives a
    # different order in every process, which made the response order unstable.
    if ctnum == 2:
        common_items = ["lf", "pttl", "qttl", "costtl", "uab", "ucb",
                        "ia", "ic", "freq"]
        elec_qual_items = ["ubl", "ibl", "thduab", "thducb", "thdia",
                           "thdic", "uab_dev", "freq_dev"]
    else:
        common_items = ["lf", "pttl", "qttl", "costtl", "ua", "ub", "uc",
                        "ia", "ib", "ic", "freq"]
        elec_qual_items = ["ubl", "ibl", "thdua", "thdub", "thduc",
                           "thdia", "thdib", "thdic", "ua_dev", "freq_dev"]

    datas = await get_electric_datas_dao(table_name, point_id, start, end)
    # No rows (empty or None) -> every statistic is reported as "".
    df = pd.DataFrame(list(datas)) if datas else None

    # Regular parameters, then power-quality parameters.
    common_indexes = [_build_elec_index(df, item, 2) for item in common_items]
    elec_qual_indexes = [_build_elec_index(df, item, 4)
                         for item in elec_qual_items]

    # The mini program also needs leakage current and temperature.
    if cid:
        location_datas = await location_stats_statics(redi_table_name, cid,
                                                      start, end)
        if location_datas:
            for value in location_datas.values():
                if value["ad_type"] == "residual_current":
                    name = "漏电流(mA)"
                else:
                    name = f"{value.get('item')}温度(℃)"
                common_indexes.append(
                    ElecIndex(
                        stats_index=value["ad_type"],
                        name=name,
                        max=value.get("max_value") or '',
                        max_time=value.get("max_value_time") or '',
                        min=value.get("min_value") or '',
                        min_time=value.get("min_value_time") or '',
                        avg=value.get("mean_value") or '',
                    )
                )
    return ElecIndexResponse(
        ctnum=ctnum, common_indexes=common_indexes,
        elec_qual_indexes=elec_qual_indexes
    )


def _extreme_with_time(df, column, use_max):
    """Return (value, time string) of the max or min of ``column``.

    The time is read from the ``<column>_time`` cell of the row holding the
    extreme. Both results are "" when the column has no non-NaN value; the
    time alone is "" when that cell is missing or null.
    """
    series = df[column]
    value = series.max() if use_max else series.min()
    if pd.isna(value):
        return "", ""
    row_label = series.idxmax() if use_max else series.idxmin()
    when = df.loc[row_label].to_dict().get(f"{column}_time")
    return value, ("" if pd.isnull(when) else str(when))


def _build_elec_index(df, item, digits):
    """Build the ElecIndex for statistic ``item``; ``df`` is None without rows."""
    if df is None:
        return ElecIndex(stats_index=item, max="", max_time="",
                         min="", min_time="", avg="")
    max_value, max_time = _extreme_with_time(df, f"{item}_max", True)
    min_value, min_time = _extreme_with_time(df, f"{item}_min", False)
    avg_value = df[f"{item}_mean"].mean()
    # NOTE(review): a mean of exactly 0 is reported as "" (existing
    # behaviour, kept as is) - confirm whether 0 should be shown instead.
    if pd.isna(avg_value) or not avg_value:
        avg_value = ""
    else:
        avg_value = round(avg_value, digits)
    return ElecIndex(
        stats_index=item,
        max=max_value,
        max_time=max_time,
        min=min_value,
        min_time=min_time,
        avg=avg_value,
    )
lcn's avatar
lcn committed
509

lcn's avatar
lcn committed
510

ZZH's avatar
ZZH committed
511
async def elec_current_service(point_id):
    """Load the current real-time electric record of a single point.

    Reads the ``real_time:electric`` and ``real_time:electric_hr`` Redis
    entries of the point's meter. A record is used only when its ``ts`` is at
    most ``REAL_EXP_TIME`` seconds old; when both records exist and are
    fresh, keys missing from the main record are filled in from the harmonic
    one. Any error while reading or parsing is logged and treated as "no
    data".

    Returns:
        ``(time_str, record)``: ``record`` is the dict returned by
        ``get_sdu_i_and_u`` or None when no fresh record exists, in which
        case ``time_str`` is formatted from the current time.

    Raises:
        BusinessException: the point has no meter information.
    """
    # Resolve the meter id of the point.
    p_info = await get_meter_by_point(point_id)
    if not p_info or not p_info["mtid"]:
        msg = f"没有监测点:{point_id} monitor信息,请联系运维人员!"
        raise BusinessException(message=msg)

    now_ts = pendulum.now(tz=CST).int_timestamp
    d_rt_ele = None
    ts = now_ts
    try:
        mtid = p_info["mtid"]
        redis_keys = [
            f"real_time:electric:{SETTING.mysql_db}:{mtid}",
            f"real_time:electric_hr:{SETTING.mysql_db}:{mtid}",
        ]
        raw, raw_hr = await RedisUtils().mget(redis_keys)
        if raw and raw_hr:
            record, record_hr = json.loads(raw), json.loads(raw_hr)
            if now_ts - record["ts"] <= REAL_EXP_TIME:
                if now_ts - record_hr["ts"] <= REAL_EXP_TIME:
                    # Main record wins; only absent keys are copied over.
                    for field, value in record_hr.items():
                        record.setdefault(field, value)
                d_rt_ele, ts = record, record["ts"]
        else:
            # At most one of the two entries is present here.
            lone = raw or raw_hr
            if lone:
                record = json.loads(lone)
                if now_ts - record["ts"] <= REAL_EXP_TIME:
                    d_rt_ele, ts = record, record["ts"]
    except Exception as e:
        log.error(f"parse real time electric error, pid:{point_id}")
        log.exception(e)

    time_str = timestamp2dts(ts, YMD_Hms)
    if d_rt_ele is None:
        return time_str, None

    # Single-phase meters only carry one phase; report which fields to use.
    ctnum = d_rt_ele.get("ctnum") or 3
    return time_str, get_sdu_i_and_u(d_rt_ele, ctnum)


async def batch_load_rt_ele(mtids):
    """Load the real-time electric records of many meters from Redis.

    Keys are fetched through pipelines of at most 500 ``GET`` commands. A
    record is kept only when its ``ts`` is at most ``REAL_EXP_TIME`` seconds
    old, and is stored under the ``mtid`` found inside the record. Any error
    is logged and whatever was collected so far is returned.

    Returns:
        dict mapping every requested mtid to its record, or to ``{}`` when
        no fresh record was found.
    """
    now_ts = pendulum.now(tz=CST).int_timestamp
    d_rt_ele = {mtid: {} for mtid in mtids}
    try:
        db = SETTING.mysql_db
        keys = [f"real_time:electric:{db}:{mtid}" for mtid in mtids]

        batch_size = 500
        raw_records = []
        for offset in range(0, len(keys), batch_size):
            pipe = await RedisUtils().client.pipeline()
            for key in keys[offset:offset + batch_size]:
                await pipe.get(key)

            raw_records.extend(await pipe.execute())

        for raw in raw_records:
            if raw is not None:
                record = json.loads(raw)
                if now_ts - record["ts"] <= REAL_EXP_TIME:
                    d_rt_ele[record["mtid"]] = record
    except Exception as e:
        log.error(f"batch load real time electric error, mtids:{mtids}")
        log.exception(e)

    return d_rt_ele


async def batch_load_rt_ele_with_hr(mtids):
    """Load real-time electric records merged with their harmonic records.

    For every meter both ``real_time:electric`` and ``real_time:electric_hr``
    are fetched, through pipelines of at most 500 ``GET`` commands each.

    Per meter:
      * both present: the main record is used when fresh, and keys it lacks
        are copied from the harmonic record when that one is fresh too;
      * only one present: that record is used when fresh.
    "Fresh" means ``ts`` is at most ``REAL_EXP_TIME`` seconds old. A record
    whose embedded ``mtid`` differs from the requested one is logged and
    skipped. Any other error is logged and whatever was collected so far is
    returned.

    Returns:
        dict mapping every requested mtid to its record, or to ``{}`` when
        no usable record was found.
    """
    now_ts = pendulum.now(tz=CST).int_timestamp
    d_rt_ele = {mtid: {} for mtid in mtids}
    try:
        db = SETTING.mysql_db
        keys = [f"real_time:electric:{db}:{mtid}" for mtid in mtids]
        key_hrs = [f"real_time:electric_hr:{db}:{mtid}" for mtid in mtids]

        batch_size = 500
        raw_records, raw_hr_records = [], []
        for offset in range(0, len(keys), batch_size):
            stop = offset + batch_size

            pipe = await RedisUtils().client.pipeline()
            for key in keys[offset:stop]:
                await pipe.get(key)
            raw_records.extend(await pipe.execute())

            pipe = await RedisUtils().client.pipeline()
            for key in key_hrs[offset:stop]:
                await pipe.get(key)
            raw_hr_records.extend(await pipe.execute())

        for idx, mtid in enumerate(mtids):
            raw, raw_hr = raw_records[idx], raw_hr_records[idx]

            if raw and raw_hr:
                record, record_hr = json.loads(raw), json.loads(raw_hr)
                if not (record["mtid"] == mtid
                        and record_hr["mtid"] == mtid):
                    log.error(f"batch_load_rt_ele error, mtid:{mtid}")
                    continue

                if now_ts - record["ts"] <= REAL_EXP_TIME:
                    if now_ts - record_hr["ts"] <= REAL_EXP_TIME:
                        # Main record wins; only absent keys are copied.
                        for field, value in record_hr.items():
                            record.setdefault(field, value)

                    d_rt_ele[mtid] = record
                continue

            if raw:
                record = json.loads(raw)
                if record["mtid"] != mtid:
                    log.error(f"load_rt_ele error, mtid:{mtid}")
                    continue
            elif raw_hr:
                record = json.loads(raw_hr)
                if record["mtid"] != mtid:
                    log.error(f"load_rt_ele_hr error, mtid:{mtid}")
                    continue
            else:
                # Neither entry exists for this meter.
                continue

            if now_ts - record["ts"] <= REAL_EXP_TIME:
                d_rt_ele[mtid] = record
    except Exception as e:
        log.error(f"batch load real time electric error, mtids:{mtids}")
        log.exception(e)

    return d_rt_ele
lcn's avatar
lcn committed
649 650 651


def get_sdu_i_and_u(res, ctnum):
    """Record which current/voltage fields a single-phase meter reports.

    Sets ``res["sdu_i"]`` and ``res["sdu_u"]`` to the names of the current
    and voltage fields selected by ``res["meter_sn"]`` (case-insensitive
    "a", "b" or "c"). Both are None for any other, missing or None
    ``meter_sn``.

    Args:
        res: real-time record dict; modified in place.
        ctnum: CT count. 3 selects the phase voltages (ua/ub/uc); any other
            value selects the line voltages (uab/ucb). Phase "b" has no line
            voltage, so ``sdu_u`` stays None for it.

    Returns:
        The same ``res`` dict.
    """
    # meter_sn -> (current field, voltage for ctnum == 3, voltage otherwise)
    phase_fields = {
        "a": ("ia", "ua", "uab"),
        "b": ("ib", "ub", None),
        "c": ("ic", "uc", "ucb"),
    }
    res["sdu_i"] = None
    res["sdu_u"] = None
    # `or ""` also covers a key that is present with value None, which used
    # to raise AttributeError on .lower().
    meter_sn = str(res.get("meter_sn") or "").lower()
    fields = phase_fields.get(meter_sn)
    if fields is not None:
        current, phase_voltage, line_voltage = fields
        res["sdu_i"] = current
        res["sdu_u"] = phase_voltage if ctnum == 3 else line_voltage
    return res