import json

import pytest

from unify_api.tests.constants_t import TOKEN, HTTP_PREFIX
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils


async def _post_alarm_static(endpoint, payload):
    """POST *payload* to an alarm-static endpoint and return the decoded body.

    The request goes to
    ``{HTTP_PREFIX}/unify-api/alarm-manager/alarm-static/<endpoint>`` with a
    bearer token and a 10 second timeout.  The HTTP status is asserted to be
    200 before the response text is parsed as JSON.
    """
    raw_body, status = await AioHttpUtils().post(
        url=f"{HTTP_PREFIX}/unify-api/alarm-manager/alarm-static"
            f"/{endpoint}",
        data=payload,
        timeout=10,
        headers={"Authorization": f"Bearer {TOKEN}"},
    )
    assert status == 200
    return json.loads(raw_body)


@pytest.mark.parametrize("data", [
    {
        "cid": 78,
        "start": "2021-07-01 00:00:00",
        "end": "2021-07-31 23:59:59",
        "product": 4
    },
    {
        "cid": 78,
        "start": "2021-06-01 00:00:00",
        "end": "2021-06-30 23:59:59",
        "product": 4
    }
])
@pytest.mark.asyncio
async def test_sdu_alarm_statistics(data):
    """SDU -> alarm statistics: the total alarm count is expected to be 0."""
    body = await _post_alarm_static("sdu-alarm-statistics", data)
    assert body["data"]["total_alarm_cnt"] == 0


@pytest.mark.parametrize("data", [
    {
        "cids": [78, 113],
        "start": "2021-06-01 00:00:00",
        "end": "2021-06-30 23:59:59",
        "product": 4
    },
])
@pytest.mark.asyncio
async def test_sdu_alarm_statistics_wx(data):
    """SDU -> alarm statistics (WeChat): the total alarm count should be 10."""
    body = await _post_alarm_static("sdu-alarm-statistics-wx", data)
    assert body["data"]["total_alarm_cnt"] == 10


@pytest.mark.parametrize("data", [
    {
        "cid": 78,
        "end": "2021-07-31 23:59:59",
        "page_num": 1,
        "page_size": 14,
        "product": 4,
        "sort": "desc",
        "start": "2021-07-01 00:00:00",
    },
])
@pytest.mark.asyncio
async def test_sdu_alarm_statistics_sort(data):
    """Alarm statistics - alarm records - ranking (SDU).

    The reported ranking total must equal the number of ranking entries
    returned.
    """
    result = (await _post_alarm_static("sdu-alarm-statistics-sort",
                                       data))["data"]
    assert result["alarm_ranking_total"] == len(result["alarm_ranking"])


@pytest.mark.parametrize("data", [
    {
        "cid": 44,
        "start": "2021-06-01 00:00:00",
        "end": "2021-06-30 23:59:59",
        "product": 4
    },
])
@pytest.mark.asyncio
async def test_sdu_app_statistics_sort(data):
    """Alarm statistics - appliance recognition - ranking (SDU).

    The appliance ranking is expected to be empty for this payload.
    """
    body = await _post_alarm_static("sdu-app-statistics-sort", data)
    assert body["data"]["ele_app_ranking"] == []


@pytest.mark.parametrize("data", [
    {
        "cid": 44,
        "start": "2021-06-01 00:00:00",
        "end": "2021-06-30 23:59:59",
        "product": 4
    },
])
@pytest.mark.asyncio
async def test_sdu_index_statistics(data):
    """Home page - operation trend (SDU).

    The ``illegal_ele_app`` series is expected to hold 30 values.
    """
    body = await _post_alarm_static("sdu-index-statistics", data)
    series = body["data"]["illegal_ele_app"]["value"]
    assert len(series) == 30


@pytest.mark.parametrize("data", [
    {
        "cid": 44,
        "product": 4
    },
])
@pytest.mark.asyncio
async def test_sdu_index_alarm_statistics(data):
    """Home page - operation trend (new-version SDU).

    The risk distribution is expected to report 6 ``security_user`` entries.
    """
    body = await _post_alarm_static("sdu-index-alarm-statistics", data)
    assert body["data"]["risk_distribution"]["security_user"] == 6


@pytest.mark.parametrize("data", [
    {
        "cid": 54,
        "product": 4
    },
])
@pytest.mark.asyncio
async def test_sdu_index_alarm_ranking(data):
    """Alarm violation ranking (new-version SDU, last-30-day usage behaviour).

    The ``illegal_app`` list is expected to be empty for this payload.
    """
    body = await _post_alarm_static("sdu-index-alarm-ranking", data)
    assert body["data"]["illegal_app"] == []


@pytest.mark.parametrize("data", [
    {
        "cid": 44,
        "start": "2020-07-30 00:00:00",
        "end": "2020-07-30 23:59:59",
        "product": 4
    },
])
@pytest.mark.asyncio
async def test_zdu_level_distribution(data):
    """Alarm statistics - alarm level (ZDU).

    The first-level alarm count is expected to be 152.
    """
    body = await _post_alarm_static("zdu-level-distribution", data)
    assert body["data"]["level_detail"]["first_alarm_cnt"] == 152


@pytest.mark.parametrize("data", [
    {
        "cid": 44,
        "start": "2020-07-30 00:00:00",
        "end": "2020-07-30 23:59:59",
        "product": 4
    },
])
@pytest.mark.asyncio
async def test_zdu_content_distribution(data):
    """Alarm statistics - alarm content (ZDU).

    The temperature alarm count is expected to be 152.
    """
    body = await _post_alarm_static("zdu-content-distribution", data)
    assert body["data"]["content_detail"]["temperature_cnt"] == 152


@pytest.mark.parametrize("data", [
    {
        "cid": 44,
        "start": "2020-07-30 00:00:00",
        "end": "2020-07-30 23:59:59",
        "product": 4
    },
])
@pytest.mark.asyncio
async def test_zdu_summary(data):
    """Alarm statistics - summary overview (ZDU).

    Checks the daytime and morning counts of the time-interval distribution.
    """
    body = await _post_alarm_static("zdu-summary", data)
    intervals = body["data"]["time_interval_distribution"]
    assert intervals["daytime_cnt"] == 95
    assert intervals["morning_cnt"] == 57


@pytest.mark.parametrize("data", [
    {
        "cid": 44,
        "start": "2021-06-01 00:00:00",
        "end": "2021-06-30 23:59:59",
        "product": 4,
        "page_size": 10,
        "page_num": 1,
    },
])
@pytest.mark.asyncio
async def test_zdu_alarm_sort(data):
    """Alarm statistics - alarm ranking (ZDU).

    The number of ranking entries returned must equal the reported total.
    """
    result = (await _post_alarm_static("zdu-alarm-sort", data))["data"]
    assert len(result["alarm_ranking"]) == result["total"]