千家信息网

python3多进程和协程处理MySQL数据讲义

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,下文内容主要给大家带来python3多进程和协程处理MySQL数据讲义,这里所讲到的知识,与书籍略有不同,都是专业技术人员在与用户接触过程中,总结出来的,具有一定的经验分享价值,希望给广大读者带来帮助
千家信息网最后更新 2025年01月22日python3多进程和协程处理MySQL数据讲义

下文内容主要给大家带来python3多进程和协程处理MySQL数据讲义,这里所讲到的知识,与书籍略有不同,都是专业技术人员在与用户接触过程中,总结出来的,具有一定的经验分享价值,希望给广大读者带来帮助。

python3的多进程 + 协程处理MySQL的数据,主要逻辑是拉取MySQL的数据,然后使用flashtext匹配关键字,在存回MySQL,代码如下(async_mysql.py):

import timeimport asyncioimport randomfrom concurrent.futures import ProcessPoolExecutor as Poolimport aiomysqlfrom flashtext import KeywordProcessorimport clickclass AttrDict(dict):    """可以用"."获取属性,没有该属性时返回None的字典"""    def __getattr__(self, name):        try:            return self[name]        except KeyError:            return None    def __setattr__(self, name, value):        self[name] = valueclass AttrDictCursor(aiomysql.DictCursor):    """继承aiomysql的字典cursor"""    dict_type = AttrDictclass MultiProcessMysql(object):    """用多进程和协程处理MySQL数据"""    def __init__(self, workers=2, pool=10, start=0, end=2000):        """第一段的参数需要跟随需求变动"""        self.host = "192.168.0.34"        self.port = 3306        self.user = "root"        self.password = "root"        self.db = "mydb"        self.origin_table = "judgment_main_etl"  # main        self.dest_table = "laws_finance1"        self.s_sql = f"select uuid, court_idea, judge_result, reason, plt_claim, dft_rep, crs_exm from {self.origin_table} where %s<=id and id<%s;"        self.i_sql = f"insert into {self.dest_table} (uuid, title, reason, keyword) values (%s, %s, %s, %s)"        self.pool = pool    # 协程数和MySQL连接数        self.aionum = self.pool        self.step = 2000  # 一次性从MySQL拉取的行数        self.workers = workers  # 进程数        self.start = start  # MySQL开始的行数        self.end = end  # MySQL结束的行数        self.keyword = ['非法经营支付业务', '网络洗钱', '资金池', '支付牌照', '清洁算', '网络支付', '网上支付', '移动支付', '聚合支付', '保本保息', '担保交易', '供应链金融', '网贷', '网络借贷', '网络投资', '虚假标的', '自融', '资金池', '关联交易', '庞氏骗局', '网络金融理财', '线上投资理财', '互联网私募', '互联网股权', '非法集资', '合同欺诈', '众筹投资', '股权转让', '互联网债权转让', '资本自融', '投资骗局', '洗钱', '非法集资', '网络传销', '虚拟币泡沫', '网络互助金融', '金融欺诈', '网上银行', '信用卡盗刷', '网络钓鱼', '信用卡信息窃取', '网上洗钱', '洗钱诈骗', '数字签名更改', '支付命令窃取', '金融诈骗', '引诱投资', '隐瞒项目信息', '风险披露', '夸大收益', '诈骗保险金', '非法经营保险业务', '侵占客户资金', '征信报告窃取', '金融诈骗', '破坏金融管理']        self.kp = KeywordProcessor()    # flashtext是一个文本匹配包,在关键词数量大时速度远大于re        self.kp.add_keywords_from_list(self.keyword)    async def createMysqlPool(self, loop):        """每个进程要有独立的pool,所以不绑定self"""        pool = await aiomysql.create_pool(            loop=loop, host=self.host, port=self.port, user=self.user,            password=self.password, db=self.db, maxsize=self.pool,            charset='utf8', cursorclass=AttrDictCursor        )        return pool    def cutRange(self, start, end, times):        """将数据区间分段"""        partition = (end - start) // times        ranges = []        tmp_end = start        while tmp_end < end:            tmp_end += partition            # 剩下的不足以再分            if (end - tmp_end) < partition:                tmp_end = end            ranges.append((start, tmp_end))            start = tmp_end        return ranges    async def findKeyword(self, db, start, end):        """从MySQL数据中匹配出关键字"""        # 随机休息一定时间,防止数据同时到达,同时处理, 应该是一部分等待,一部分处理        await asyncio.sleep(random.random() * self.workers * 2)        print("coroutine start")        async with db.acquire() as conn:            async with conn.cursor() as cur:                while start < end:                    tmp_end = start + self.step                    if tmp_end > end:                        tmp_end = end                    print("aio start: %s, end: %s" % (start, tmp_end))                    # <=id 和 id<                    await cur.execute(self.s_sql, (start, tmp_end))                    datas = await cur.fetchall()                    uuids = []                    for data in datas:                        if data:                            for key in list(data.keys()):                                if not data[key]:                                    data.pop(key)                            keyword = self.kp.extract_keywords(                                " ".join(data.values()))                            if keyword:                                keyword = ' '.join(set(keyword))   # 对关键字去重                                # print(keyword)                                uuids.append(                                    (data.uuid, data.title, data.reason, keyword))                    await cur.executemany(self.i_sql, uuids)                    await conn.commit()                    start = tmp_end    def singleProcess(self, start, end):        """单个进程的任务"""        loop = asyncio.get_event_loop()        # 为每个进程创建一个pool        db = loop.run_until_complete(asyncio.ensure_future(            self.createMysqlPool(loop)))        tasks = []        ranges = self.cutRange(start, end, self.aionum)        print(ranges)        for start, end in ranges:            tasks.append(self.findKeyword(db, start, end))        loop.run_until_complete(asyncio.gather(*tasks))    def run(self):        """多进程跑"""        tasks = []        ranges = self.cutRange(self.start, self.end, self.workers)        start_time = time.time()        with Pool(max_workers=self.workers) as executor:            for start, end in ranges:                print("processor start: %s, end: %s" % (start, end))                tasks.append(executor.submit(self.singleProcess, start, end))            for task in tasks:                task.result()        print("total time: %s" % (time.time() - start_time))@click.command(help="运行")@click.option("-w", "--workers", default=2, help="进程数")@click.option('-p', "--pool", default=10, help="协程数")@click.option('-s', '--start', default=0, help='MySQL开始的id')@click.option('-e', "--end", default=2640000, help="MySQL结束的id")def main(workers, pool, start, end):    mp = MultiProcessMysql(workers=workers, pool=pool, start=start, end=end)    if workers * pool > 100:        if not click.confirm('MySQL连接数超过100(%s),确认吗?' % (workers * pool)):            return    mp.run()if __name__ == "__main__":    main()

运行如下:
$ python3 async_mysql.py -w 2 # 可以指定其他参数,也可使用默认值

对于以上关于python3多进程和协程处理MySQL数据讲义,如果大家还有更多需要了解的可以持续关注我们的行业推新,如需获取专业解答,可在官网联系售前售后的,希望该文章可给大家带来一定的知识更新。

0