# proxy_optimization.py (6.9 KB)
# Repository-viewer residue kept as a comment: "Newer Older", "lcn's avatar", "lcn committed".

from dataclasses import fields

from pot_libs.sanic_api import summary
from unify_api.modules.electric_optimization.components.proxy_optimization_cps import (
    ProxyOpReq,
    ProxyPowerFactorListResp,
    ProxyPowerFactorPageItem,
    ProxyPowerFactorSummarryResp,
    ProxyMdSpaceListResp,
    ProxyMdSpacePageItem,
    ProxyMdSpaceSummarryResp,
    ProxyPowerSaveListResp,
    ProxyPowerSaveItem,
    ProxyPowerSaveSummaryResp,
    ProxyPcvfListResp,
    ProxyPcvfPageItem,
    ProxyElectricOpSummaryResp,
    OptimizationInfo,
)
from unify_api.modules.electric_optimization.procedures.proxy_optimization_pds import (
    proxy_power_factor_list,
    proxy_power_factor_summary,
    proxy_md_space_list,
    proxy_md_space_summary,
    proxy_power_save_list,
    proxy_power_save_summary,
    proxy_pcvf_list,
    proxy_pcvf_summary,
    proxy_electric_optimization_summary,
)


@summary("知电U管理版本用电优化-功率因素-列表数据")
async def post_power_factor_list(req, body: ProxyOpReq) -> ProxyPowerFactorListResp:
    """Return one page of power-factor optimization rows.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload supplying ``cids``, ``month``, paging
            (``page_size``, ``page_num``), sorting (``sort_field``,
            ``sort_direction``) and ``status``.

    Returns:
        A ``ProxyPowerFactorListResp`` built from the ``rows`` and ``total``
        entries returned by ``proxy_power_factor_list``.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    # Falsy paging/sorting values fall back to the defaults given here.
    query = {
        "page_size": body.page_size or 10,
        "page_num": body.page_num or 1,
        "sort_field": body.sort_field or "company_name",
        "sort_direction": body.sort_direction or "asc",
        "status_list": body.status,
    }
    page = await proxy_power_factor_list(body.cids, month, **query)

    items = [ProxyPowerFactorPageItem(**row) for row in page["rows"]]
    return ProxyPowerFactorListResp(rows=items, total=page["total"])


@summary("知电U管理版本用电优化-功率因素-统计数据")
async def post_power_factor_summary(req, body: ProxyOpReq) -> ProxyPowerFactorSummarryResp:
    """Return the power-factor summary for the requested companies and month.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload; only ``cids`` and ``month`` are read.

    Returns:
        A ``ProxyPowerFactorSummarryResp`` populated from the mapping returned
        by ``proxy_power_factor_summary``. Keys that are not fields of the
        response dataclass are dropped.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    stats = await proxy_power_factor_summary(body.cids, month)

    # Build the set of accepted field names once instead of once per key.
    allowed = {f.name for f in fields(ProxyPowerFactorSummarryResp)}
    return ProxyPowerFactorSummarryResp(
        **{key: value for key, value in stats.items() if key in allowed}
    )


@summary("知电U管理版本用电优化-需量管理-列表数据")
async def post_md_space_list(req, body: ProxyOpReq) -> ProxyMdSpaceListResp:
    """Return one page of demand-management (md space) optimization rows.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload supplying ``cids``, ``month``, paging
            (``page_size``, ``page_num``), sorting (``sort_field``,
            ``sort_direction``) and ``status``.

    Returns:
        A ``ProxyMdSpaceListResp`` built from the ``rows`` and ``total``
        entries returned by ``proxy_md_space_list``.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    # Falsy paging/sorting values fall back to the defaults given here.
    query = {
        "page_size": body.page_size or 10,
        "page_num": body.page_num or 1,
        "sort_field": body.sort_field or "company_name",
        "sort_direction": body.sort_direction or "asc",
        "status_list": body.status,
    }
    page = await proxy_md_space_list(body.cids, month, **query)

    items = [ProxyMdSpacePageItem(**row) for row in page["rows"]]
    return ProxyMdSpaceListResp(rows=items, total=page["total"])


@summary("知电U管理版本用电优化-需量管理-统计数据")
async def post_md_space_summary(req, body: ProxyOpReq) -> ProxyMdSpaceSummarryResp:
    """Return the demand-management (md space) summary for the given month.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload; only ``cids`` and ``month`` are read.

    Returns:
        A ``ProxyMdSpaceSummarryResp`` populated from the mapping returned by
        ``proxy_md_space_summary``. Keys that are not fields of the response
        dataclass are dropped.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    stats = await proxy_md_space_summary(body.cids, month)

    # Build the set of accepted field names once instead of once per key.
    allowed = {f.name for f in fields(ProxyMdSpaceSummarryResp)}
    return ProxyMdSpaceSummarryResp(
        **{key: value for key, value in stats.items() if key in allowed}
    )


@summary("知电U管理版本用电优化-经济运行-列表数据")
async def post_power_save_list(req, body: ProxyOpReq) -> ProxyPowerSaveListResp:
    """Return one page of economic-operation (power save) optimization rows.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload supplying ``cids``, ``month``, paging
            (``page_size``, ``page_num``) and sorting (``sort_field``,
            ``sort_direction``).

    Returns:
        A ``ProxyPowerSaveListResp`` built from the ``rows`` and ``total``
        entries returned by ``proxy_power_save_list``.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    # Falsy paging/sorting values fall back to the defaults given here.
    query = {
        "page_size": body.page_size or 10,
        "page_num": body.page_num or 1,
        "sort_field": body.sort_field or "company_name",
        "sort_direction": body.sort_direction or "asc",
    }
    page = await proxy_power_save_list(body.cids, month, **query)

    items = [ProxyPowerSaveItem(**row) for row in page["rows"]]
    return ProxyPowerSaveListResp(rows=items, total=page["total"])


@summary("知电U管理版本用电优化-经济运行-统计数据")
async def post_power_save_summary(req, body: ProxyOpReq) -> ProxyPowerSaveSummaryResp:
    """Return the economic-operation (power save) summary for the given month.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload; only ``cids`` and ``month`` are read.

    Returns:
        A ``ProxyPowerSaveSummaryResp`` populated from the mapping returned by
        ``proxy_power_save_summary``. Keys that are not fields of the response
        dataclass are dropped.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    stats = await proxy_power_save_summary(body.cids, month)

    # Build the set of accepted field names once instead of once per key.
    allowed = {f.name for f in fields(ProxyPowerSaveSummaryResp)}
    return ProxyPowerSaveSummaryResp(
        **{key: value for key, value in stats.items() if key in allowed}
    )


@summary("知电U管理版本用电优化-移峰填谷-列表数据")
async def post_pcvf_list(req, body: ProxyOpReq) -> ProxyPcvfListResp:
    """Return one page of peak-shifting/valley-filling (pcvf) optimization rows.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload supplying ``cids``, ``month``, paging
            (``page_size``, ``page_num``) and sorting (``sort_field``,
            ``sort_direction``).

    Returns:
        A ``ProxyPcvfListResp`` built from the ``rows`` and ``total`` entries
        returned by ``proxy_pcvf_list``.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    # Falsy paging/sorting values fall back to the defaults given here.
    query = {
        "page_size": body.page_size or 10,
        "page_num": body.page_num or 1,
        "sort_field": body.sort_field or "company_name",
        "sort_direction": body.sort_direction or "asc",
    }
    page = await proxy_pcvf_list(body.cids, month, **query)

    items = [ProxyPcvfPageItem(**row) for row in page["rows"]]
    return ProxyPcvfListResp(rows=items, total=page["total"])


@summary("知电U管理版本用电优化-移峰填谷-统计数据")
async def post_pcvf_summary(req, body: ProxyOpReq) -> ProxyPowerSaveSummaryResp:
    """Return the peak-shifting/valley-filling (pcvf) summary for the given month.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload; only ``cids`` and ``month`` are read.

    Returns:
        A ``ProxyPowerSaveSummaryResp`` populated from the mapping returned by
        ``proxy_pcvf_summary``. Keys that are not fields of the response
        dataclass are dropped.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    stats = await proxy_pcvf_summary(body.cids, month)

    # NOTE(review): this pcvf endpoint reuses the power-save summary response
    # type; presumably the two summaries share the same fields -- confirm.
    # Build the set of accepted field names once instead of once per key.
    allowed = {f.name for f in fields(ProxyPowerSaveSummaryResp)}
    return ProxyPowerSaveSummaryResp(
        **{key: value for key, value in stats.items() if key in allowed}
    )


@summary("知电U管理版本-首页用电优化-统计数据")
async def post_electric_op_summary(req, body: ProxyOpReq) -> ProxyElectricOpSummaryResp:
    """Return the home-page electricity-optimization summary.

    Args:
        req: Incoming request object; not used by this handler.
        body: Request payload; only ``cids`` and ``month`` are read.

    Returns:
        A ``ProxyElectricOpSummaryResp`` whose ``power_factor``, ``md_space``,
        ``pcvf`` and ``power_save`` members are ``OptimizationInfo`` objects
        built from the same-named entries of the mapping returned by
        ``proxy_electric_optimization_summary``.
    """
    month = body.month
    # A 7-character month (presumably ``YYYY-MM`` -- confirm with callers)
    # gets a day component appended before it is passed on.
    if len(month) == 7:
        month = f"{month}-01"

    overview = await proxy_electric_optimization_summary(body.cids, month)

    # Each section is wrapped the same way; the tuple fixes the build order.
    sections = ("power_factor", "md_space", "pcvf", "power_save")
    return ProxyElectricOpSummaryResp(
        **{name: OptimizationInfo(**overview[name]) for name in sections}
    )