Python中如何使用matplotlib绘制mqtt数据实时图像功能
发表于:2025-01-16 作者:千家信息网编辑
千家信息网最后更新 2025年01月16日,这篇文章主要讲解了"Python中如何使用matplotlib绘制mqtt数据实时图像功能",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Python中
千家信息网最后更新 2025年01月16日Python中如何使用matplotlib绘制mqtt数据实时图像功能matplotlib绘制mqtt数据实时图像
这篇文章主要讲解了"Python中如何使用matplotlib绘制mqtt数据实时图像功能",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Python中如何使用matplotlib绘制mqtt数据实时图像功能"吧!
目录
效果图
mqtt发布
mqtt订阅
matplotlib绘制动态图
matplotlib绘制mqtt数据实时图像
效果图
mqtt发布
本代码中publish
是一个死循环,数据一直往外发送。
import randomimport timefrom paho.mqtt import client as mqtt_clientimport jsonfrom datetime import datetimebroker = 'broker.emqx.io'port = 1883topic = "/python/mqtt/li"client_id = f'python-mqtt-{random.randint(0, 1000)}' # 随机生成客户端iddef connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return clientdef publish(client): while True: time.sleep(0.01) msg = json.dumps({"MAC": "0123456789", "samplerate": 12, "sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]), "battery": 0.5, "acc": [ [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], ]}) result = client.publish(topic, msg) status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}")def run(): client = connect_mqtt() client.loop_start() publish(client)if __name__ == '__main__': run()
mqtt订阅
from paho.mqtt import client as mqtt_clientimport timeimport osbroker = 'broker.emqx.io'port = 1883topic = "/python/mqtt/li"def connect_mqtt(client_id): """ MQTT 连接函数。 """ def on_connect(client, userdata, flags, rc): """ 连接回调函数 在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。 """ if rc == 0: print("Connected to MQTT Broker! return code %d" % rc) else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) # client.username_pw_set('uname', 'upwd') # 链接mqtt所需的用户名和密码,没有可不写 client.on_connect = on_connect client.connect(broker , port) return clientdef subscribe(client: mqtt_client, a_topic): """ 订阅消息 """ def on_message(client, userdata, msg): """ 消息回调函数 在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。 * 这里可添加自定义数据处理程序 """ print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode())) client.subscribe(topic) client.on_message = on_messagedef run(client_id, topic): client = connect_mqtt(client_id) subscribe(client, topic) client.loop_forever()if __name__ == '__main__': run('test_eartag-003-python-li', 'zk100/gw/#')
matplotlib绘制动态图
import matplotlib.pyplot as pltimport numpy as npcount = 100 # 图中最多数据量ax = list(range(count)) # 保存图1数据ay = [0] * 100bx = list(range(count)) # 保存图2数据by = [0] * 100num = count # 计数plt.ion() # 开启一个画图的窗口进入交互模式,用于实时更新数据plt.rcParams['figure.figsize'] = (10, 10) # 图像显示大小plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文标签乱码,还有通过导入字体文件的方法plt.rcParams['axes.unicode_minus'] = Falseplt.rcParams['lines.linewidth'] = 0.5 # 设置曲线线条宽度plt.tight_layout()while True: plt.clf() # 清除刷新前的图表,防止数据量过大消耗内存 plt.suptitle("总标题", fontsize=30) # 添加总标题,并设置文字大小 g1 = np.random.random() # 生成随机数画图 # 图表1 ax.append(num) # 追加x坐标值 ay.append(g1) # 追加y坐标值 agraphic = plt.subplot(2, 1, 1) agraphic.set_title('子图表标题1') # 添加子标题 agraphic.set_xlabel('x轴', fontsize=10) # 添加轴标签 agraphic.set_ylabel('y轴', fontsize=20) plt.plot(ax[-count:], ay[-count:], 'g-') # 等于agraghic.plot(ax,ay,'g-') # 图表2 bx.append(num) by.append(g1) bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title('子图表标题2') bgraghic.plot(bx[-count:], by[-count:], 'r^') plt.pause(0.001) # 设置暂停时间,太快图表无法正常显示 num = num + 1
matplotlib绘制mqtt数据实时图像
单线程
先启动mqtt订阅服务
mqtt订阅中有阻塞,更新数据后因订阅服务没有结束,导致绘图程序无法绘图
先启动绘图程序
绘图程序本身也是个循环,拿不到mqtt的实时数据,图像无法更新
两个服务加入协程,也不行。具体原因还不知道,容后补充。
mqtt作为线程启动,可解决上述问题
import jsonimport randomfrom paho.mqtt import client as mqtt_clientimport timeimport datetimefrom math import ceil, floorimport matplotlib.pyplot as pltimport _thread# 公共变量broker = 'broker.emqx.io'topic = "/python/mqtt/li"port = 1883client_id = f'python-mqtt-li-{random.randint(0, 100)}'show_num = 300x_num = [-1] # 计数acc1 = []acc2 = []acc3 = []acc4 = []acc5 = []acc6 = []stime = []"""mqtt subscribe topic"""def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime): """将字符串型【毫秒级】格式化时间 转为 【13位】整型时间戳""" datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, "%Y/%m/%d-%H:%M:%S.%f") obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0 return obj_stampdef int2datetime(int_float_timestamp): """ 有小数点:分离小数点,整数转为格式化时间,小数点直接跟在后面 无小数点:从第10位进行分离, 所以本函数只适用于时间戳整数位数大于9且小于11. """ if '.' in str(int_float_timestamp): int_float = str(int_float_timestamp).split('.') date = time.localtime(int(int_float[0])) tempDate = time.strftime("%Y/%m/%d-%H:%M:%S", date) secondafter = '.' + str(int_float[1]) return str(tempDate) + secondafterdef parse_mqttmsg(msg): """解析mqt头数据 MAC samplerate sampletime battery acc""" content = json.loads(msg.payload.decode()) span = 1000 / content['samplerate'] * 10 time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000] sampletime = content['sampletime'] sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime) acc = content['acc'] for i in range(len(acc)): x_num.append(x_num[-1] + 1) acc1.append(acc[i][0]) acc2.append(acc[i][1]) acc3.append(acc[i][2]) acc4.append(acc[i][3]) acc5.append(acc[i][4]) acc6.append(acc[i][5]) if i != 0: sampletime_int += time_span[i % 2] stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000)) else: stime.append(sampletime) print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1])def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) pass client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return clientdef subscribe(client: mqtt_client): def on_message(client, userdata, msg): # print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") parse_mqttmsg(msg) client.subscribe(topic) client.on_message = on_messagedef run(): client = connect_mqtt() subscribe(client) client.loop_forever()""" draw figures """def draw_figure(): plt.ion() # 开启一个画图的窗口进入交互模式,用于实时更新数据 plt.rcParams['figure.figsize'] = (10, 10) # 图像显示大小 plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文标签乱码,还有通过导入字体文件的方法 plt.rcParams['axes.unicode_minus'] = False plt.rcParams['lines.linewidth'] = 0.5 # 设置曲线线条宽度 count = 0 while True: plt.clf() # 清除刷新前的图表,防止数据量过大消耗内存 plt.suptitle("总标题", fontsize=30) # 添加总标题,并设置文字大小 plt.tight_layout() # 图表1 agraphic = plt.subplot(2, 1, 1) agraphic.set_title('子图表标题1') # 添加子标题 agraphic.set_xlabel('x轴', fontsize=10) # 添加轴标签 agraphic.set_ylabel('y轴', fontsize=20) plt.plot(x_num[1:][-show_num:], acc1[-show_num:], 'g-') try: xtricks = list(range(len(acc1) - show_num, len(acc1), 10)) # **1** xlabels = [stime[i] for i in xtricks] # **2** plt.xticks(xtricks, xlabels, rotation=15) except: pass # 图表2 bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title('子图表标题2') bgraghic.set_xlabel('x轴', fontsize=10) # 添加轴标签 bgraghic.set_ylabel('y轴', fontsize=20) bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], 'r^') plt.pause(0.001) # 设置暂停时间,太快图表无法正常显示 count = count + 1if __name__ == '__main__': # 多线程 _thread.start_new_thread(run, ()) draw_figure()
感谢各位的阅读,以上就是"Python中如何使用matplotlib绘制mqtt数据实时图像功能"的内容了,经过本文的学习后,相信大家对Python中如何使用matplotlib绘制mqtt数据实时图像功能这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
图表
标题
图像
实时
订阅
函数
时间
标签
功能
大小
客户
客户端
小数
小数点
消息
程序
更新
绘图
内容
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库数据满了
动车组网络技术考试卷
自动接单软件开发
电脑网络安全产品
零盾网络安全
国家网络安全事件应急预案 缺点
河南一彩客互联网科技有限公司
通化网络安全培训
数据库应用系统软件有哪些
腾讯云轻量云服务器登录不了
大数据技术和数据库技术区别
天堂2游戏服务器管理器
网络安全NC连接实验报告
软件开发业务增值税缴纳
联想服务器入围电信
服务器cpu不适合玩游戏吗
天津办公软件开发系统
现代移动网络技术
5g网络技术最厉害的国家
大学生网络安全意识的
网络安全证书哪个部门发
奉贤区进口软件开发价格查询
审计数据库结论怎么写
黔东南州建行金融网络安全
数据库学籍管理系统报告
微星x99 服务器主板
vb.net 内存数据库
Mysql数据库创建余安全
电子政务中的网络安全宣传周
网络安全工作经验材料