千家信息网

Python探针怎么完成调用库的数据提取

发表于:2025-02-13 作者:千家信息网编辑
千家信息网最后更新 2025年02月13日,今天小编给大家分享一下Python探针怎么完成调用库的数据提取的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下
千家信息网最后更新 2025年02月13日Python探针怎么完成调用库的数据提取

今天小编给大家分享一下Python探针怎么完成调用库的数据提取的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

1.简单粗暴的方法--对mysql库进行封装

要统计一个执行过程, 就需要知道这个执行过程的开始位置和结束位置, 所以最简单粗暴的方法就是基于要调用的方法进行封装,在框架调用MySQL库和MySQL库中间实现一个中间层, 在中间层完成耗时统计,如:

# 伪代码def my_execute(conn, sql, param): # 针对MySql库的统计封装组件 with MyTracer(conn, sql, param):     # 以下为正常使用MySql库的代码with conn.cursor as cursor: cursor.execute(sql, param)...

看样子实现起来非常不错, 而且更改非常方便, 但由于是在最顶层的API上进行修改, 其实是非常不灵活的, 同时在cursor.execute里会进行一些预操作, 如把sql和param进行拼接, 调用nextset清除当前游标的数据等等。我们最后拿到的数据如时间耗时也是不准确的, 同时也没办法得到一些详细的元数据, 如错误码等等.

如果要拿到最直接有用的数据,就只能去改源代码, 然后再调用源代码了, 但是如果每个库都需要改源代码才能统计, 那也太麻烦了, 好在Python也提供了一些类似探针的接口, 可以通过探针把库的源码进行替换完成我们的代码.

2.Python的探针

在Python中可以通过sys.meta_path来实现import hook的功能, 当执行 import 相关操作时, 会根据sys.meta_path定义的对象对import相关库进行更改.sys.meta_path中的对象需要实现一个find_module方法, 这个find_module方法返回None或一个实现了load_module方法的对象, 我们可以通过这个对象, 针对一些库在import时, 把相关的方法进行替换, 简单用法如下,通过hooktime.sleep让他在sleep的时候能打印消耗的时间.

import importlibimport sysfrom functools import wrapsdef func_wrapper(func):    """这里通过一个装饰器来达到狸猫换太子和获取数据的效果"""    @wraps(func)    def wrapper(*args, **kwargs):        # 记录开始时间        start = time.time()        result = func(*args, **kwargs)        # 统计消耗时间        end = time.time()        print(f"speed time:{end - start}")        return result    return wrapperclass MetaPathFinder:    def find_module(self, fullname, path=None):        # 执行时可以看出来在import哪些模块        print(f'find module:{path}:{fullname}')        return MetaPathLoader()class MetaPathLoader:    def load_module(self, fullname):        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import        if fullname in sys.modules:            return sys.modules[fullname]        # 防止递归调用        finder = sys.meta_path.pop(0)        # 导入 module        module = importlib.import_module(fullname)        if fullname == 'time':            # 替换函数            module.sleep = func_wrapper(module.sleep)        sys.meta_path.insert(0, finder)        return modulesys.meta_path.insert(0, MetaPathFinder())if __name__ == '__main__':    import time    time.sleep(1)# 输出示例:# find module:datetime# find module:time# load module:time# find module:math# find module:_datetime# speed time:1.00073385238647468

3.制作探针模块

了解完了主要流程后, 可以开始制作自己的探针模块了, 由于示例只涉及到aiomysql模块, 那么在MetaPathFinder.find_module中需要只对aiomysql模块进行处理, 其他的先忽略. 然后我们需要确定我们要把aiomysql的哪个功能给替换, 从业务上来说, 一般情况下我们只要cursor.execute, cursor.fetchone, cursor.fetchall, cursor.executemany这几个主要的操作,所以需要深入cursor看看如何去更改代码, 后者重载哪个函数.

先cursor.execute的源码(cursor.executemanay也类似), 发现会先调用self.nextset的方法, 把上个请求的数据先拿完, 再合并sql语句, 最后通过self._query进行查询:

async def execute(self, query, args=None):    """Executes the given operation    Executes the given operation substituting any markers with    the given parameters.    For example, getting all rows where id is 5:        cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))    :param query: ``str`` sql statement    :param args: ``tuple`` or ``list`` of arguments for sql query    :returns: ``int``, number of rows that has been produced of affected    """    conn = self._get_db()    while (await self.nextset()):        pass    if args is not None:        query = query % self._escape_args(args, conn)    await self._query(query)    self._executed = query    if self._echo:        logger.info(query)        logger.info("%r", args)    return self._rowcount

再看cursor.fetchone的源码(cursor.fetchall也类似), 发现其实是从缓存中获取数据,

这些数据在执行cursor.execute中就已经获取了:

def fetchone(self):    """Fetch the next row """    self._check_executed()    fut = self._loop.create_future()    if self._rows is None or self._rownumber >= len(self._rows):        fut.set_result(None)        return fut    result = self._rows[self._rownumber]    self._rownumber += 1    fut = self._loop.create_future()    fut.set_result(result)    return fut

综合上面的分析, 我们只要对核心的方法self._query进行重载即可拿到我们要的数据, 从源码中我们可以知道, 我们能获取到传入self._query的self和sql参数, 根据self又能获取到查询的结果, 同时我们通过装饰器能获取到运行的时间, 要的数据基本都到齐了,

按照思路修改后的代码如下:

import importlibimport timeimport sysfrom functools import wrapsfrom typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKINGfrom types import ModuleTypeif TYPE_CHECKING:    import aiomysqldef func_wrapper(func: Callable):    @wraps(func)    async def wrapper(*args, **kwargs) -> Any:        start: float = time.time()        func_result: Any = await func(*args, **kwargs)        end: float = time.time()        # 根据_query可以知道, 第一格参数是self, 第二个参数是sql        self: aiomysql.Cursor = args[0]        sql: str = args[1]        # 通过self,我们可以拿到其他的数据        db: str = self._connection.db        user: str = self._connection.user        host: str = self._connection.host        port: str = self._connection.port        execute_result: Tuple[Tuple] = self._rows        # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了,         # 这里只是打印一部分数据出来        print({            "sql": sql,            "db": db,            "user": user,            "host": host,            "port": port,            "result": execute_result,            "speed time": end - start        })        return func_result    return cast(Callable, wrapper)class MetaPathFinder:    @staticmethod    def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:        if fullname == 'aiomysql':            # 只有aiomysql才进行hook            return MetaPathLoader()        else:            return Noneclass MetaPathLoader:    @staticmethod    def load_module(fullname: str):        if fullname in sys.modules:            return sys.modules[fullname]        # 防止递归调用        finder: "MetaPathFinder" = sys.meta_path.pop(0)        # 导入 module        module: ModuleType = importlib.import_module(fullname)        # 针对_query进行hook        module.Cursor._query = func_wrapper(module.Cursor._query)        sys.meta_path.insert(0, finder)        return moduleasync def test_mysql() -> None:    import aiomysql    pool: aiomysql.Pool = await aiomysql.create_pool(        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'    )    async with pool.acquire() as conn:        async with conn.cursor() as cur:            await cur.execute("SELECT 42;")            (r,) = await cur.fetchone()            assert r == 42    pool.close()    await pool.wait_closed()if __name__ == '__main__':    sys.meta_path.insert(0, MetaPathFinder())    import asyncio    asyncio.run(test_mysql())# 输出示例:# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}

这个例子看来很不错, 但是需要在调用的入口处显式调用该逻辑, 通常一个项目可能有几个入口, 每个入口都显示调用该逻辑会非常麻烦, 而且必须先调用我们的hook逻辑后才能import, 这样就得订好引入规范, 不然就可能出现部分地方hook不成功, 如果能把引入hook这个逻辑安排在解析器启动后马上执行, 就可以完美地解决这个问题了. 查阅了一翻资料后发现,python解释器初始化的时候会自动import PYTHONPATH下存在的sitecustomize和usercustomize模块, 我们只要创建该模块, 并在模块里面写入我们的 替换函数即可。

.├── __init__.py├── hook_aiomysql.py├── sitecustomize.py└── test_auto_hook.py

hook_aiomysql.py是我们制作探针的代码为例子, 而sitecustomize.py存放的代码如下, 非常简单, 就是引入我们的探针代码, 并插入到sys.meta_path:

import sysfrom hook_aiomysql import MetaPathFindersys.meta_path.insert(0, MetaPathFinder())

test_auto_hook.py则是测试代码:

import asynciofrom hook_aiomysql import test_mysqlasyncio.run(test_mysql())

接下来只要设置PYTHONPATH并运行我们的代码即可(如果是项目的话一般交由superisor启动,则可以在配置文件中设置好PYTHONPATH):

(.venv) ➜  python_hook git:(master) ✗ export PYTHONPATH=.      (.venv) ➜  python_hook git:(master) ✗ python test_auto_hook.py {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}

4.直接替换方法

可以看到上面的方法很好的运行了, 而且可以很方便的嵌入到我们的项目中, 但是依赖与sitecustomize.py文件很难让他抽离成一个第三方的库, 如果要抽离成第三方的库就得考虑看看有没有其他的方法。在上面介绍MetaPathLoader时说到了sys.module, 在里面通过sys.modules来减少重复引入:

class MetaPathLoader:    def load_module(self, fullname):        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import        if fullname in sys.modules:            return sys.modules[fullname]        # 防止递归调用        finder = sys.meta_path.pop(0)        # 导入 module        module = importlib.import_module(fullname)        if fullname == 'time':            # 替换函数            module.sleep = func_wrapper(module.sleep)        sys.meta_path.insert(0, finder)        return module

这个减少重复引入的原理是, 每次引入一个模块后, 他就会存放在sys.modules, 如果是重复引入, 就会直接刷新成最新引入的模块。上面之所以会考虑到减少重复import是因为我们不会在程序运行时升级第三方库的依赖。利用到我们可以不考虑重复引入同名不同实现的模块, 以及sys.modules会缓存引入模块的特点, 我们可以把上面的逻辑简化成引入模块->替换当前模块方法为我们修改的hook方法。

import timefrom functools import wrapsfrom typing import Any, Callable, Tuple, castimport aiomysqldef func_wrapper(func: Callable):    """和上面一样的封装函数, 这里简单略过"""# 判断是否hook过_IS_HOOK: bool = False# 存放原来的_query_query: Callable = aiomysql.Cursor._query# hook函数def install_hook() -> None:    _IS_HOOK = False    if _IS_HOOK:        return    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)    _IS_HOOK = True# 还原到原来的函数方法def reset_hook() -> None:    aiomysql.Cursor._query = _query    _IS_HOOK = False

代码简单明了,接下来跑一跑刚才的测试:

import asyncioimport aiomysqlfrom demo import install_hook, reset_hookasync def test_mysql() -> None:    pool: aiomysql.Pool = await aiomysql.create_pool(        host='127.0.0.1', port=3306, user='root', password='', db='mysql'    )    async with pool.acquire() as conn:        async with conn.cursor() as cur:            await cur.execute("SELECT 42;")            (r,) = await cur.fetchone()            assert r == 42    pool.close()    await pool.wait_closed()print("install hook")install_hook()asyncio.run(test_mysql())print("reset hook")reset_hook()asyncio.run(test_mysql())print("end")

通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息

install hook{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}reset hookend

以上就是"Python探针怎么完成调用库的数据提取"这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注行业资讯频道。

数据 模块 方法 代码 探针 函数 逻辑 时间 统计 运行 参数 对象 源码 知识 篇文章 面的 封装 入口 可以通过 同时 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全法实施条例全文解释 数据库的记录数 幻塔为什么有时候连接不上服务器 2020国家网络安全日视频 赤峰市软件开发公司 服务器影响无盘系统的输入吗 数据库至少包含2个重做 火锅店怎么卖服务器 全职高手网络安全大赛 本地用户通过公网地址访问服务器 职业中专学校软件开发是什么 数据库索引性别男女 移动网络安全 教学计划 网络技术中心部是干嘛的 cmd 运行数据库文件 天翼网络安全科技有限公司 上海计算机网络技术人员高级技师 河南数据网络技术资费 计算机网络技术学习什么语言 数据库专升本录取查询系统 大脚插件数据库搜索铁矿 数据库至少包含2个重做 计算机网络技术大专好学吗 网络安全教育新闻武警 数据库基础案例教程 数据库应急响应恢复 怎么连接到sql数据库 小峰总科数据库 汕头支付软件开发订制 如何查看软件开发者信息
0