python3多进程和协程处理MySQL数据讲义
发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,下文内容主要给大家带来python3多进程和协程处理MySQL数据讲义,这里所讲到的知识,与书籍略有不同,都是专业技术人员在与用户接触过程中,总结出来的,具有一定的经验分享价值,希望给广大读者带来帮助
千家信息网最后更新 2025年01月22日python3多进程和协程处理MySQL数据讲义
下文内容主要给大家带来python3多进程和协程处理MySQL数据讲义,这里所讲到的知识,与书籍略有不同,都是专业技术人员在与用户接触过程中,总结出来的,具有一定的经验分享价值,希望给广大读者带来帮助。
python3的多进程 + 协程处理MySQL的数据,主要逻辑是拉取MySQL的数据,然后使用flashtext匹配关键字,在存回MySQL,代码如下(async_mysql.py
):
import timeimport asyncioimport randomfrom concurrent.futures import ProcessPoolExecutor as Poolimport aiomysqlfrom flashtext import KeywordProcessorimport clickclass AttrDict(dict): """可以用"."获取属性,没有该属性时返回None的字典""" def __getattr__(self, name): try: return self[name] except KeyError: return None def __setattr__(self, name, value): self[name] = valueclass AttrDictCursor(aiomysql.DictCursor): """继承aiomysql的字典cursor""" dict_type = AttrDictclass MultiProcessMysql(object): """用多进程和协程处理MySQL数据""" def __init__(self, workers=2, pool=10, start=0, end=2000): """第一段的参数需要跟随需求变动""" self.host = "192.168.0.34" self.port = 3306 self.user = "root" self.password = "root" self.db = "mydb" self.origin_table = "judgment_main_etl" # main self.dest_table = "laws_finance1" self.s_sql = f"select uuid, court_idea, judge_result, reason, plt_claim, dft_rep, crs_exm from {self.origin_table} where %s<=id and id<%s;" self.i_sql = f"insert into {self.dest_table} (uuid, title, reason, keyword) values (%s, %s, %s, %s)" self.pool = pool # 协程数和MySQL连接数 self.aionum = self.pool self.step = 2000 # 一次性从MySQL拉取的行数 self.workers = workers # 进程数 self.start = start # MySQL开始的行数 self.end = end # MySQL结束的行数 self.keyword = ['非法经营支付业务', '网络洗钱', '资金池', '支付牌照', '清洁算', '网络支付', '网上支付', '移动支付', '聚合支付', '保本保息', '担保交易', '供应链金融', '网贷', '网络借贷', '网络投资', '虚假标的', '自融', '资金池', '关联交易', '庞氏骗局', '网络金融理财', '线上投资理财', '互联网私募', '互联网股权', '非法集资', '合同欺诈', '众筹投资', '股权转让', '互联网债权转让', '资本自融', '投资骗局', '洗钱', '非法集资', '网络传销', '虚拟币泡沫', '网络互助金融', '金融欺诈', '网上银行', '信用卡盗刷', '网络钓鱼', '信用卡信息窃取', '网上洗钱', '洗钱诈骗', '数字签名更改', '支付命令窃取', '金融诈骗', '引诱投资', '隐瞒项目信息', '风险披露', '夸大收益', '诈骗保险金', '非法经营保险业务', '侵占客户资金', '征信报告窃取', '金融诈骗', '破坏金融管理'] self.kp = KeywordProcessor() # flashtext是一个文本匹配包,在关键词数量大时速度远大于re self.kp.add_keywords_from_list(self.keyword) async def createMysqlPool(self, loop): """每个进程要有独立的pool,所以不绑定self""" pool = await aiomysql.create_pool( loop=loop, host=self.host, port=self.port, user=self.user, password=self.password, db=self.db, maxsize=self.pool, charset='utf8', cursorclass=AttrDictCursor ) return pool def cutRange(self, start, end, times): """将数据区间分段""" partition = (end - start) // times ranges = [] tmp_end = start while tmp_end < end: tmp_end += partition # 剩下的不足以再分 if (end - tmp_end) < partition: tmp_end = end ranges.append((start, tmp_end)) start = tmp_end return ranges async def findKeyword(self, db, start, end): """从MySQL数据中匹配出关键字""" # 随机休息一定时间,防止数据同时到达,同时处理, 应该是一部分等待,一部分处理 await asyncio.sleep(random.random() * self.workers * 2) print("coroutine start") async with db.acquire() as conn: async with conn.cursor() as cur: while start < end: tmp_end = start + self.step if tmp_end > end: tmp_end = end print("aio start: %s, end: %s" % (start, tmp_end)) # <=id 和 id< await cur.execute(self.s_sql, (start, tmp_end)) datas = await cur.fetchall() uuids = [] for data in datas: if data: for key in list(data.keys()): if not data[key]: data.pop(key) keyword = self.kp.extract_keywords( " ".join(data.values())) if keyword: keyword = ' '.join(set(keyword)) # 对关键字去重 # print(keyword) uuids.append( (data.uuid, data.title, data.reason, keyword)) await cur.executemany(self.i_sql, uuids) await conn.commit() start = tmp_end def singleProcess(self, start, end): """单个进程的任务""" loop = asyncio.get_event_loop() # 为每个进程创建一个pool db = loop.run_until_complete(asyncio.ensure_future( self.createMysqlPool(loop))) tasks = [] ranges = self.cutRange(start, end, self.aionum) print(ranges) for start, end in ranges: tasks.append(self.findKeyword(db, start, end)) loop.run_until_complete(asyncio.gather(*tasks)) def run(self): """多进程跑""" tasks = [] ranges = self.cutRange(self.start, self.end, self.workers) start_time = time.time() with Pool(max_workers=self.workers) as executor: for start, end in ranges: print("processor start: %s, end: %s" % (start, end)) tasks.append(executor.submit(self.singleProcess, start, end)) for task in tasks: task.result() print("total time: %s" % (time.time() - start_time))@click.command(help="运行")@click.option("-w", "--workers", default=2, help="进程数")@click.option('-p', "--pool", default=10, help="协程数")@click.option('-s', '--start', default=0, help='MySQL开始的id')@click.option('-e', "--end", default=2640000, help="MySQL结束的id")def main(workers, pool, start, end): mp = MultiProcessMysql(workers=workers, pool=pool, start=start, end=end) if workers * pool > 100: if not click.confirm('MySQL连接数超过100(%s),确认吗?' % (workers * pool)): return mp.run()if __name__ == "__main__": main()
运行如下:$ python3 async_mysql.py -w 2 # 可以指定其他参数,也可使用默认值
对于以上关于python3多进程和协程处理MySQL数据讲义,如果大家还有更多需要了解的可以持续关注我们的行业推新,如需获取专业解答,可在官网联系售前售后的,希望该文章可给大家带来一定的知识更新。
数据
网络
金融
支付
进程
处理
投资
诈骗
互联网
关键
资金
互联
讲义
专业
业务
信息
信用
信用卡
关键字
参数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
郑州最好网络安全公司有哪些
营山软件开发系统
北京服务器哪个好
我该怎么选择数据库
网络安全艺术字手写体
随州交警网络安全
海康威视的应用软件开发
太原戴尔服务器
怎么进入西瓜视频专属服务器
学软件开发就业前景怎么样
sql与数据库入门基础知识
连云港品质联想服务器供应商
上海驾御互联网科技有限公司
云服务器免费vps
软件开发公司的税收
网络技术运用的例子
大数据技术是不是数据库
甘肃软件开发有限公司
学网络安全报什么专业
华为服务器的操作系统是什么
魔兽小服务器什么贵
app软件开发上线
服务器网卡一般用bond几
.net软件开发年度总结
1 软件开发(委托)合同
575服务器防护网站
杭州西奥服务器如何改成中文
软考软件开发中级试题结构
满洲里市各学校开展网络安全宣传
dnf台服灰服务器