1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import json
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
from pot_libs.settings import SETTING
from unify_api.constants import ADIO_KEY, LINK_CONT_EXP_TIME
from unify_api.modules.linkage_control.components.linkage_control_cps import \
LinLocationReq, LinLocationResp, LinkageLocation, SwitchOpReq, \
SwitchOpResp, SwitchRecordReq, SwitchRecordResp, SwitchRecord, SoesResp
from unify_api.modules.linkage_control.service.linkage_control_service import \
switch_operation_emq_service
from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.utils.time_format import srv_time
@summary('状态监测')
async def post_linkage_location(req, body: LinLocationReq) -> LinLocationResp:
"""联动控制,工厂所有开关监测点,group+item"""
cid = body.cid
# 1. 根据cid查出所有(location.group)
sql = "select * from location where cid = %s and ad_type = %s"
async with MysqlUtil() as conn:
loc_info = await conn.fetchall(sql=sql, args=(cid, "switch"))
if not loc_info:
return LinLocationResp(location_info=[])
# 2. 查询用户是否有控制权限
user_id = jwt_user(req)
if not user_id:
return LinLocationResp.missing_jwt()
auth_sql = "select linkage_control from linkage_control_authority " \
"where user_id = %s and cid = %s"
async with MysqlUtil() as conn:
auth_info = await conn.fetchone(sql=auth_sql, args=(user_id, cid))
log.info(f"auth_info:{auth_info}")
try:
auth_info = auth_info["linkage_control"]
except:
# 说明查询不到auth_info记录,auth_info为None报错
auth_info = None
if not auth_info:
authority = 0
else:
authority = 1
# 3. 查询开关状态redis, 拼接好返回list
location_info = []
for info in loc_info:
ll = LinkageLocation()
ll.location_id = info.get("id")
if info.get("item") == "default":
ll.name = info.get("group")
else:
ll.name = info.get("group") + "_" + info.get("item")
# 开关状态
val = await RedisUtils().hget(ADIO_KEY, info.get("id"))
try:
val_dic = json.loads(val)
except:
ll.status = 2 # 2表示通信中断
location_info.append(ll)
continue
now_date, timestamp = srv_time()
if timestamp - val_dic.get("timestamp") <= LINK_CONT_EXP_TIME and \
val_dic.get("value") is not None: # 5min内
ll.status = val_dic.get("value")
else:
ll.status = 2 # 超过5min,2表示通信中断
location_info.append(ll)
# 4. 返回
return LinLocationResp(authority=authority,
location_info=location_info)
# @summary('开断/闭合操作')
# async def post_switch_operation(req, body: SwitchOpReq) -> SwitchOpResp:
# """联动控制开关, 开断/闭合操作"""
# cid = body.cid
# location_id = body.location_id
# switch = body.switch
# switch_end = int(switch[1])
# # 1. 调用业务后台接口,传数给装置
# req_json = {
# "switch": switch_end, # 0-开断 1-闭合
# "location_id": location_id
# }
# try:
# resp, status = await AioHttpUtils().post(SETTING.linkage_control_url,
# req_json, timeout=50)
# except Exception as e:
# log.exception(e)
# # 1.1 自己超时, 返回50001 (超时,程序会报requests.exceptions.Timeout异常)
# log.error("联动控制超时")
# return SwitchOpResp.timeout_self_err()
# resp = json.loads(resp)
# # 1.2 业务500, 返回50002
# if status != 200:
# log.warning("linkage control status 50002")
# return SwitchOpResp.service_err()
# # 1.3 硬件超时,有timeout,
# # 如果是断开到闭合,有timeout,则认为是跳闸到闭合失败,返回50003
# # 其他,硬件超时, 返回50004
# if resp.get("timeout"):
# if switch == "01":
# log.warning("linkage control status Hardware_timeout 50003")
# return SwitchOpResp.hardware_timeout_opentoclose_err()
# log.warning("linkage control status 50004")
# return SwitchOpResp.hardware_timeout_err()
# # 1.4. 硬件500, status_code500, 返回5005
# if resp.get("status_code") == 500:
# if switch == "01":
# log.warning("linkage control status Hardware_500 50003")
# return SwitchOpResp.hardware_timeout_opentoclose_err()
# log.warning("linkage control status 50005")
# return SwitchOpResp.hardware_control_err()
# # 2. 更新redis
# now_date, timestamp = srv_time()
# redis_dic = {
# "value": switch_end,
# "timestamp": int(timestamp)
# }
# redis_dic = json.dumps(redis_dic)
# # val返回0
# val = await RedisClient().hset(ADIO_KEY, str(location_id), redis_dic)
# # 3. 更新操作到联动控制记录表
# user_id = jwt_user(req)
# if not user_id:
# return LinLocationResp.missing_jwt()
# insert_sql = "INSERT INTO linkage_control_operation_record " \
# "(`time`, location_id, `type`, action, user_id) VALUES %s"
# async with MysqlUtil() as conn:
# # 成功res返回1
# res = await conn.execute(insert_sql, args=(
# (now_date, location_id, 'remote', switch, user_id),))
# if res != 1:
# return SwitchOpResp.db_error()
# return SwitchOpResp.success()
@summary('状态记录')
async def post_switch_record_list(req,
body: SwitchRecordReq) -> SwitchRecordResp:
"""联动控制, 状态记录"""
cid = body.cid
is_wechat = body.is_wechat
location_list = body.location_list
if not location_list:
async with MysqlUtil() as conn:
sql = "select lid from location where cid=%s"
locations = await conn.fetchall(sql, args=(cid,))
location_list = [l["lid"] for l in locations]
if not location_list:
return SwitchRecordResp(total=0, rows=[])
page_num = body.page_num
page_record_num = body.page_record_num
# web日月筛选按钮
start = body.start
end = body.end
# 1.查询联动控制记录表
sql_total = "SELECT count(*) from linkage_control_operation_record " \
"where location_id in %s"
total_args = (tuple(location_list),)
if start and end:
sql_total = "SELECT count(*) from linkage_control_operation_record " \
"where location_id in %s and time >= %s and time <= %s"
total_args = (tuple(location_list), start, end)
base_sql = "select lr.`time`,lr.`type`,lr.`action`,lr.location_id, " \
"location.`ad_type` group,location.item, lr.user_id " \
"from linkage_control_operation_record lr left join location " \
"on lr.location_id = location.id where location_id in %s "
if is_wechat == 1: # 小程序端
sql = base_sql + "order by `time` desc limit 0, 20"
args = (tuple(location_list),)
else: # web端
if not start:
sql = base_sql + "order by `time` desc limit %s, %s"
args = (tuple(location_list), (page_num - 1) * page_record_num,
page_record_num)
else:
sql = base_sql + "and lr.time >= %s and lr.time <= %s " \
"order by `time` desc limit %s, %s"
args = (tuple(location_list), start, end,
(page_num - 1) * page_record_num, page_record_num)
async with MysqlUtil() as conn:
total_res = await conn.fetch_value(sql_total, args=total_args)
db_result = await conn.fetchall(sql, args=args)
if not db_result:
return SwitchRecordResp(total=0, rows=[])
# 2. 构造返回
record_info = []
for info in db_result:
# location名
if info.get("item") == "default":
location = info.get("group")
else:
location = info.get("group") + '_' + info.get("item")
# 操作人
if info.get("user_id"):
op_sql = "select name from linkage_control_authority " \
"where user_id = %s"
async with MysqlUtil() as conn:
op_res = await conn.fetchone(op_sql, args=(
info.get("user_id"),))
op_name = op_res.get("name")
else:
op_name = ""
time_str = info.get("time").strftime('%Y-%m-%d %H:%M:%S')
sr = SwitchRecord()
sr.time = time_str # 时间
sr.location = location # 开关
sr.type = info.get("type") # 类型
sr.action_info = info.get("action") # 动作情况
sr.user = op_name # 操作人
record_info.append(sr)
return SwitchRecordResp(
total=total_res if total_res < 10000 else 10000,
rows=record_info)
@summary('开断/闭合操作-直接与装置通信')
async def post_switch_operation(req, body: SwitchOpReq) -> SoesResp:
"""联动控制开关, 开断/闭合操作"""
cid = body.cid
location_id = body.location_id
switch = body.switch
switch_end = int(switch[1])
user_id = req.ctx.user_id
return await switch_operation_emq_service(location_id, switch, switch_end,
user_id)