异步redis队列实现 数据入库的方法
发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,业务需求app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送出问题之前业务逻辑:php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据该用户已经存在
千家信息网最后更新 2025年01月19日异步redis队列实现 数据入库的方法
业务需求
app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送
出问题之前业务逻辑:
php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据
该用户已经存在 和新增的id
入另一种详情表
问题所在:
当用户因特殊情况清除缓存 导致app 发送json串 入库并发高 导致CPU 暴增到88% 并且居高不下
优化思路:
1、异步队列处理
2、redis 过滤(就是只处理当天第一次请求)
3、redis 辅助存储app名称(验证过后批量插入数据app名称表中)
4、拼接插入的以及新增的如详细表中
解决办法:
1、接口修改 redis 过滤 + 如list队列 并将结果存入redis中
首先 redis将之前的历史数据放在redis 哈希里面 中文为键名 id 为键值
table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName'); // $str = ''; // foreach ($app_name as $key => $val) { // $str.= "{$val} {$key} "; // } // $redis->hmset('app_name', $app_name); // echo $str;exit; $result = $request->input('res'); $list = json_decode($result, true); if (empty ($list) || !is_array($list)) { return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']); } $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ; $data['time'] = date('Y-m-d'); $redis_key = 'log_app:'.$data['time']; //redis 过滤 $redis = Redis::connection('web_active'); //redis 键值过期设置 if (empty($redis->exists($redis_key))) { $redis->hset($redis_key, 1, 'start'); $redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day')); } //值确定 if ($redis->hexists($redis_key, $data['uid'])) { return json_encode(['result' => 'SUCCESS']); } else { //推入队列 $redis->hset($redis_key, $data['uid'], $result); $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']); return json_encode(['result' => 'SUCCESS']); } }}
2、php 脚本循环 监控redis 队列 执行逻辑 防止内存溢出
mget 获取该用户的app id 不存在就会返回null
通过判断null 运用redis 新值作为自增id指针 将null 补齐 之后批量入mysql 并跟新redis 哈希 和指针值 并入库 详情表
table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName'); // $redis->hmset('app_name', $app_name); // exit; while(1) { $redis = Redis::connection('web_active'); //队列名称 $res = $redis->lpop('log_app_list'); //开关按钮 $lock = $redis->get('log_app_lock'); if (!empty($res)) { list($date,$uid) = explode(':',$res); $result = $redis->hget('log_app:'.$date, $uid); if (!empty($result)) { $table_name = 'app_total'.date('Ym'); $list = json_decode($result, true); $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ; $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ; $data['device'] = isset($list['device']) ? $list['device'] : '' ; $data['appList'] = isset($list['list']) ? $list['list'] : '' ; //数据去重 flip比unique更节约性能 $data['appList'] = array_flip($data['appList']); $data['appList'] = array_flip($data['appList']); $data['time'] = date('Y-m-d'); //app应用过滤 $app_res = $redis->hmget('app_name', $data['appList']); //新增加app数组 $new_app = []; //mysql 入库数组 $mysql_new_app = []; //获取当前redis 自增指针 $total = $redis->get('app_name_total'); foreach ($app_res as $key =>& $val) { if (is_null($val)) { $total += 1; $new_app[$data['appList'][$key]] = $total; $val = $total; array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]); } } if (count($new_app)){ $str = "INSERT IGNORE INTO app_set_name (id,appName) values"; foreach ($new_app as $key => $val) { $str.= "(".$val.",'".$key."'),"; } $str = trim($str, ','); //$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app); $mysql_res = DB::connection('phpLog')->statement($str); if ($mysql_res) { // 设置redis 指针 $redis->set('app_name_total', $total); // redis 数据入库 $redis->hmset('app_name', $new_app); } } // 详情数据入库 $data['appList'] = implode(',', $app_res); //app统计入库 DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList) values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')"); //log 记录 当文件达到123MB的时候产生内存保错 所有这个地方可是利用日志切割 或者 不写入 日志 Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' success '.$result."\n"); } else { Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' error '.$result."\n"); } } //执行间隔 sleep(1); //结束按钮 if ($lock == 2) { exit; } //内存检测 if(memory_get_usage()>1000*1024*1024){ exit('内存溢出');//大于100M内存退出程序,防止内存泄漏被系统杀死导致任务终端 } } }}
3、执定 定时任务监控脚本执行情况
crontab -e/2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1
test.sh 内容 (查看执行命令返回的进程id 如果没有就执行命令开启)
#!/bin/bashalive=`ps -ef | grep AppTotal | grep -v grep | awk '{print $2}'`if [ ! $alive ]then /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &fi
记得授权哦 chmod +x test.sh
笔者用的laravel 框架 将命令激活丢入后台
执行命令
/usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &
完事直接 ctrl -c 结束就行 命令以在后台运行 可以用shell 中的命令查看进程id
这样就实现队列异步入库
还有很多问题需要优化!!大致功能已经实现!!!!!!
优化完成后cpu
以上这篇异步redis队列实现 数据入库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
数据
队列
内存
命令
指针
名称
数组
用户
详情
问题
业务
任务
内容
后台
就是
情况
按钮
接口
日志
缓存
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发电脑配置规定
戴尔服务器带tesla显卡
DROPBOX下载软件开发
微信小程序服务器代理
哪些游戏不在服务器
福建安卓软件开发公司
如何查看海康服务器资源占用
管道数据库零件号
wow 已从服务器断开
阿里云服务器减少磁盘大小
刘德 网络安全
软件开发师 对英语要求
数据库软件架构CS
网站服务器安全维护价格
电信服务器维护电话
局域网如何制作服务器
计算机网络安全解决方案考虑什么
嵌入式软件开发要会linux吗
复制ic卡扰乱数据库
筹备网络安全工作的是什么部门
代号探戈去主机服务器的路
网络安全简约板报
启动apache服务器
学生管理数据库模板下载
霸屏天下软件开发结构
网址判断服务器位置
数据采集器软件开发
服务器光盘启动在哪里
数据库单值是什么意思
搭建简易数据库