python 操作 redis + 消息队列使用例子
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,操作 redisimport redisredisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)redis= r
千家信息网最后更新 2025年01月24日python 操作 redis + 消息队列使用例子
操作 redis
import redisredisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)redis= redis.Redis(connection_pool=redisPool)redis.set('key','values')redis.get('com')redis.append('keys','values')redis.delete('keys')print(redis.getset('name','Mike')) #赋值name为Mike并返回上一次的valueprint(redis.mget(['name','age'])) #输出name键和age键的valueprint(redis.setnx('newname','james')) #如果键值不存在,则赋值print(redis.mset({'name1':'smith','name2':'curry'})) #批量赋值print(redis.msetnx({'name3':'ltf','name4':'lsq'})) #不存在才批量赋值print(redis.incr('age',1)) #age对应的value 加1print(redis.decr('age',5)) #age对应的value 减5print(redis.append('name4','is a sb')) #在name4的value后追加 is a sb 返回字符串长度print(redis.substr('name',1,4)) #截取键 nameprint(redis.sadd('tags','Book','Tea','Coffee')) #返回集合长度 3print(redis.srem('tags','Book')) #返回删除的数据个数print(redis.spop('tags')) #随机删除并返回该元素print(redis.smove('tags','tags1','Coffee'))print(redis.scard('tags')) # 获取tags集合的元素个数print(redis.sismember('tags', 'Book')) # 判断Book是否在tags的集合中print(redis.sinter('tags', 'tags1')) # 返回集合tags和集合tags1的交集print(redis.sunion('tags', 'tags1')) # 返回集合tags和集合tags1的并集print(redis.sdiff('tags', 'tags1')) # 返回集合tags和集合tags1的差集print(redis.smembers('tags')) # 返回集合tags的所有元素print(redis.hset('price','cake',5)) # 向键名为price的散列表添加映射关系,返回1 即添加的映射个数print(redis.hsetnx('price','book',6)) # 向键名为price的散列表添加映射关系,返回1 即添加的映射个数print(redis.hget('price', 'cake')) # 获取键名为cake的值 返回5print(redis.hmset('price',{'banana':2,'apple':3,'pear':6,'orange':7})) #批量添加映射print(redis.hmget('price', ['apple', 'orange'])) # 查询apple和orange的值 输出 b'3',b'7'print(redis.hincrby('price','apple',3)) #apple映射加3 为6print(redis.hexists('price', 'banana')) # 在price中banana是否存在 返回Trueprint(redis.hdel('price','banana')) #从price中删除banana 返回1print(redis.hlen('price')) # 输出price的长度print(redis.hkeys('price')) # 输出所有的映射键名print(redis.hvals('price')) # 输出所有的映射键值print(redis.hgetall('price')) # 输出所有的映射键对print(redis.rpush('list',1,2,3)) #向键名为list的列表尾部添加1,2,3 返回长度print(redis.lpush('list',0)) #向键名为list的列表头部添加0 返回长度print(redis.llen('list')) #返回列表的长度print(redis.lrange('list',1,3)) #返回起始索引为1 终止索引为3的索引范围对应的列表print(redis.lindex('list',1)) #返回索引为1的元素-valueprint(redis.lset('list',1,5)) #将list的列表索引为1的重新赋值为5print(redis.lpop('list')) #删除list第一个元素print(redis.rpop('list')) #删除list最后一个元素print(redis.blpop('list')) #删除list第一个元素print(redis.brpop('list')) #删除最后一个元素print(redis.rpoplpush('list','list1')) #删除list的尾元素并将其添加到list1的头部
消息队列使用例子
import redisimport jsonredisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)client = redis.Redis(connection_pool=redisPool)# 顺序插入五条数据到redis队列,sort参数是用来验证弹出的顺序while True: num = 0 for i in range(0, 100): num = num + 1 # params info params_dict = {"name": f"test {num}", "sort":num} client.rpush("test", json.dumps(params_dict)) # 查看目标队列数据 result = client.lrange("test", 0, 100) print(result) import time time.sleep(10)
import redisimport timeimport multiprocessingimport timeimport osimport randomredisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)client = redis.Redis(connection_pool=redisPool)def test1(msg): t_start = time.time() print("%s开始执行,进程号为%d" % (msg, os.getpid())) time.sleep(random.random() * 2) t_stop = time.time() print("%s执行完成,耗时%.2f" % (msg, t_stop - t_start))while True: number = client.llen('test') print("现在的队列任务 条数是 ", number) p = 100 if number > p-1: print("-----start-----") a = [] for i in range(p): result = client.lpop("test") a.append(result) print("每10条读取一次", a) po = multiprocessing.Pool(p) for i in range(0, p): # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,)) # 每次循环将会用空闲出来的子进程去调用目标 po.apply_async(test1, (a[i],)) po.close() # 关闭进程池,关闭后po不再接收新的请求 po.join() # 等待po中所有子进程执行完成,必须放在close语句之后 print("-----end-----") time.sleep(2) elif number < p and number > 0: print("-----start-----") a = [] for i in range(number): a = [] result = client.lpop("test") a.append(result) print("小于10条的 读取一次 ", a) po = multiprocessing.Pool(number) for i in a: # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,)) # 每次循环将会用空闲出来的子进程去调用目标 po.apply_async(test1, (a,)) po.close() # 关闭进程池,关闭后po不再接收新的请求 po.join() # 等待po中所有子进程执行完成,必须放在close语句之后 print("-----end-----") time.sleep(2) elif number == 0: print("没有任务需要处理") time.sleep(2) else: time.sleep(2)
元素
目标
进程
长度
输出
索引
队列
个数
参数
数据
任务
头部
空闲
语句
顺序
元祖
循环
例子
消息
交集
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
通过串口管理服务器
中国的代理服务器
数据库浅谈
国际网络安全中国
上海对思网络技术有限公司
超聚变服务器认证
网络安全的威胁
软件开发类型ppt模板
中国影晌力数据库梁文
交通行业数据库脱敏技术原理
北京华为鲲鹏服务器批发供应
无锡工业软件开发公司有哪些
江宁区方便软件开发售后服务
网络技术竞争
服务器安装在内存条上
2016 网络安全周
博弈软件开发
苏州网络安全公告
陕西配电服务器机柜云主机
java从数据库级联
远程计算机提示未启用服务器
网络技术资料销售平台
数据库备份和还原功能
如何看软件开发项目管理职位
网络安全监测装置部署要求
网络安全域的作用
服务器电池经常坏
做软件开发企业需要交什么税
辽宁葫芦岛软件开发
香港服务器流量怎么调