千家信息网

Python中怎么获取OneNet数据

发表于:2024-10-28 作者:千家信息网编辑
千家信息网最后更新 2024年10月28日,Python中怎么获取OneNet数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。使用python脚本文件对OneNet服务器数据进行
千家信息网最后更新 2024年10月28日Python中怎么获取OneNet数据

Python中怎么获取OneNet数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

使用python脚本文件对OneNet服务器数据进行上传、获取的操作

上传数据

#!/usr/bin/env python# -*- coding: utf-8 -*-# @Time    : 2021/2/24 23:01# @Author  : LiShan# @Email   : lishan_1997@126.com# @File    : OneNet_Post.py# @Note    : https://blog.csdn.net/lishan132/article/details/114044902import urllib.requestimport json# 设备ID、KeydeviceId = "591972034"APIKey = "ROsfgvwqy2jxn2x93TdDCCbFmL8="# 上传函数def OneNet_post_data(info):    url = "https://api.heclouds.com/devices/" + deviceId + '/datapoints'    streams = []    for index, element in enumerate(info):        streams.append({"id": element[0], "datapoints": [{"value": element[1]}]})    values = {"datastreams": streams}    data = json.dumps(values).encode("utf-8")    request = urllib.request.Request(url, data)    request.add_header('api-key', APIKey)    request.get_method = lambda: 'POST'    request = urllib.request.urlopen(request)    print(json.loads(request.read()))if __name__ == '__main__':    upload_data = [        ["road1", 10],        ["road2", 20],        ["road3", 30],        ["road4", 40],        ["road5", 50],        ["road6", 60],        ["road7", 70],        ["road8", 80],        ["road9", 90],        ["road10", 100],        ["road11", 110],        ["road12", 120],    ]    OneNet_post_data(upload_data)

获取数据

#!/usr/bin/env python# -*- coding: utf-8 -*-# @Time    : 2021/2/24 23:01# @Author  : LiShan# @Email   : lishan_1997@126.com# @File    : OneNet_Get.py# @Note    : https://blog.csdn.net/lishan132/article/details/114044902import urllib.requestimport json# 设备ID、KeydeviceId = "591972034"APIKey = "ROsfgvwqy2jxn2x93TdDCCbFmL8="# 获取函数def OneNet_get_data():    url = "http://api.heclouds.com/devices/" + deviceId + "/datastreams"    request = urllib.request.Request(url)    request.add_header('api-key', APIKey)    request.get_method = lambda: 'GET'    request = urllib.request.urlopen(request)    r = json.loads(request.read())    data = r.pop('data')    print(r)    return dataif __name__ == '__main__':    load_data = OneNet_get_data()    print('参数' + '\t\t\t\t\t' + '更新时间' + '\t\t\t\t\t\t' + '数值')    for index, element in enumerate(load_data):        a = str(element.get('update_at', ''))        b = str(element.get('current_value', ''))        if a != "" and b != "":            print(str(element['id']) + '\t\t\t\t' + a + '\t\t\t' + b)

整合为一个文件

#!/usr/bin/env python# -*- coding: utf-8 -*-# pip install prettytableimport urllib.request as reqimport json# 设备ID、KeydeviceId = "591972034"APIKey = "ROsfgvwqy2jxn2x93TdDCCbFmL8="# 上传函数def OneNet_post_data(info):    url = "https://api.heclouds.com/devices/" + info[0] + '/datapoints'    headers = {'api-key': info[1]}    streams = []    for i, e in enumerate(info[2]):        streams.append({"id": e[0], "datapoints": [{"value": e[1]}]})    data = json.dumps({"datastreams": streams}).encode("utf-8")    request = json.loads((req.urlopen(req.Request(url, data, headers=headers))).read())    print(request)# 获取函数def OneNet_get_data(info):    url = "http://api.heclouds.com/devices/" + info[0] + "/datastreams"    headers = {'api-key': info[1]}    request = json.loads((req.urlopen(req.Request(url, headers=headers))).read())    data = request.pop('data')    print(request)    return dataif __name__ == '__main__':    # 准备待上传数据    upload_data = [        ["road1", 10],        ["road2", 20],        ["road3", 30],        ["road4", 40],        ["road5", 50],        ["road6", 60],        ["road7", 70],        ["road8", 80],        ["road9", 90],        ["road10", 100],        ["road11", 110],        ["road12", 120],    ]    # 上传数据    OneNet_post_data([deviceId, APIKey, upload_data])    # 获取数据    get_data = OneNet_get_data([deviceId, APIKey])    # noinspection PyBroadException    try:        # 使用表格美化显示数据        import prettytable as pt        tb = pt.PrettyTable()        tb.field_names = ["id", "update_at", "current_value"]        for index, element in enumerate(get_data):            ID = str(element.get('id', ''))            update_at = str(element.get('update_at', ''))            current_value = str(element.get('current_value', ''))            tb.add_row([ID, update_at, current_value])        print(tb)    except Exception:        # 直接显示数据        for index, element in enumerate(get_data):            ID = str(element.get('id', ''))            update_at = str(element.get('update_at', ''))            current_value = str(element.get('current_value', ''))            print(ID, update_at, current_value)

看完上述内容,你们掌握Python中怎么获取OneNet数据的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0