• 本部分源码放置于

3.3.1 MQTT Broker (EMQX)

MQTT作为整个项目的主要通讯协议,在项目整体上起着至关重要的作用。而MQTT Broker作为MQTT协议的核心更是重中之重,选择一个合适的Broker需要从多个角度考虑:

  • (1) 支持 mqtt3.1 / mqtt3.1.1协议
  • (2) 支持QoS0、QoS1(可选QoS2)
  • (3) 支持遗嘱消息
  • (4) 支持持久化
  • (5) 支持多种连接方式
  • (6) 支持集群
  • (7) 支持多种验证方式
  • (8) 最好是开源项目

针对以上需求,选择了几个目前的主流MQTT Broker方案,分析其主要特征,结果整理为下表。

表3-2 主流MQTT Broker比较

Broker 开源 语言 连接方式 QoS 持久化 集群
mosquitto C/C++ TCP Websocket TCP/SSL Websocket/SSL QoS0 QoS1 QoS2
EMQX Erlang 同上 同上
HiveMQ Java 同上 同上

EMQ X 3.0(以下简称EMQX)是一款基于 Erlang/OTP 语言平台开发,同时支持大规模连接和分布式集群,基于发布/订阅模式的国产开源 MQTT 消息服务器。其官方宣称完整支持 MQTT V3.1/V3.1.1/V5.0 版本的标准协议规范,并能够扩展支持 MQTT-SN 、WebSocket、CoAP、Stomp 或私有 TCP/UDP 协议。EMQ X 3.0 消息服务器能够支撑单节点100万级别的连接和构建多节点分布式集群[ ]:

emqx

图18 EMQX

在本项目中选用EMQX作为MQTT Broker,部署方式采用Docker镜像安装,同时开启emqx_auth_redis、emqx_recon、emqx_retainer、emqx_management、emqx_dashboard等插件。
emqx_auth_redis为Redis 认证插件,通过在Redis上存放Username、Password和ACL,实现能快速修改验证鉴权与访问控制。emqx_management为管理API、CLI, emqx_dashboard为Web 管理控制台。
通信接口上分为两部分,对外监听1883 MQTT协议端口用于与硬件设备通讯,对外监听8083 WS(Websocket)端口,并在宿主机上由nginx负责443端口SSL加密反向代理到本机8083端口,间接实现WSS(Websocket/SSL)用于与微信小程序通讯。另外还开放18083端口为Web管理控制台,通过宿主机nginx反向代理从外部访问调试EMQX。

webpage

图19 部署完的EMQX Web管理控制台

3.3.2数据存储 (Redis、MariaDB)

云端的数据存储分为两部分,关系型数据库MariaDB用于存放用户与设备的绑定映射关系(user2lock表)和设备动作历史记录(lockhistory表),键值(Key-Value)数据库Redis的数据分为两个数据库存放,database 0存放mqtt_user和mqtt_acl用于EMQX的身份认证,database 1存放monitor键和lockno_m&lockno_f,分别用于确定设备上次状态和暂存双方开锁指令。

storage

图20 存储模型

3.3.3 Python后端程序

后端程序使用Python语言、Flask框架编写,遵从单一服务原则根据功能不同拆分成三个模块,三个模块分别放在容器在独立运行,但都引用同一配置文件和一些基本库,在保证隔离性同时,使底层配置信息统一。
这三个模块分别为Device Part、Monitor Part、WX Part,Device Part负责设备管理与OTA固件升级等,Monitor Part负责设备状态监控记录、开锁鉴权中转、用户端mqtt认证过期销毁等,WX Part负责与微信小程序对接的所有功能,包括新设备用户绑定、用户登录(用户设备关系确认、mqtt认证发放)、设备历史记录查询、设备解除绑定等。
以下将简单的展示说明部分关键作用点:
WX Part负责与微信小程序对接,其主体部分为login函数,如下图所示:

login

图21 WX Part login逻辑

用户端开启微信小程序后会向服务器POST发送JSON格式的临时登录凭证code,后端程序获得临时登录凭证code后,调用微信平台提供的auth.code2Session 接口,换取用户唯一标识OpenID。之后在数据库中查询user2lock表中usreno字段是否存在此OpenID,如果不存在则返回 {'exists': 0} 到微信小程序使其跳转至注册Page。若存在,则取出user2lock表中相对应的信息,随机生成两个字符串经由Base64加密后作为mqtt认证的Username和Password,将这些信息打包JSON化返回给微信小程序。同时,组合成对应的mqtt_user和mqtt_acl写入Redis,EMQX通过读取Redis中的mqtt_user对用户端实现验证,读取mqtt_acl实现对用户端的访问权限控制,仅允许用户端订阅(Subscribe)对应设备的主题(Topics)和发布(Publish)对应的morf数据以及 /unauth。
这样,整个login阶段到此结束,在此之后的用户端通讯交由MQTT完成,WX Part不参与。

部分代码展示

@app.route('/login', methods=['POST'])
def login():
    # 临时登录凭证code换取用户唯一标识OpenID
    rdata = json.loads(request.data)
    openid = getopenid(rdata['code'])
    print openid

    # 查询user2lock表 是否存在此OpenID
    data = db.select("SELECT lockno,morf,adate FROM user2lock WHERE userno='{}'".format(openid))
    if len(data) == 0:
        # 不存在则返回 0
        return jsonify({'exists': 0})
    else:
        print data[0]
        lockno = data[0][0]
        morf = data[0][5]
        adate = data[0][6]
        # 随机生成两个字符串并Base64加密
        randomkey = base64.b64encode(os.urandom(16))
        randompwd = base64.b64encode(os.urandom(8))
        mqtt_user = 'mqtt_user:' + randomkey
        mqtt_acl = 'mqtt_acl:' + randomkey
        Topic_all = '/' + lockno + '/#'
        Topic_morf = '/' + lockno + '/' + morf
        # Topic_ping = '/' + lockno + '/ping'
        Topic_unauth = '/unauth'

        # 生成mqtt_user和mqtt_acl写入Redis
        r.hset(mqtt_user, 'password', randompwd)
        r.hmset(mqtt_acl, {Topic_all: 1, Topic_morf: 2, Topic_unauth: 2})

        # 信息打包JSON化返回
        return jsonify(
            {'exists': 1, 'lockno': lockno, 'morf': morf, 'adate': adate, 'key': randomkey, 'pwd': randompwd})

Monitor Part 可以说是一个监控程序或命令中转点,其底层原理是作为一个MQTT客户端订阅所有的主题(/#),当出现相应的主题时做出操作,达到监控调节的作用:

monitor

图22 Monitor Part 主要逻辑

Monitor Part启动时订阅 /# 主题,当接收到消息时(on_message)[ ],对Topic进行拆分,对拆分后的第一段字符串进行判断。如果第一段字符串为unauth,取出Payload中的Username数据,删除在Redis中对应的mqtt_user和mqtt_acl,,这里的作用是当用户端微信小程序因主动关闭、超时等各种原因关闭连接时发出Last Will遗嘱消息/unauth,删除其在Redis中存在的MQTT认证凭据,保证系统的安全性。如果第一段字符串为设备的lockno,则判断第二段字符串。statu为设备发送的运行状态,与Redis中保存的对应设备的上次状态信息做比较,若为新状态则将对应的设备动作写入数据库中lockhistory表,同时更新Redis中保存的状态信息。f和m是用户端发来的设备开锁指令,需保证在一段时间内(35s)两方都确认后方可开锁,实现的方式是在Redis中写入lockno_m和lockno_f,并设置键的过期时间为35s,当收到f或m指令时,若Redis中同时存在另一方的键,则发布(Publish)对应设备的unlock解锁指令,设备单片机接收到指令后开锁。

部分代码展示

@mqtt.on_message()
def on_mqtt_message(client, userdata, message):
    # 接收到消息 对Topic进行拆分
    topic = message.topic
    payload = message.payload.decode()

    topic_part = topic.split('/')
    # if topic_part[0] == '$SYS':
    #     topic_sys(topic_part)
    # if topic_part[0] == '':
    #     topic_common(topic_part, payload)
    topic_common(topic_part, payload)

def topic_common(topic_part, payload):
    # 对拆分后的字符串进行判断
    if len(topic_part[1]) == 11:
        lockno = topic_part[1]
        command = topic_part[2]
        # if command == 'ping':
        #     topic_ping(lockno)
        if command == 'm':
            topic_m(lockno)
        if command == 'f':
            topic_f(lockno)
        if command == 'statu':
            topic_statu(lockno, payload)
    else:
        # 处理unauth
        if topic_part[1] == 'unauth':
            mqtt_user = 'mqtt_user:' + payload
            mqtt_acl = 'mqtt_acl:' + payload
            r.delete(mqtt_user)
            r.delete(mqtt_acl)

def topic_m(lockno):
    if r1.exists(lockno + '_f'):
        r1.delete(lockno + '_f')
        pubtopic = '/' + lockno + '/call'
        mqtt.publish(pubtopic, 'unlock')
    else:
        r1.set(lockno + '_m', 1, ex=35)

def topic_statu(lockno, nowstatu):
    # 记录设备状态历史
    laststatu = r1.hget('monitor', lockno)
    if nowstatu != laststatu:
        r1.hset('monitor', lockno, nowstatu)
        if laststatu == '-2':
            pushhistory(lockno, '设备上线')
            return None
        if nowstatu == '-2':
            pushhistory(lockno, '设备离线')
            return None
        if nowstatu == '1':
            pushhistory(lockno, '上锁')
            return None
        if nowstatu == '0':
            pushhistory(lockno, '开锁')
            return None

3.3.4容器自动化部署

以上几大组件全部放入docker容器中运行,为实现容器化部署,做出两点改造:

  • 1.在docker上创建自定义bridge网络[ ],组件全部处于此网络中,保证业务隔离性和实现容器名互访。
  • 2.编写Dockerfile[11]和启动脚本,使用gunicorn作为WSGI服务器运行Python后端程序,并调用gevent实现异步请求和协程管理。同时创建映射卷的中间容器,所有程序源码放在宿主机映射目录里。这里主要实现两个目的:
  • ①使用同一镜像创建容器,并输入不同的参数给启动脚本,即可运行三个模块其中之一。
  • ②将程序源码修改时,只要替换宿主机映射目录里的源码,gunicorn就会检测到源码修改自动重启服务,能做到持续部署。

Dockerfile

FROM  python:2.7

RUN pip install --upgrade pip
RUN pip install flask gunicorn gevent flask-mqtt
RUN pip install redis pymysql
RUN pip install qrcode pycrypto shortuuid requests

ADD start.sh start.sh
RUN chmod +x start.sh

ENTRYPOINT ["./start.sh"]

启动脚本 start.sh

#! /bin/bash

case $1 in
    monitor) gunicorn --chdir /project --workers 1 --worker-class gevent --reload  monitor_part:app;;
    wx) gunicorn --chdir /project --workers 4 --bind 0.0.0.0:5005 --worker-class gevent --reload  wx_part:app;;
    device) gunicorn --chdir /project --workers 2 --bind 0.0.0.0:5015 --worker-class gevent --reload  device_part:app;;
    *) echo 'unknow args';;
Esac

部署脚本 setup.sh

#! /bin/bash

echo "------1.create net------"
docker network create sakura_net

echo "------2.initial db------"
# docker run -d --net sakura_net --name mydb -v /sakura_all/db:/db -p 3307:3306 -e MYSQL_ROOT_PASSWORD="******" mariadb:latest
docker run -d --net sakura_net --name mydb -v /sakura_all/db:/db \
    -e MYSQL_RANDOM_ROOT_PASSWORD="yes" \
    -e MYSQL_DATABASE="lockdata" -e MYSQL_USER="lockadmin" -e MYSQL_PASSWORD="******" \
    mariadb:latest
docker exec -it mydb bash -c "mysql -ulockadmin -p****** lockdata < /db/data.sql"

echo "------3.initial redis------"
# docker run -d --net sakura_net --name myredis -p 6380:6379  redis:latest --requirepass "******"
docker run -d --net sakura_net --name myredis redis:latest

echo "------4.initial emqx------"
docker run -d --net sakura_net --name myemqx -p 18083:18083 -p 1883:1883 -p 8083:8083 \
    -e EMQX_ADMIN_PASSWORD="******" \
    -e EMQX_ALLOW_ANONYMOUS="false" \
    -e EMQX_LOADED_PLUGINS="emqx_auth_redis|emqx_recon|emqx_retainer|emqx_management|emqx_dashboard" \
    -e EMQX_AUTH__REDIS__SERVER="myredis:6379" \
    -e EMQX_AUTH__REDIS__PASSWORD_HASH=plain \
    emqx/emqx:latest
    # -e EMQX_AUTH__REDIS__PASSWORD="******" \

echo "------5.build project------"
docker build -t sakura_general ./build

echo "------6.create volumes-link------"
docker run --name volumes-link -v /sakura_all/project:/project centos

echo "------7.run project------"
docker run -d --net sakura_net --name monitor_part --volumes-from volumes-link sakura_general  monitor
docker run -d --net sakura_net --name wx_part --volumes-from volumes-link -p 5005:5005 sakura_general  wx
docker run -d --net sakura_net --name device_part --volumes-from volumes-link -p 5015:5015 sakura_general  device

3.3.5外围相关

除放在docker容器中运行的组件之外,宿主机为CentOS7操作系统,在宿主机上运行nginx加载SSL证书[ ]暴露在外网上,并反向代理到后端组件实现加密通讯。

SSL证书使用Let’s Encrypt数字证书认证机构提供的免费SSL证书,并使用acme.sh 工具实现自动化的证书获取与续期。证书使用RSA/ECC双证书,兼具安全性和兼容性。

nginx部分配置文件

server
{
    # HTTP跳转HTTPS
    listen 80;
    server_name ************;

    add_header Strict-Transport-Security "max-age=31536000";

    return 301 https://$server_name$request_uri;
}

server
{
    listen 443 ssl http2;
    server_name **************;

    # SSL双证书
    ssl on;
    ssl_certificate ../ssl/rsa/cert.pem;
    ssl_certificate_key ../ssl/rsa/key.pem;
    ssl_certificate ../ssl/ecc/cert.pem;
    ssl_certificate_key ../ssl/ecc/key.pem;

    # SSL版本
    ssl_session_timeout 5m;
    #ssl_protocols TLSv1.1 TLSv1.2 TLSv1.3;
    ssl_protocols TLSv1.1 TLSv1.2;
    ssl_prefer_server_ciphers on;

    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;

location /wx {
        rewrite ^/wx/(.*)$ /$1 break;
        proxy_pass http://127.0.0.1:5005;
    }

location /lock {
rewrite ^/lock/(.*)$ /$1 break;
        proxy_pass http://127.0.0.1:5015;
    }

    # MQTT over WSS 反向代理
    location = /mqtt {
        proxy_pass http://127.0.0.1:8083;
        proxy_redirect off;
        proxy_set_header Host 127.0.0.1:8083;

        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }

    location /emqx {
        proxy_pass http://127.0.0.1:18083;
    }

    access_log  /home/wwwlogs/access.log;
}