1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import json
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
async def get_scope_config_by_pid(pid):
    """Fetch the stored scope (waveform-recording) threshold for one device.

    Args:
        pid: Value matched against the ``pid`` column of
            ``scope_config_record``.

    Returns:
        The ``threshold`` entry of the row returned by ``fetchone``. An empty
        dict is returned when no row comes back or the row has no
        ``threshold`` key.
    """
    query = "select threshold from scope_config_record where pid = %s"
    async with MysqlUtil() as db:
        row = await db.fetchone(sql=query, args=(pid,))
    # A falsy result (no matching record) maps to an empty configuration.
    if not row:
        return {}
    return row.get('threshold', {})
async def set_scope_config_by_pid(pid, config):
    """Overwrite the scope (waveform-recording) threshold stored for a device.

    Args:
        pid: Value matched against the ``pid`` column of
            ``scope_config_record``.
        config: New value bound to the ``threshold`` column. It is handed to
            the database layer as-is; no serialization happens here.

    Returns:
        Whatever ``execute`` returns on success, or ``-1`` if any exception
        is raised while connecting or executing (the error is logged).
    """
    statement = "update scope_config_record set threshold=%s where pid=%s"
    try:
        async with MysqlUtil() as db:
            outcome = await db.execute(sql=statement, args=(config, pid))
    except Exception as exc:
        # Best-effort contract: callers receive -1 instead of the exception.
        log.error('set_scope_config_by_pid update error:' + str(exc))
        return -1
    else:
        return outcome
async def add_scope_config_by_pid(pid, configs):
    """Insert a new scope (waveform-recording) configuration row for a device.

    Args:
        pid: Value stored in the ``pid`` column.
        configs: Mapping that must contain the keys ``"coef"`` and
            ``"threshold"``. Each is JSON-encoded into the column of the same
            name, and the whole mapping is JSON-encoded into ``vol_cur``.

    Returns:
        Whatever ``execute`` returns on success, or ``-1`` if any exception
        is raised (including a missing key in ``configs``); the error is
        logged.
    """
    statement = ("INSERT INTO `power_iot`.`scope_config_record` (`pid`, "
                 "`coef`, `threshold`, `vol_cur`) VALUES (%s, %s, %s, %s)")
    try:
        async with MysqlUtil() as db:
            # Serialization stays inside the try so a bad ``configs`` shape
            # is logged and reported as -1 like any database failure.
            # NOTE(review): ``vol_cur`` receives the entire ``configs``
            # object rather than a sub-key -- confirm this is intended.
            row_values = (
                pid,
                json.dumps(configs["coef"]),
                json.dumps(configs["threshold"]),
                json.dumps(configs),
            )
            outcome = await db.execute(sql=statement, args=row_values)
    except Exception as exc:
        log.error('add_scope_config_by_pid add error:' + str(exc))
        return -1
    else:
        return outcome
async def get_scope_list_by_pid(pids, start_dt, end_dt, scope_g="2s"):
    """Query Elasticsearch for scope (waveform-recording) records.

    Args:
        pids: Values matched against the ``point_id`` field (``terms``
            filter).
        start_dt: Inclusive lower bound for the ``datetime`` field.
        end_dt: Exclusive upper bound for the ``datetime`` field.
        scope_g: Value matched against the ``scope_g`` field; defaults to
            ``"2s"``.

    Returns:
        The ``["hits"]["hits"]`` list of the raw search response, sorted by
        ``datetime`` ascending. The request asks for at most 10000 documents.
    """
    # Every condition must hold: point id, scope granularity, time window.
    filters = [
        {"terms": {"point_id": pids}},
        {"term": {"scope_g": {"value": scope_g}}},
        {"range": {"datetime": {"gte": start_dt, "lt": end_dt}}},
    ]
    search_request = {
        "size": 10000,
        "query": {"bool": {"must": filters}},
        "sort": [{"datetime": {"order": "asc"}}],
    }
    async with EsUtil() as es_client:
        response = await es_client.search_origin(
            body=search_request, index=constants.POINT_1MIN_SCOPE)
    return response["hits"]["hits"]