Nginx + Docker 手动集群方式运行 EMQ
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,EMQ X 在支持客户的过程中,了解到有客户使用 Nginx 做负载均衡,Docker 容器手动加入集群的方式运行 EMQ 集群,现将主要过程记录下来。业务需求使用 Nginx 作为反向代理Nginx
千家信息网最后更新 2025年02月04日Nginx + Docker 手动集群方式运行 EMQ
EMQ X 在支持客户的过程中,了解到有客户使用 Nginx 做负载均衡,Docker 容器手动加入集群的方式运行 EMQ 集群,现将主要过程记录下来。
业务需求
- 使用 Nginx 作为反向代理
- Nginx 需要提前分配好代理 server 的地址
- 使用 Docker 容器运行 EMQ
- EMQ 自动重启
- EMQ 重启后自动集群
配置
Nginx 配置
$ cat /etc/nginx/tcpstream.conf## tcp LB and SSL passthrough for backend ##stream { upstream mqtt_broker { server 127.0.0.1:21871; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21872; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21873; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21874; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21875; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21881; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21891; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21882; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21892; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21883; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21893; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21884; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21894; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21885; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21895; #max_fails=5 fail_timeout=30s; }log_format basic '$proxy_protocol_addr - $remote_addr [$time_local] ' '$protocol $status $bytes_sent $bytes_received ' '$session_time "$upstream_addr" ' '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"'; access_log /var/log/nginx/access.log basic; error_log /var/log/nginx/error.log; server { listen 8884 ssl; # proxy_protocol; proxy_next_upstream on; #proxy_bind $remote_addr transparent; proxy_ssl off; proxy_pass mqtt_broker; proxy_protocol on; #ssl_on; # adding some extra proxy settings proxy_timeout 350s; #proxy_buffer_size 128k; #ssl_certificate /etc/nginx/certs/solace.pem; #ssl_certificate_key /etc/nginx/certs/solace.pem; ssl_certificate /etc/nginx/certs/cert.pem; ssl_certificate_key /etc/nginx/certs/key.pem; #ssl_verify_client off; ssl_protocols TLSv1 TLSv1.1 TLSv1.2; ssl_ciphers HIGH:!aNULL:!MD5; }}
Docker 配置
客户自行编译的 Docker image,并非使用 EMQ 提供的官方镜像。
Dockerfile 目录如下:
$ ll /opt/Docker/总用量 28-rw-r--r-- 1 alexeyp emq 620 10月 22 17:26 Dockerfilelrwxrwxrwx 1 alexeyp emq 13 10月 24 13:59 emqttd -> emqttd.2.3.11drwxr-xr-x 10 alexeyp emq 110 10月 24 14:27 emqttd.2.3.11-rwxr-xr-x 1 alexeyp emq 3463 10月 26 05:03 StartEmqInstance.sh-rwxr-xr-x 1 alexeyp alexeyp 270 10月 25 10:46 status.sh
Dockerfile:
$ cat DockerfileFROM centos:latestRUN yum -y updateEXPOSE 60000-65000WORKDIR /opt/emqttdADD ./emqttd /opt/emqttdADD ./vsparc.rpm /tmp/vsparc.rpmADD ./StartEmqInstance.sh /opt/emqttd/StartEmqInstance.shRUN yum install -y epel-releaseRUN yum install -y which less sed net-tools telnet gtest /tmp/vsparc.rpmENV TZ Australia/MelbourneCMD bash /opt/emqttd/StartEmqInstance.sh && bash
可以看到 Docker 容器启动后会执行一个 StartEmqInstance.sh 的脚本,查看该脚本:
$ cat StartEmqInstance.sh#!/bin/bashDIR=$(dirname $0)HOSTNAME=$(hostname -s)function adjust_instance(){ local INST=$1 local INST_ROOT=$2 cat $INST_ROOT/etc/emq.conf | \ sed -re "s/^node\.name\s*=.*$/node.name = emq$INST@127.0.0.1/" | \ #sed -re "s/^cluster\.name\s*=.*$/cluster.name = $HOSTNAME/" | \ sed -re "s/^listener\.tcp\.external\s*=.*$/listener.tcp.external = 0.0.0.0:6188$INST/" | \ sed -re "s/^listener\.tcp\.external1\s*=.*$/listener.tcp.external1 = 0.0.0.0:6189$INST/" | \ sed -re "s/^listener\.tcp\.external2\s*=.*$/listener.tcp.external2 = 0.0.0.0:6187$INST/" | \ sed -re "s/^listener\.tcp\.internal\s*=.*$/listener.tcp.internal = 127.0.0.1:6298$INST/" | \ sed -re "s/^listener\.ssl\.external\s*=.*$/listener.ssl.external = 6288$INST/" | \ sed -re "s/^listener\.ws\.external\s*=.*$/listener.ws.external = 6208$INST/" | \ sed -re "s/^listener\.wss\.external\s*=.*$/listener.ws.external = 6308$INST/" | \ sed -re "s/^listener\.api\.mgmt\s*=.*$/listener.api.mgmt = 6408$INST/" | \ sed -re "s/^(##\s)?listener\.tcp\.external\.proxy_protocol\s=.*$/listener.tcp.external.proxy_protocol = on/" | \ sed -re "s/^(##\s)?listener\.tcp\.external1\.proxy_protocol\s=.*$/listener.tcp.external1.proxy_protocol = on/" | \ sed -re "s/^(##\s)?listener\.tcp\.external2\.proxy_protocol\s=.*$/listener.tcp.external2.proxy_protocol = on/" | \ sed -re "s/^(##\s)?listener\.tcp\.external\.proxy_protocol_timeout\s=.*$/listener.tcp.external.proxy_protocol_timeout = 30s/" | \ sed -re "s/^(##\s)?listener\.tcp\.external1\.proxy_protocol_timeout\s=.*$/listener.tcp.external1.proxy_protocol_timeout = 30s/" | \ sed -re "s/^(##\s)?listener\.tcp\.external2\.proxy_protocol_timeout\s=.*$/listener.tcp.external2.proxy_protocol_timeout = 30s/" | \ sed -re "s/^(##\s)?node.dist_listen_min\s*=.*$/node.dist_listen_min = 6000$INST/" | \ sed -re "s/^(##\s)?node.dist_listen_max\s*=.*$/node.dist_listen_max = 6000$INST/" | \ cat - > $INST_ROOT/etc/emq.conf.new mv $INST_ROOT/etc/emq.conf.new $INST_ROOT/etc/emq.conf}function cluster_instance(){ local INST=$1 for DEST in 1 2 3 4 5; do if [ $DEST == $INST ]; then continue; fi DEST_NODE="emq$DEST@127.0.0.1" RESULT=$(/opt/emqttd/bin/emqttd_ctl cluster join $DEST_NODE 2>&1) echo "$RESULT" echo "$RESULT" | grep -E 'successfully|already' > /dev/null RC=$? [ $RC == 0 ] && break done}cd "$DIR"if [ "$EMQ_INSTANCE_NUMBER" == "" ]; then echo "Environment variable EMQ_INSTANCE_NUMBER(1..10) is not set." echo "eMQ instance name is not configured." exit 1else adjust_instance $EMQ_INSTANCE_NUMBER $DIRfifunction run_application(){ local CMD="$1" local RC=1 while [ $RC != 0 ]; do $CMD RC=$? echo "### Exited: $CMD" echo "### rc = $RC" #[ $RC != 0 ] && sleep 3 RC=1 done echo "### Done: $CMD"}function start_node(){ bin/emqttd start STARTED=0 while [ $STARTED == 0 ]; do sleep 1 /opt/emqttd/bin/emqttd_ctl status | grep "is running" [ $? == 0 ] && break done cluster_instance $EMQ_INSTANCE_NUMBER > /tmp/cluster_instance.log}start_nodesleep 5run_application "/usr/local/bin/emqtt-stats-collector" waitIDLE_TIME=0while [[ $IDLE_TIME -lt 5 ]]do IDLE_TIME=$((IDLE_TIME+1)) if [[ ! -z "$( /opt/emqttd/bin/emqttd_ctl status|grep 'is running'|awk '{print $1}')" ]]; then IDLE_TIME=0 else echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd not running, waiting for recovery in $((60-IDLE_TIME*5)) seconds" fi sleep 5doneecho "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd exit abnormally"exit 1
脚本内容稍多而且有些复杂,需要结合 start.sh
脚本和 etc/emq.conf
一起看
$ cat start.sh#!/bin/bashfor INST in 1 2 3 4 5do docker ps | grep -E "\sinstance_$INST$" if [ $? != 0 ]; then #docker run -itd ---ulimit nofile=1048576 -restart=always -v /opt/Docker/emqtt/emq$INST/data/mnesia:/opt/emqttd/data/mnesia -e EMQ_INSTANCE_NUMBER=$INST --name=instance_$INST --network host emq:test & docker run -itd --ulimit nofile=1048576 -e EMQ_INSTANCE_NUMBER=$INST --name=instance_$INST --network host emq:latest & fidonewait
EMQ 配置
etc/emq.conf`的全文就不贴出来了,主要是增加了两个 tcp 监听端口,并且关闭了`listener.tcp.external.tune_buffer$ cat etc/emq.conf......##--------------------------------------------------------------------listener.tcp.external = 0.0.0.0:21881listener.tcp.external.acceptors = 16listener.tcp.external.max_clients = 512000listener.tcp.external.access.1 = allow alllistener.tcp.external.proxy_protocol = onlistener.tcp.external.proxy_protocol_timeout = 30slistener.tcp.external.backlog = 1024listener.tcp.external.send_timeout = 15slistener.tcp.external.send_timeout_close = on## listener.tcp.external.tune_buffer = onlistener.tcp.external.nodelay = truelistener.tcp.external.reuseaddr = true##--------------------------------------------------------------------listener.tcp.external1 = 0.0.0.0:21891listener.tcp.external1.acceptors = 16listener.tcp.external1.max_clients = 512000listener.tcp.external1.access.1 = allow alllistener.tcp.external1.proxy_protocol = onlistener.tcp.external1.proxy_protocol_timeout = 30slistener.tcp.external1.backlog = 1024listener.tcp.external1.send_timeout = 15slistener.tcp.external1.send_timeout_close = on## listener.tcp.external1.tune_buffer = onlistener.tcp.external1.nodelay = truelistener.tcp.external1.reuseaddr = true##--------------------------------------------------------------------listener.tcp.external2 = 0.0.0.0:21871listener.tcp.external2.acceptors = 16listener.tcp.external2.max_clients = 512000listener.tcp.external2.access.1 = allow alllistener.tcp.external2.proxy_protocol = onlistener.tcp.external2.proxy_protocol_timeout = 30slistener.tcp.external2.backlog = 1024listener.tcp.external2.send_timeout = 15slistener.tcp.external2.send_timeout_close = on## listener.tcp.external2.tune_buffer = onlistener.tcp.external2.nodelay = truelistener.tcp.external2.reuseaddr = true......
业务分析
Docker 容器初始化
Docker 容器创建之后, StartEmqInstance.sh
执行 adjust_instance()
将 etc/emq.conf
中监听的端口修改为Nginx 的代理 server
sed -re "s/^node\.name\s*=.*$/node.name = emq$INST@127.0.0.1/" | \ sed -re "s/^listener\.tcp\.external\s*=.*$/listener.tcp.external = 0.0.0.0:6188$INST/" sed -re "s/^listener\.tcp\.external1\s*=.*$/listener.tcp.external1 = 0.0.0.0:6189$INST/" sed -re "s/^listener\.tcp\.external2\s*=.*$/listener.tcp.external2 = 0.0.0.0:6187$INST/" sed -re "s/^listener\.tcp\.internal\s*=.*$/listener.tcp.internal = 127.0.0.1:6298$INST/"
并通过 join
命令来实现集群功能
function cluster_instance(){ local INST=$1 for DEST in 1 2 3 4 5; do if [ $DEST == $INST ]; then continue; fi DEST_NODE="emq$DEST@127.0.0.1" RESULT=$(/opt/emqttd/bin/emqttd_ctl cluster join $DEST_NODE 2>&1) echo "$RESULT" echo "$RESULT" | grep -E 'successfully|already' > /dev/null RC=$? [ $RC == 0 ] && break done}
循环检查 EMQ 的状态,当 EMQ 停止了之后退出容器
IDLE_TIME=0while [[ $IDLE_TIME -lt 5 ]]do IDLE_TIME=$((IDLE_TIME+1)) if [[ ! -z "$( /opt/emqttd/bin/emqttd_ctl status|grep 'is running'|awk '{print $1}')" ]]; then IDLE_TIME=0 else echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd not running, waiting for recovery in $((60-IDLE_TIME*5)) seconds" fi sleep 5doneecho "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd exit abnormally"exit 1
访问
客户端通过 SSL 方式连接 地址,Nginx 将连接以 TCP 方式负载到 EMQ 节点。
PS:关于 Nginx 如何反向代理 tcp 和 ssl 的设置,可以参考 EMQ X 消息服务器 Nginx 反向代理
自动重启和自动集群
容器启动后通过 StartEmqInstance.sh
脚本查询 EMQ 的状态,当 EMQ 停止时退出容器,配合 --restart=always
来达到重启容器的目的。
EMQ 将集群信息储存在 data/mnesia
中,将容器的中的目录映射到宿主机,当容器重启之后会读取宿主机映射的相关目录,实现重启后自动集群。
存在问题
- Docker 的 host 网络模式使用宿主机的网络,当宿主机有其他业务在执行的时候,容易出现端口冲突
解决方案
- 修改
/proc/sys/net/ipv4/ip_local_port_range
指定系统分配的端口为1024 60000
,然后将 EMQ 的业务端口分配为 60000 之后的端口
实践案例
建议使用 kubernetes 来编排 docker 容器:
- EMQ 可以通过
kube-apiserver
来实现自动集群的功能。 - 该客户目前只是在单机部署docker集群,使用 kubernetes 可以轻易实现多个节点之间部署集群。
- kubernetes 的
deployment
可以监控emqx pod
的状态,实现自动重启、弹性扩容等功能。 - 每个
emqx pod
都有独立的虚拟 IP,不会出现端口冲突的问题。 - kubernetes 的
Service
可以实现固定 IP 和负载均衡的需求,在Service
创建的请求中,可以通过设置spec.clusterIP
字段来指定自己的集群 IP 地址,将 Nginx 的代理 server 设置成clusterIP
即可,Service
可自行实现负载均衡。
容器
集群
端口
代理
客户
脚本
业务
宿主
宿主机
配置
方式
均衡
功能
地址
状态
目录
分配
运行
可以通过
网络
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
版纳互联网科技怎么样
软件开发项目怎么投资
高斯数据库基于mysql
四川人工智能软件开发价位
网络安全私密性匿名度
网络安全和校园安全体会
服务器配置参数怎么看
老式ibm服务器改网络存储
广州视频安防软件开发费用
服务器怎么清理注册表垃圾
udp服务器下载
戴尔服务器硬盘保修期
网络安全监测屏作用
京东互联网科技有什么
网络安全重点布局项目武汉
ssl服务器安全连接不上
服务器内存怎么放大
数据库作业答案第五版
萝岗正规的网络数据库维护
软件开发就业方向包括哪些
壁画效果图软件开发
数据库如何入门与精通
怎样运用网络技术传播平台
服务器安全接入网关
庐江软件开发岗位
服务器license管理
数据库英文管理系统英文
中职生网络技术基础
志强服务器打游戏
链表添加初始数据库