千家信息网

Python如何实现获取微信企业号access_token的Class

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,小编给大家分享一下Python如何实现获取微信企业号access_token的Class,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去
千家信息网最后更新 2025年01月22日Python如何实现获取微信企业号access_token的Class

小编给大家分享一下Python如何实现获取微信企业号access_token的Class,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效。而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。为兼容这两种方式,因此按照订阅号的方式处理。

处理办法与接口文档中的要求相同:

为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。

下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。

设计思路和使用方法:

  1. 自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据

  2. 尽可能的保证Class中每一个可执行的函数单独调用都能成功。

  3. Class中只将真正能被用到的方法和变量设置为public的。

  4. 使用时只需要修改此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使用WeiXinTokenClass().get()方法即可得到access_token。

脚本内容可以从github上获取,地址:https://github.com/DingGuodong/LinuxBashShellScriptForOps/blob/master/projects/WeChatOps/OpsDevBestPractice/odbp_getToken.py

脚本内容如下:

#!/usr/bin/python# encoding: utf-8# -*- coding: utf8 -*-"""Created by PyCharm.File:               LinuxBashShellScriptForOps:odbp_getToken.pyUser:               GuodongCreate Date:        2016/8/10Create Time:        17:04 """import osimport sqlite3import sysimport urllibimport urllib2import jsonimport datetime# import timeenable_debug = Truedef debug(msg, code=None):    if enable_debug:        if code is None:            print "message: %s" % msg        else:            print "message: %s, code: %s " % (msg, code)AUTHOR_MAIL = "uberurey_ups@163.com"weixin_qy_CorpID = "your_corpid"weixin_qy_Secret = "your_secret"# Build paths inside the project like this: os.path.join(BASE_DIR, ...)BASE_DIR = os.path.dirname(os.path.abspath(__file__))# Database# https://docs.djangoproject.com/en/1.9/ref/settings/#databasesDATABASES = {    'default': {        'ENGINE': 'db.backends.sqlite3',        'NAME': os.path.join(BASE_DIR, '.odbp_db.sqlite3'),    }}sqlite3_db_file = str(DATABASES['default']['NAME'])def sqlite3_conn(database):    try:        conn = sqlite3.connect(database)    except sqlite3.Error:        print >> sys.stderr, """\    There was a problem connecting to Database:        %s    The error leading to this problem was:        %s    It's possible that this database is broken or permission denied.    If you cannot solve this problem yourself, please mail to:        %s    """ % (database, sys.exc_value, AUTHOR_MAIL)        sys.exit(1)    else:        return conndef sqlite3_commit(conn):    return conn.commit()def sqlite3_close(conn):    return conn.close()def sqlite3_execute(database, sql):    try:        sql_conn = sqlite3_conn(database)        sql_cursor = sql_conn.cursor()        sql_cursor.execute(sql)        sql_conn.commit()        sql_conn.close()    except sqlite3.Error as e:        print e        sys.exit(1)def sqlite3_create_table_token():    sql_conn = sqlite3_conn(sqlite3_db_file)    sql_cursor = sql_conn.cursor()    sql_cursor.execute('''CREATE TABLE "main"."weixin_token" (                "id"  INTEGER ,                "access_token"  TEXT,                "expires_in"  TEXT,                "expires_on"  TEXT,                "is_expired"  INTEGER                )                ;    ''')    sqlite3_commit(sql_conn)    sqlite3_close(sql_conn)def sqlite3_create_table_account():    sql_conn = sqlite3_conn(sqlite3_db_file)    sql_cursor = sql_conn.cursor()    sql_cursor.execute('''CREATE TABLE "main"."weixin_account" (                "id"  INTEGER,                "name"  TEXT,                "corpid"  TEXT,                "secret"  TEXT,                "current"  INTEGER                )                ;    ''')    sqlite3_commit(sql_conn)    sqlite3_close(sql_conn)def sqlite3_create_tables():    print "sqlite3_create_tables"    sql_conn = sqlite3_conn(sqlite3_db_file)    sql_cursor = sql_conn.cursor()    sql_cursor.execute('''CREATE TABLE "main"."weixin_token" (                "id"  INTEGER ,                "access_token"  TEXT,                "expires_in"  TEXT,                "expires_on"  TEXT,                "is_expired"  INTEGER                )                ;    ''')    sql_cursor.execute('''CREATE TABLE "main"."weixin_account" (                "id"  INTEGER,                "name"  TEXT,                "corpid"  TEXT,                "secret"  TEXT,                "current"  INTEGER                )                ;    ''')    sqlite3_commit(sql_conn)    sqlite3_close(sql_conn)def sqlite3_set_credential(corpid, secret):    try:        sql_conn = sqlite3_conn(sqlite3_db_file)        sql_cursor = sql_conn.cursor()        sql_cursor.execute('''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES                                (1,                                'odbp',                                ?,                                ?,                                1)''', (corpid, secret))        sqlite3_commit(sql_conn)        sqlite3_close(sql_conn)    except sqlite3.Error:        sqlite3_create_table_account()        sqlite3_set_credential(corpid, secret)def sqlite3_set_token(access_token, expires_in, expires_on, is_expired):    try:        sql_conn = sqlite3_conn(sqlite3_db_file)        sql_cursor = sql_conn.cursor()        sql_cursor.execute('''INSERT INTO "weixin_token"                              ("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES                              (                              1,                              ?,                              ?,                              ?,                              ?                              )''', (access_token, expires_in, expires_on, is_expired))        sqlite3_commit(sql_conn)        sqlite3_close(sql_conn)    except sqlite3.Error:        sqlite3_create_table_token()        sqlite3_set_token(access_token, expires_in, expires_on, is_expired)def sqlite3_get_credential():    try:        sql_conn = sqlite3_conn(sqlite3_db_file)        sql_cursor = sql_conn.cursor()        credential = sql_cursor.execute('''SELECT "corpid", "secret"  FROM weixin_account WHERE current == 1;''')        result = credential.fetchall()        sqlite3_close(sql_conn)    except sqlite3.Error:        sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)        return sqlite3_get_credential()    else:        if result is not None and len(result) != 0:            return result        else:            print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL            sys.exit(1)def sqlite3_get_token():    try:        sql_conn = sqlite3_conn(sqlite3_db_file)        sql_cursor = sql_conn.cursor()        credential = sql_cursor.execute(            '''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;''')        result = credential.fetchall()        sqlite3_close(sql_conn)    except sqlite3.Error:        info = sys.exc_info()        print info[0], ":", info[1]    else:        if result is not None and len(result) != 0:            return result        else:            # print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL            # sys.exit(1)            return Nonedef sqlite3_update_token(access_token, expires_on):    sql_conn = sqlite3_conn(sqlite3_db_file)    sql_cursor = sql_conn.cursor()    sql_cursor.execute('''UPDATE "weixin_token" SET                          access_token=?,                          expires_on=?                          WHERE _ROWID_ = 1;''', (access_token, expires_on)                       )    sqlite3_commit(sql_conn)    sqlite3_close(sql_conn)class WeiXinTokenClass(object):    def __init__(self):        self.__corpid = None        self.__corpsecret = None        self.__use_persistence = True        self.__access_token = None        self.__expires_in = None        self.__expires_on = None        self.__is_expired = None        if self.__use_persistence:            self.__corpid = sqlite3_get_credential()[0][0]            self.__corpsecret = sqlite3_get_credential()[0][1]        else:            self.__corpid = weixin_qy_CorpID            self.__corpsecret = weixin_qy_Secret    def __get_token_from_weixin_qy_api(self):        parameters = {            "corpid": self.__corpid,            "corpsecret": self.__corpsecret        }        url_parameters = urllib.urlencode(parameters)        token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?"        url = token_url + url_parameters        response = urllib2.urlopen(url)        result = response.read()        token_json = json.loads(result)        if token_json['access_token'] is not None:            get_time_now = datetime.datetime.now()            # TODO(Guodong Ding) token will expired ahead of time or not expired after the time            expire_time = get_time_now + datetime.timedelta(seconds=token_json['expires_in'])            token_json['expires_on'] = str(expire_time)            self.__access_token = token_json['access_token']            self.__expires_in = token_json['expires_in']            self.__expires_on = token_json['expires_on']            self.__is_expired = 1            try:                token_result_set = sqlite3_get_token()            except sqlite3.Error:                token_result_set = None            if token_result_set is None and len(token_result_set) == 0:                sqlite3_set_token(self.__access_token, self.__expires_in, self.__expires_on, self.__is_expired)            else:                if self.__is_token_expired() is True:                    sqlite3_update_token(self.__access_token, self.__expires_on)                else:                    debug("pass")                    return        else:            if token_json['errcode'] is not None:                print "errcode is: %s" % token_json['errcode']                print "errmsg is: %s" % token_json['errmsg']            else:                print result    def __get_token_from_persistence_storage(self):        try:            token_result_set = sqlite3_get_token()        except sqlite3.Error:            self.__get_token_from_weixin_qy_api()        finally:            if token_result_set is None:                self.__get_token_from_weixin_qy_api()                token_result_set = sqlite3_get_token()                access_token = token_result_set[0][0]                expire_time = token_result_set[0][1]            else:                access_token = token_result_set[0][0]                expire_time = token_result_set[0][1]        expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f')        now_time = datetime.datetime.now()        if now_time < expire_time:            # print "The token is %s" % access_token            # print "The token will expire on %s" % expire_time            return access_token        else:            self.__get_token_from_weixin_qy_api()            return self.__get_token_from_persistence_storage()    @staticmethod    def __is_token_expired():        try:            token_result_set = sqlite3_get_token()        except sqlite3.Error as e:            print e            sys.exit(1)        expire_time = token_result_set[0][1]        expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f')        now_time = datetime.datetime.now()        if now_time < expire_time:            return False        else:            return True    def get(self):        return self.__get_token_from_persistence_storage()

以上是"Python如何实现获取微信企业号access_token的Class"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0