Sakura__(0x02) 云端(服务器端)
本部分源码放置于
3.3.1 MQTT Broker (EMQX)
MQTT作为整个项目的主要通讯协议,在项目整体上起着至关重要的作用。而MQTT Broker作为MQTT协议的核心更是重中之重,选择一个合适的Broker需要从多个角度考虑:
- (1) 支持 mqtt3.1 / mqtt3.1.1协议
- (2) 支持QoS0、QoS1(可选QoS2)
- (3) 支持遗嘱消息
- (4) 支持持久化
- (5) 支持多种连接方式
- (6) 支持集群
- (7) 支持多种验证方式
- (8) 最好是开源项目
针对以上需求,选择了几个目前的主流MQTT Broker方案,分析其主要特征,结果整理为下表。
Broker | 开源 | 语言 | 连接方式 | QoS | 持久化 | 集群 |
---|---|---|---|---|---|---|
mosquitto | ✔ | C/C++ | TCP Websocket TCP/SSL Websocket/SSL | QoS0 QoS1 QoS2 | ? | ✘ |
EMQX | ✔ | Erlang | 同上 | 同上 | ✔ | ✔ |
HiveMQ | ✘ | Java | 同上 | 同上 | ✔ | ✔ |
EMQ X 3.0(以下简称EMQX)是一款基于 Erlang/OTP 语言平台开发,同时支持大规模连接和分布式集群,基于发布/订阅模式的国产开源 MQTT 消息服务器。其官方宣称完整支持 MQTT V3.1/V3.1.1/V5.0 版本的标准协议规范,并能够扩展支持 MQTT-SN 、WebSocket、CoAP、Stomp 或私有 TCP/UDP 协议。EMQ X 3.0 消息服务器能够支撑单节点100万级别的连接和构建多节点分布式集群[ ]:
在本项目中选用EMQX作为MQTT Broker,部署方式采用Docker镜像安装,同时开启emqx_auth_redis、emqx_recon、emqx_retainer、emqx_management、emqx_dashboard等插件。
emqx_auth_redis为Redis 认证插件,通过在Redis上存放Username、Password和ACL,实现能快速修改验证鉴权与访问控制。emqx_management为管理API、CLI, emqx_dashboard为Web 管理控制台。
通信接口上分为两部分,对外监听1883 MQTT协议端口用于与硬件设备通讯,对外监听8083 WS(Websocket)端口,并在宿主机上由nginx负责443端口SSL加密反向代理到本机8083端口,间接实现WSS(Websocket/SSL)用于与微信小程序通讯。另外还开放18083端口为Web管理控制台,通过宿主机nginx反向代理从外部访问调试EMQX。
3.3.2数据存储 (Redis、MariaDB)
云端的数据存储分为两部分,关系型数据库MariaDB用于存放用户与设备的绑定映射关系(user2lock表)和设备动作历史记录(lockhistory表),键值(Key-Value)数据库Redis的数据分为两个数据库存放,database 0存放mqtt_user和mqtt_acl用于EMQX的身份认证,database 1存放monitor键和lockno_m&lockno_f,分别用于确定设备上次状态和暂存双方开锁指令。
3.3.3 Python后端程序
后端程序使用Python语言、Flask框架编写,遵从单一服务原则根据功能不同拆分成三个模块,三个模块分别放在容器在独立运行,但都引用同一配置文件和一些基本库,在保证隔离性同时,使底层配置信息统一。
这三个模块分别为Device Part、Monitor Part、WX Part,Device Part负责设备管理与OTA固件升级等,Monitor Part负责设备状态监控记录、开锁鉴权中转、用户端mqtt认证过期销毁等,WX Part负责与微信小程序对接的所有功能,包括新设备用户绑定、用户登录(用户设备关系确认、mqtt认证发放)、设备历史记录查询、设备解除绑定等。
以下将简单的展示说明部分关键作用点:
WX Part负责与微信小程序对接,其主体部分为login函数,如下图所示:
用户端开启微信小程序后会向服务器POST发送JSON格式的临时登录凭证code,后端程序获得临时登录凭证code后,调用微信平台提供的auth.code2Session 接口,换取用户唯一标识OpenID。之后在数据库中查询user2lock表中usreno字段是否存在此OpenID,如果不存在则返回 {'exists': 0} 到微信小程序使其跳转至注册Page。若存在,则取出user2lock表中相对应的信息,随机生成两个字符串经由Base64加密后作为mqtt认证的Username和Password,将这些信息打包JSON化返回给微信小程序。同时,组合成对应的mqtt_user和mqtt_acl写入Redis,EMQX通过读取Redis中的mqtt_user对用户端实现验证,读取mqtt_acl实现对用户端的访问权限控制,仅允许用户端订阅(Subscribe)对应设备的主题(Topics)和发布(Publish)对应的morf数据以及 /unauth。
这样,整个login阶段到此结束,在此之后的用户端通讯交由MQTT完成,WX Part不参与。
部分代码展示
@app.route('/login', methods=['POST'])
def login():
# 临时登录凭证code换取用户唯一标识OpenID
rdata = json.loads(request.data)
openid = getopenid(rdata['code'])
print openid
# 查询user2lock表 是否存在此OpenID
data = db.select("SELECT lockno,morf,adate FROM user2lock WHERE userno='{}'".format(openid))
if len(data) == 0:
# 不存在则返回 0
return jsonify({'exists': 0})
else:
print data[0]
lockno = data[0][0]
morf = data[0][5]
adate = data[0][6]
# 随机生成两个字符串并Base64加密
randomkey = base64.b64encode(os.urandom(16))
randompwd = base64.b64encode(os.urandom(8))
mqtt_user = 'mqtt_user:' + randomkey
mqtt_acl = 'mqtt_acl:' + randomkey
Topic_all = '/' + lockno + '/#'
Topic_morf = '/' + lockno + '/' + morf
# Topic_ping = '/' + lockno + '/ping'
Topic_unauth = '/unauth'
# 生成mqtt_user和mqtt_acl写入Redis
r.hset(mqtt_user, 'password', randompwd)
r.hmset(mqtt_acl, {Topic_all: 1, Topic_morf: 2, Topic_unauth: 2})
# 信息打包JSON化返回
return jsonify(
{'exists': 1, 'lockno': lockno, 'morf': morf, 'adate': adate, 'key': randomkey, 'pwd': randompwd})
Monitor Part 可以说是一个监控程序或命令中转点,其底层原理是作为一个MQTT客户端订阅所有的主题(/#),当出现相应的主题时做出操作,达到监控调节的作用:
Monitor Part启动时订阅 /# 主题,当接收到消息时(on_message)[ ],对Topic进行拆分,对拆分后的第一段字符串进行判断。如果第一段字符串为unauth,取出Payload中的Username数据,删除在Redis中对应的mqtt_user和mqtt_acl,,这里的作用是当用户端微信小程序因主动关闭、超时等各种原因关闭连接时发出Last Will遗嘱消息/unauth,删除其在Redis中存在的MQTT认证凭据,保证系统的安全性。如果第一段字符串为设备的lockno,则判断第二段字符串。statu为设备发送的运行状态,与Redis中保存的对应设备的上次状态信息做比较,若为新状态则将对应的设备动作写入数据库中lockhistory表,同时更新Redis中保存的状态信息。f和m是用户端发来的设备开锁指令,需保证在一段时间内(35s)两方都确认后方可开锁,实现的方式是在Redis中写入lockno_m和lockno_f,并设置键的过期时间为35s,当收到f或m指令时,若Redis中同时存在另一方的键,则发布(Publish)对应设备的unlock解锁指令,设备单片机接收到指令后开锁。
部分代码展示
@mqtt.on_message()
def on_mqtt_message(client, userdata, message):
# 接收到消息 对Topic进行拆分
topic = message.topic
payload = message.payload.decode()
topic_part = topic.split('/')
# if topic_part[0] == '$SYS':
# topic_sys(topic_part)
# if topic_part[0] == '':
# topic_common(topic_part, payload)
topic_common(topic_part, payload)
def topic_common(topic_part, payload):
# 对拆分后的字符串进行判断
if len(topic_part[1]) == 11:
lockno = topic_part[1]
command = topic_part[2]
# if command == 'ping':
# topic_ping(lockno)
if command == 'm':
topic_m(lockno)
if command == 'f':
topic_f(lockno)
if command == 'statu':
topic_statu(lockno, payload)
else:
# 处理unauth
if topic_part[1] == 'unauth':
mqtt_user = 'mqtt_user:' + payload
mqtt_acl = 'mqtt_acl:' + payload
r.delete(mqtt_user)
r.delete(mqtt_acl)
def topic_m(lockno):
if r1.exists(lockno + '_f'):
r1.delete(lockno + '_f')
pubtopic = '/' + lockno + '/call'
mqtt.publish(pubtopic, 'unlock')
else:
r1.set(lockno + '_m', 1, ex=35)
def topic_statu(lockno, nowstatu):
# 记录设备状态历史
laststatu = r1.hget('monitor', lockno)
if nowstatu != laststatu:
r1.hset('monitor', lockno, nowstatu)
if laststatu == '-2':
pushhistory(lockno, '设备上线')
return None
if nowstatu == '-2':
pushhistory(lockno, '设备离线')
return None
if nowstatu == '1':
pushhistory(lockno, '上锁')
return None
if nowstatu == '0':
pushhistory(lockno, '开锁')
return None
3.3.4容器自动化部署
以上几大组件全部放入docker容器中运行,为实现容器化部署,做出两点改造:
- 1.在docker上创建自定义bridge网络[ ],组件全部处于此网络中,保证业务隔离性和实现容器名互访。
- 2.编写Dockerfile[11]和启动脚本,使用gunicorn作为WSGI服务器运行Python后端程序,并调用gevent实现异步请求和协程管理。同时创建映射卷的中间容器,所有程序源码放在宿主机映射目录里。这里主要实现两个目的:
- ①使用同一镜像创建容器,并输入不同的参数给启动脚本,即可运行三个模块其中之一。
- ②将程序源码修改时,只要替换宿主机映射目录里的源码,gunicorn就会检测到源码修改自动重启服务,能做到持续部署。
Dockerfile
FROM python:2.7
RUN pip install --upgrade pip
RUN pip install flask gunicorn gevent flask-mqtt
RUN pip install redis pymysql
RUN pip install qrcode pycrypto shortuuid requests
ADD start.sh start.sh
RUN chmod +x start.sh
ENTRYPOINT ["./start.sh"]
启动脚本 start.sh
#! /bin/bash
case $1 in
monitor) gunicorn --chdir /project --workers 1 --worker-class gevent --reload monitor_part:app;;
wx) gunicorn --chdir /project --workers 4 --bind 0.0.0.0:5005 --worker-class gevent --reload wx_part:app;;
device) gunicorn --chdir /project --workers 2 --bind 0.0.0.0:5015 --worker-class gevent --reload device_part:app;;
*) echo 'unknow args';;
Esac
部署脚本 setup.sh
#! /bin/bash
echo "------1.create net------"
docker network create sakura_net
echo "------2.initial db------"
# docker run -d --net sakura_net --name mydb -v /sakura_all/db:/db -p 3307:3306 -e MYSQL_ROOT_PASSWORD="******" mariadb:latest
docker run -d --net sakura_net --name mydb -v /sakura_all/db:/db \
-e MYSQL_RANDOM_ROOT_PASSWORD="yes" \
-e MYSQL_DATABASE="lockdata" -e MYSQL_USER="lockadmin" -e MYSQL_PASSWORD="******" \
mariadb:latest
docker exec -it mydb bash -c "mysql -ulockadmin -p****** lockdata < /db/data.sql"
echo "------3.initial redis------"
# docker run -d --net sakura_net --name myredis -p 6380:6379 redis:latest --requirepass "******"
docker run -d --net sakura_net --name myredis redis:latest
echo "------4.initial emqx------"
docker run -d --net sakura_net --name myemqx -p 18083:18083 -p 1883:1883 -p 8083:8083 \
-e EMQX_ADMIN_PASSWORD="******" \
-e EMQX_ALLOW_ANONYMOUS="false" \
-e EMQX_LOADED_PLUGINS="emqx_auth_redis|emqx_recon|emqx_retainer|emqx_management|emqx_dashboard" \
-e EMQX_AUTH__REDIS__SERVER="myredis:6379" \
-e EMQX_AUTH__REDIS__PASSWORD_HASH=plain \
emqx/emqx:latest
# -e EMQX_AUTH__REDIS__PASSWORD="******" \
echo "------5.build project------"
docker build -t sakura_general ./build
echo "------6.create volumes-link------"
docker run --name volumes-link -v /sakura_all/project:/project centos
echo "------7.run project------"
docker run -d --net sakura_net --name monitor_part --volumes-from volumes-link sakura_general monitor
docker run -d --net sakura_net --name wx_part --volumes-from volumes-link -p 5005:5005 sakura_general wx
docker run -d --net sakura_net --name device_part --volumes-from volumes-link -p 5015:5015 sakura_general device
3.3.5外围相关
除放在docker容器中运行的组件之外,宿主机为CentOS7操作系统,在宿主机上运行nginx加载SSL证书[ ]暴露在外网上,并反向代理到后端组件实现加密通讯。
SSL证书使用Let’s Encrypt数字证书认证机构提供的免费SSL证书,并使用acme.sh 工具实现自动化的证书获取与续期。证书使用RSA/ECC双证书,兼具安全性和兼容性。
nginx部分配置文件
server
{
# HTTP跳转HTTPS
listen 80;
server_name ************;
add_header Strict-Transport-Security "max-age=31536000";
return 301 https://$server_name$request_uri;
}
server
{
listen 443 ssl http2;
server_name **************;
# SSL双证书
ssl on;
ssl_certificate ../ssl/rsa/cert.pem;
ssl_certificate_key ../ssl/rsa/key.pem;
ssl_certificate ../ssl/ecc/cert.pem;
ssl_certificate_key ../ssl/ecc/key.pem;
# SSL版本
ssl_session_timeout 5m;
#ssl_protocols TLSv1.1 TLSv1.2 TLSv1.3;
ssl_protocols TLSv1.1 TLSv1.2;
ssl_prefer_server_ciphers on;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
location /wx {
rewrite ^/wx/(.*)$ /$1 break;
proxy_pass http://127.0.0.1:5005;
}
location /lock {
rewrite ^/lock/(.*)$ /$1 break;
proxy_pass http://127.0.0.1:5015;
}
# MQTT over WSS 反向代理
location = /mqtt {
proxy_pass http://127.0.0.1:8083;
proxy_redirect off;
proxy_set_header Host 127.0.0.1:8083;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
location /emqx {
proxy_pass http://127.0.0.1:18083;
}
access_log /home/wwwlogs/access.log;
}