from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.common.dao.common_dao import company_by_cids
from unify_api.modules.elec_charge.dao.elec_charge_dao import load_cmpy_charge
from unify_api.utils.common_utils import round_2, process_es_data, round_4
from unify_api.utils.time_format import last_time_str
from functools import reduce


def quarters_trans(quarters):
    """Convert a per-slot tariff sequence into time ranges and durations.

    Each element of ``quarters`` is the tariff code of one 15-minute slot,
    the first slot starting at 00:00.  Runs of consecutive slots with the
    same code are merged into one segment.

    Args:
        quarters: Iterable (string or list) of tariff codes, each one of
            "s", "p", "f" or "v" (presumably sharp / peak / flat / valley
            -- confirm with the tariff configuration).  A full day has 96
            entries, but any length is accepted.

    Returns:
        dict: for every code ``c`` the key ``c`` maps to a list of
        "HH:mm-HH:mm" ranges and the key ``c + "dt"`` maps to the total
        duration of those ranges in hours.  Codes that never occur keep
        an empty list and a duration of 0.

    Raises:
        KeyError: if ``quarters`` contains a code other than s/p/f/v.
    """
    slot_minutes = 15
    result = {
        "sdt": 0,
        "pdt": 0,
        "fdt": 0,
        "vdt": 0,
        "s": [],
        "p": [],
        "f": [],
        "v": [],
    }

    def _clock(slot):
        # Wrap at 24 hours so the end of the last slot of a day is "00:00".
        minutes = slot * slot_minutes
        return f"{minutes // 60 % 24:02d}:{minutes % 60:02d}"

    def _close_segment(code, first, stop):
        # Record the segment covering slots [first, stop).
        result[code + "dt"] += (stop - first) * slot_minutes / 60
        result[code].append(_clock(first) + "-" + _clock(stop))

    current_code = None
    segment_start = 0
    slot_count = 0
    for index, code in enumerate(quarters):
        if index == 0:
            current_code = code
        elif code != current_code:
            # Tariff changed: close the run that ended on the previous slot.
            _close_segment(current_code, segment_start, index)
            current_code, segment_start = code, index
        slot_count = index + 1

    # Close the trailing run at the real end of the input instead of at a
    # hard-coded slot 95, so inputs of any length keep their last segment.
    if slot_count:
        _close_segment(current_code, segment_start, slot_count)
    return result


# Cumulative energy for a set of companies (channel edition).
async def load_proxy_power(cid_list):
    """Return the cumulative energy of the given companies (channel edition).

    Sums the ``kwh`` column of ``company_1day_power`` over every row whose
    ``cid`` is in ``cid_list``.

    Args:
        cid_list: Collection of company ids to aggregate.

    Returns:
        The summed kWh rounded to 2 decimal places, or 0 when ``cid_list``
        is empty or no row matches.
    """
    # An empty id list would presumably be rendered as ``cid in ()``, which
    # is not valid SQL, so answer without querying.
    if not cid_list:
        return 0
    sql = """
        select sum(kwh) kwh  from company_1day_power where cid in %s
    """
    async with MysqlUtil() as conn:
        res = await conn.fetchone(sql=sql, args=(cid_list,))
    # SUM() yields NULL when no row matches, so the key exists but holds
    # None; a .get() default would not apply and round(None, 2) would raise.
    kwh = res.get("kwh") if res else None
    return round(kwh, 2) if kwh is not None else 0


# Energy and charge totals grouped by tariff period (spfv).
async def power_overview_proxy(date_start, date_end, cid_list):
    """Return energy and charge totals per tariff period (channel edition).

    Aggregates ``company_15min_power`` rows of the given companies whose
    ``create_time`` lies between ``date_start`` and ``date_end``, grouped
    by the ``spfv`` column.

    Args:
        date_start: Lower bound of ``create_time`` (passed to BETWEEN).
        date_end: Upper bound of ``create_time`` (passed to BETWEEN).
        cid_list: Collection of company ids to aggregate.

    Returns:
        tuple(dict, dict): ``(kwh_by_spfv, charge_by_spfv)``; each maps an
        ``spfv`` value to the ``round_2``-rounded sum.  Both dicts are
        empty when ``cid_list`` is empty or nothing matches.
    """
    kwh_by_spfv = {}  # energy
    charge_by_spfv = {}  # charge
    # An empty id list would presumably be rendered as ``cid in ()``, which
    # is not valid SQL, so answer without querying.
    if not cid_list:
        return kwh_by_spfv, charge_by_spfv
    sql = """
        select spfv,sum(p) as p,sum(kwh) as kwh,sum(charge) as charge
        from company_15min_power
        where cid in %s and create_time BETWEEN %s and %s
        group by spfv
    """
    async with MysqlUtil() as conn:
        rows = await conn.fetchall(sql=sql,
                                   args=(cid_list, date_start, date_end))
    # Tolerate a driver that reports "no rows" as None.
    for row in rows or ():
        spfv = row.get("spfv")
        kwh_by_spfv[spfv] = round_2(row.get("kwh"))
        charge_by_spfv[spfv] = round_2(row.get("charge"))
    return kwh_by_spfv, charge_by_spfv


# Helper that adds up the values of a per-tariff dictionary.
def total_value(dict_total):
    """Add up all values of ``dict_total`` with ``+``, in iteration order.

    ``dict_total`` is the per-spfv mapping.  An empty or falsy argument
    yields the placeholder pair ``("", "")`` instead of a sum.
    """
    if not dict_total:
        return "", ""
    remaining = iter(dict_total.values())
    # Seed with the first value so a single-entry mapping returns it as is.
    running = next(remaining)
    for item in remaining:
        running = running + item
    return running


async def power_aggs_cid_proxy(start, end, cid_list, date_type):
    """Per-company energy, charge and unit price with growth rates.

    Channel edition.  Loads the charge data of the requested period and of
    the preceding period (derived with ``last_time_str``) and, for every
    company present in both, reports the current value and its relative
    change against the previous period.

    A company is left out when its current energy is 0, when the previous
    period has no usable energy/charge figure for it, or when a previous
    value (energy, charge or rounded unit price) is 0 and a growth rate
    therefore cannot be computed.

    Args:
        start: Start of the current period.
        end: End of the current period.
        cid_list: Company ids to report on.
        date_type: Period type handed to ``last_time_str``.

    Returns:
        tuple(list, list, list): ``(kwh_list, charge_list, price_list)``;
        every entry is ``{"name": shortname, "value": v, "rate": r}``.
        Three empty lists when the current period has no data.
    """
    # 1. Work out the previous period.
    start_last, end_last = last_time_str(start, end, date_type)
    # 2. Load the figures of both periods.
    re_this = await load_cmpy_charge(start, end, cid_list)
    re_last = await load_cmpy_charge(start_last, end_last, cid_list)
    if not re_this:
        log.info(f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
        return [], [], []
    # Previous-period rows, presumably indexed by cid -- see process_es_data.
    re_last_dic = process_es_data(re_last, key="cid")
    # 3. Build the result lists.
    kwh_list = []
    charge_list = []
    price_list = []
    # 3.1 Look up the company names for the requested cids.
    company_list = await company_by_cids(cid_list)
    com_dic = process_es_data(company_list, key="cid")
    for info in re_this:
        cid = info.get("cid")
        # Fall back to an empty name for an unknown cid, as
        # power_index_cid_proxy does, instead of raising KeyError.
        cid_name = com_dic[cid]["shortname"] if cid in com_dic else ''

        kwh = round_2(info.get("kwh"))
        if kwh == 0:
            continue
        # A company without previous-period data takes no part in the
        # statistics.  The handler stays broad on purpose: the failure
        # modes of the lookup and of round_4 are not visible here.
        try:
            kwh_last = re_last_dic[cid]["kwh"]
            # Test for zero before dividing; otherwise the check is
            # unreachable and the case is logged as an error.
            if kwh_last == 0:
                continue
            kwh_rate = round_4((kwh - kwh_last) / kwh_last)
        except Exception as e:
            log.error(e)
            log.info(
                f"本次有电量数据, 上周期没有电量数据, cid:{cid}, start:{start}, end:{end}")
            continue

        charge = round_2(info.get("charge"))
        try:
            charge_last = re_last_dic[cid]["charge"]
            if charge_last == 0:
                continue
            charge_rate = round_4((charge - charge_last) / charge_last)
        except Exception as e:
            log.error(e)
            log.info("本次有数据, 上周期没有数据")
            log.info(
                f"本次有电费数据, 上周期没有电费数据, cid:{cid}, start:{start}, end:{end}")
            continue

        price = round_2(charge / kwh)
        price_last = round_2(charge_last / kwh_last)
        # A tiny previous charge can round to a unit price of 0; no growth
        # rate exists then, so skip rather than divide by zero.
        if price_last == 0:
            continue
        price_rate = round_4((price - price_last) / price_last)

        kwh_list.append({"name": cid_name, "value": kwh, "rate": kwh_rate})
        charge_list.append(
            {"name": cid_name, "value": charge, "rate": charge_rate})
        price_list.append(
            {"name": cid_name, "value": price, "rate": price_rate})
    return kwh_list, charge_list, price_list


# Reduced per-company ranking without growth rates.
async def power_index_cid_proxy(start, end, cid_list, date_type):
    """Per-company energy, charge and unit price, without growth rates.

    Reduced variant of ``power_aggs_cid_proxy``: only the requested period
    is loaded and no comparison with a previous period is made.

    Args:
        start: Start of the period.
        end: End of the period.
        cid_list: Company ids to report on.
        date_type: Not used by this function; kept in the signature for
            callers that pass it.

    Returns:
        tuple(list, list, list): ``(kwh_list, charge_list, price_list)``;
        every entry is ``{"name": shortname, "value": v}``.  Companies
        whose rounded energy is 0 are omitted.  Three empty lists when the
        period has no data.
    """
    # 1. Load the figures of the period.
    res = await load_cmpy_charge(start, end, cid_list)
    if not res:
        log.info(f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
        return [], [], []
    # 2. Build the result lists.
    kwh_list = []
    charge_list = []
    price_list = []
    # 2.1 Look up the company names for the requested cids.
    company_list = await company_by_cids(cid_list)
    com_dic = process_es_data(company_list, key="cid")
    for info in res:
        cid = info.get("cid")
        # Unknown cids get an empty display name.
        cid_name = com_dic[cid]["shortname"] if cid in com_dic else ''
        kwh = round_2(info.get("kwh"))
        charge = round_2(info.get("charge"))
        # Zero energy takes no part in the ranking (and would make the
        # unit price undefined).
        if kwh == 0:
            continue
        price = round_2(charge / kwh)

        kwh_list.append({"name": cid_name, "value": kwh})
        charge_list.append({"name": cid_name, "value": charge})
        price_list.append({"name": cid_name, "value": price})
    return kwh_list, charge_list, price_list