open_data_service.py 9.25 KB
Newer Older
lcn's avatar
lcn committed
1 2 3
import json
import base64
import pendulum
ZZH's avatar
ZZH committed
4 5
from unify_api.modules.electric.dao.electric_dao import \
    get_elec_mtid_sid_by_cid
lcn's avatar
lcn committed
6 7 8 9 10 11 12
from unify_api import constants
from pot_libs.common.components.responses import success_res
from unify_api.modules.shidianu.components.open_data_cps import (
    BasicInfoResp, StbDataResp
)
from unify_api.modules.alarm_manager.components.list_alarm import \
    ListAlarmResponse, Alarm
ZZH's avatar
ZZH committed
13
from unify_api.modules.common.dao.common_dao import load_compy_storey_points
lcn's avatar
lcn committed
14
from unify_api.modules.shidianu.dao.open_data_dao import \
ZZH's avatar
ZZH committed
15
    get_user_product_auth, load_lg_sdu_events, monitor_point_company
lcn's avatar
lcn committed
16 17 18 19
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
from pot_libs.settings import SETTING
from pot_libs.utils.pendulum_wrapper import my_pendulum
from pot_libs.aredis_util.aredis_utils import RedisUtils
ZZH's avatar
ZZH committed
20
from unify_api.utils.taos_new import get_td_engine_data, parse_td_columns
ZZH's avatar
ZZH committed
21 22 23
from unify_api.utils.time_format import (
    CST, convert_dt_to_timestr, convert_to_dt
)
ZZH's avatar
ZZH committed
24 25 26
from unify_api.modules.electric.service.electric_service import (
    batch_load_rt_ele_with_hr
)
lcn's avatar
lcn committed
27 28


ZZH's avatar
ZZH committed
29
def _longgang_point_row(info):
    """Build one response row from a storey/point record.

    ``room_name``, ``storey_name`` and ``address`` may be empty/None in the
    record; they are treated as empty strings when the address is assembled.
    """
    room = info["room_name"] or ''
    storey = info["storey_name"] or ''
    address = info["address"] or ''
    full_address = f"{address}{storey}{room}"
    return {
        "cid": info["cid"],
        "pid": info["pid"],
        "mtid": info["mtid"],
        "storey_id": info.get("storey_id", ""),
        "meter_no": info["meter_no"] + "相",
        "equipment_address": full_address,
        "equipment_code": info["sid"],
        "root_address": full_address,
        "longitude": "",
        "latitude": "",
        "insurer": "黄锦成",
        "insurer_phone": "13580428480"
    }


async def basic_info_longgang_service(user_id, pg_size, pg_num):
    """Return one page of point basic info for company 223 (hard-coded).

    Args:
        user_id: user whose permission is checked through ``get_power``.
        pg_size: number of rows per page.
        pg_num: page number; the page starts at ``(pg_num - 1) * pg_size``.

    Returns:
        ``success_res`` with code 4001 when the permission check fails or
        code 4009 when no storey points are loaded; otherwise a
        ``BasicInfoResp`` whose ``total`` is the count of all loaded points
        and whose ``rows`` hold the requested page.
    """
    cids = [223]

    # Step 1: verify permission.
    if not await get_power(user_id, cids):
        return success_res(code=4001, msg="您没有权限访问")

    # NOTE(review): the 1 / 3000 arguments look like page number and page
    # size, i.e. "load everything" -- confirm against the DAO.
    storey_points = await load_compy_storey_points(cids, 1, 3000)
    total = len(storey_points)
    if not storey_points:
        return success_res(code=4009, msg="此厂还未装任何装置")

    # Paginate in memory over the full point list.
    page_start = (pg_num - 1) * pg_size
    page_end = page_start + pg_size
    if page_start > total:
        return BasicInfoResp(rows=[], total=total)

    page = storey_points[page_start:page_end]
    return BasicInfoResp(
        rows=[_longgang_point_row(info) for info in page], total=total
    )


ZZH's avatar
ZZH committed
70 71 72 73
async def stb_data_longgang_service(user_id, d_type):
    """Return the latest data rows of the given type for company 223.

    Args:
        user_id: user id; used for the access-limit key and the permission
            check through ``get_power``.
        d_type: one of ``electric``, ``soe`` or ``appliance``.

    Returns:
        ``success_res`` with code 4002 (unknown type), 4011 (access-limit
        key present in Redis), 4001 (no permission) or 4003 (no data, which
        is always the case for ``appliance``); otherwise a ``StbDataResp``
        with the rows and their count.
    """
    db_by_topic = {"electric": "db_electric", "soe": "db_soe",
                   "appliance": "db_appliance"}
    if d_type not in db_by_topic:
        return success_res(code=4002, msg="type错误")

    # Per-user, per-type access limit: the key is written after a
    # successful response (see the setex call at the end).
    throttle_key = f"access_data_{d_type}:{user_id}"
    throttled = await RedisUtils().get(throttle_key)
    if throttled:
        return success_res(code=4011, msg="访问频繁,请60s后访问")

    cids = [223]
    authorized = await get_power(user_id, cids)
    if not authorized:
        return success_res(code=4001, msg="您没有权限访问")

    if d_type == "appliance":
        return success_res(code=4003, msg="未查找到数据")

    if d_type == "electric":
        # Real-time electric data is loaded per mtid of the company.
        mtid_sids = await get_elec_mtid_sid_by_cid(cids[0])
        mtids = [item["mtid"] for item in mtid_sids]
        rt_by_mtid = await batch_load_rt_ele_with_hr(mtids)
        rows = list(rt_by_mtid.values())
    else:
        # Remaining type ("soe"): last row of every sub-table of the
        # super table, fetched from the TD engine endpoint.
        db_name = db_by_topic[d_type]
        stb_url = f"{SETTING.stb_url}{db_name}?tz=Asia/Shanghai"
        sql = (f"select last_row(*) from {db_name}.{d_type}_stb "
               f"where cpyid={cids[0]} group by tbname;")
        is_succ, results = await get_td_engine_data(stb_url, sql)
        if not is_succ:
            return success_res(code=4003, msg="未查找到数据")

        columns = parse_td_columns(results)
        rows = [dict(zip(columns, record)) for record in results["data"]]
        for row in rows:
            # Expose the origin timestamp under the "ts" key.
            row["ts"] = row.pop("ts_origin")

    await RedisUtils().setex(throttle_key, 60, 1)
    return StbDataResp(rows=rows, total=len(rows))
lcn's avatar
lcn committed
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146


async def get_power(user_id, cids):
    """Check whether the user holds permission value 4 on the given company id(s).

    The user's ``cid_ext`` field is a JSON object keyed by the company id as
    a string; each value is the collection of permission values for that
    company.
    NOTE(review): the business meaning of permission value 4 is not visible
    here -- confirm.

    Args:
        user_id: id passed to ``get_user_product_auth``.
        cids: a single company id (``int``) or a ``list`` of company ids.

    Returns:
        ``True`` when every requested company id carries permission 4,
        ``False`` otherwise, and ``None`` when ``cids`` is neither a list
        nor an int.
    """
    user_info = await get_user_product_auth(user_id)

    # Normalise the single-id form so both shapes share one code path.
    if isinstance(cids, list):
        wanted = cids
    elif isinstance(cids, int):
        wanted = [cids]
    else:
        return None

    if not (user_info and user_info.get("cid_ext")):
        return False

    cid_ext = json.loads(user_info["cid_ext"])
    # Lazy generator: stops at the first company without permission 4.
    granted = (cid_ext.get(str(cid), "") for cid in wanted)
    return all(power and 4 in power for power in granted)


def get_token():
    """Return the Base64 text of ``"<td_user>:<td_pwd>"`` taken from SETTING.

    The value has the shape expected after ``Basic `` in an HTTP
    ``Authorization`` header, which is how ``get_requests`` uses its token.
    """
    raw_credentials = f"{SETTING.td_user}:{SETTING.td_pwd}".encode()
    return base64.b64encode(raw_credentials).decode()


async def get_requests(token, stb_url, sql):
    """POST a SQL statement to ``stb_url`` using HTTP Basic authorization.

    Args:
        token: credential placed after ``Basic `` in the Authorization header.
        stb_url: target URL.
        sql: statement sent as the request body.

    Returns:
        The response text when the HTTP status is 200, otherwise ``False``.
    """
    auth_headers = {"Authorization": f"Basic {token}"}
    resp_str, status = await AioHttpUtils().post(
        stb_url, data=sql, timeout=50, headers=auth_headers
    )
    return resp_str if status == 200 else False


ZZH's avatar
ZZH committed
147 148
async def supplement_data_service(user_id, cid, start, end, d_type):
    """Query rows of one super table for a company within a time window.

    Args:
        user_id: user whose permission on ``cid`` is checked via ``get_power``.
        cid: company id used in the ``cpyid`` filter.
        start: window start, a ``YYYY-MM-DD HH:mm:ss`` string.
        end: window end, same format.
        d_type: one of ``electric``, ``soe`` or ``appliance``.

    Returns:
        ``success_res`` with code 4001 (no permission), 4002 (unknown type),
        4003 (window longer than 6 hours, or the query failed), 4010 (start
        more than one month before now) or 4004 (start/end could not be
        parsed or compared); otherwise a ``StbDataResp`` with the rows and
        the row count reported by the query result.
    """
    if not await get_power(user_id, cid):
        return success_res(code=4001, msg="您没有权限访问")

    topic2db = {"electric": "db_electric", "soe": "db_soe",
                "appliance": "db_appliance"}
    if d_type not in topic2db:
        return success_res(code=4002, msg="type错误")

    now = pendulum.now(tz=CST)
    try:
        start = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
        end = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
        if (end - start).in_hours() > 6:
            return success_res(code=4003, msg="时间跨度不能大于6小时")

        if (now - start).in_months() > 1:
            return success_res(code=4010, msg="查询日期不能大于1个月")
    except Exception:
        # The error response must be returned: without the return, the
        # unparsed caller-supplied strings would continue into the SQL
        # statement built below.
        return success_res(code=4004, msg="开始时间或者结束时间错误")

    db_name = topic2db[d_type]
    stb_url = f"{SETTING.stb_url}{db_name}?tz=Asia/Shanghai"
    # The electric table is filtered on "ts"; the others on "ts_origin".
    ts_column = "ts" if d_type == "electric" else "ts_origin"
    sql = f"select * from {d_type}_stb where cpyid={cid} and " \
          f"{ts_column} >= '{start}' and {ts_column} <= '{end}'"
    is_succ, results = await get_td_engine_data(stb_url, sql)
    if not is_succ:
        return success_res(code=4003, msg="未查找到数据")

    head = parse_td_columns(results)
    datas = [dict(zip(head, r)) for r in results["data"]]
    if d_type == "electric":
        # Drop the receive timestamp from every row but keep the row dicts
        # themselves (previously the rows were replaced by the popped values).
        for data in datas:
            data.pop("ts_received", None)
    else:
        # Expose the origin timestamp under the "ts" key.
        for data in datas:
            data["ts"] = data.pop("ts_origin")
    total = results.get("rows", 0) or 0
    return StbDataResp(rows=datas, total=total)


# Alarm results and analysis results
async def result_longgang_service(user_id, importance, pg_size, pg_num):
    """Return one page of events for company 223 as alarm rows.

    Args:
        user_id: user whose permission is checked through ``get_power``.
        importance: filter forwarded to ``load_lg_sdu_events``.
        pg_size: page size forwarded to ``load_lg_sdu_events``.
        pg_num: page number forwarded to ``load_lg_sdu_events``.

    Returns:
        ``success_res`` with code 4001 when the permission check fails;
        otherwise a ``ListAlarmResponse``. Events whose ``pid`` has no
        matching monitor point are skipped, so ``rows`` may hold fewer
        entries than the loaded page.
    """
    cid = 223
    cids = [cid]
    if not await get_power(user_id, cids):
        return success_res(code=4001, msg="您没有权限访问")

    total, events = await load_lg_sdu_events(cid, pg_num, pg_size, importance)
    if not events:
        return ListAlarmResponse(total=0, rows=[])

    # 2. Build the response data: index the monitor points by pid.
    monitor_points = await monitor_point_company(cids)
    mp_map = {
        mp["pid"]: {
            "sid": mp["sid"], "mtid": mp["mtid"],
            "storey_name": mp["storey_name"],
            "room_name": mp["room_name"],
            "fullname": mp["fullname"],
        }
        for mp in monitor_points if mp["pid"]
    }

    rows = []
    for event in events:
        point_id = event.get("pid")
        point = mp_map.get(point_id)
        if point is None:
            continue

        event_type = event.get("event_type")
        # Fall back to the raw type when no display name is configured.
        type_name = constants.SDU_EVENT_TYPE_MAP.get(event_type, event_type)
        event_dt = convert_to_dt(event.get("event_datetime"))
        storey_name = point.get("storey_name")
        room_name = point.get("room_name")
        rows.append(Alarm(
            es_id=event["id"],
            name=event.get("name"),
            importance=event.get("importance"),
            date_time=convert_dt_to_timestr(event_dt),
            type=event_type,
            type_name=type_name,
            description=event.get("message"),
            event_duration=event.get("event_duration"),
            company_name=point.get("fullname"),
            point_id=point_id,
            storey_name=storey_name,
            room_name=room_name,
            # Either part may be None; treat it as empty instead of
            # raising TypeError on the concatenation.
            storey_room_name=f"{storey_name or ''}{room_name or ''}",
            sid=point.get("sid"),
            mtid=event.get("mtid")
        ))

    return ListAlarmResponse(total=total, rows=rows)