使用celery怎么实现集群管理
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,本篇文章给大家分享的是有关使用celery怎么实现集群管理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。架构:这里作为例子的celery
千家信息网最后更新 2025年02月05日使用celery怎么实现集群管理
本篇文章给大家分享的是有关使用celery怎么实现集群管理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
架构:
这里作为例子的celery app为myapp:
root@workgroup0:~/celeryapp# ls myappagent.py celery.py config.py __init__.pyroot@workgroup0:~/celeryapp#
公用代码部分:
celery.py:(备注:172.16.77.175是任务发布节点的ip地址)
from __future__ import absolute_importfrom celery import Celeryapp = Celery('myapp', broker='amqp://guest@172.16.77.175//', backend='amqp://guest@172.16.77.175//', include=['myapp.agent'])app.config_from_object('myapp.config')if __name__ == '__main__': app.start()
config.py:
from __future__ import absolute_importfrom kombu import Queue,Exchangefrom datetime import timedeltaCELERY_TASK_RESULT_EXPIRES=3600CELERY_TASK_SERIALIZER='json'CELERY_ACCEPT_CONTENT=['json']CELERY_RESULT_SERIALIZER='json'CELERY_DEFAULT_EXCHANGE = 'agent'CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'CELERT_QUEUES = ( Queue('machine1',exchange='agent',routing_key='machine1'), Queue('machine2',exchange='agent',routing_key='machine2'),)
__init__.py:(空白)
任务发布节点的agent.py:
from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y): return {'the value is ':str(x+y)}@app.taskdef writefile(): out=open('/tmp/data.txt','w') out.write('hello'+'\n') out.close()@app.taskdef mul(x,y): return x*y@app.taskdef xsum(numbers): return sum(numbers)@app.taskdef getl(stri): return getlength(stri)def getlength(stri): return len(stri)
docker1上的agent.py:
from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y): return {'value':str(x+y),'node_name':'docker1'} #增加了node_name用来识别节点@app.taskdef writefile(): out=open('/tmp/data.txt','w') out.write('hello'+'\n') out.close()@app.taskdef mul(x,y): return x*y@app.taskdef xsum(numbers): return sum(numbers)@app.taskdef getl(stri): return getlength(stri)def getlength(stri): return len(stri)
docker2上的:
from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y): return {'value':str(x+y),'node_name':'docker2'}@app.taskdef writefile(): out=open('/tmp/data.txt','w') out.write('hello'+'\n') out.close()@app.taskdef mul(x,y): return x*y@app.taskdef xsum(numbers): return sum(numbers)@app.taskdef getl(stri): return getlength(stri)def getlength(stri): return len(stri)
在这个例子中我只测试add()函数:
在docker1节点上启动worker:(用-Q指定监听的queue)
root@workgroup1:~/celeryapp# celery -A myapp worker -l info -Q machine1/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which isabsolutely not recommended!Please specify a different user using the -u option.User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@workgroup1.hzg.com v3.1.17 (Cipater)---- **** ----- --- * *** * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty-- * - **** --- - ** ---------- [config]- ** ---------- .> app: myapp:0x7f472d73f190- ** ---------- .> transport: amqp://guest:**@172.16.77.175:5672//- ** ---------- .> results: amqp://guest@172.16.77.175//- *** --- * --- .> concurrency: 1 (prefork)-- ******* ---- --- ***** ----- [queues] -------------- .> machine1 exchange=machine1(direct) key=machine1 [tasks] . myapp.agent.add . myapp.agent.getl . myapp.agent.mul . myapp.agent.writefile . myapp.agent.xsum[2015-10-18 15:07:51,313: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//[2015-10-18 15:07:51,340: INFO/MainProcess] mingle: searching for neighbors[2015-10-18 15:07:52,372: INFO/MainProcess] mingle: sync with 1 nodes[2015-10-18 15:07:52,374: INFO/MainProcess] mingle: sync complete[2015-10-18 15:07:52,423: WARNING/MainProcess] celery@workgroup1.hzg.com ready.
启动docker2上的worker:
root@workgroup2:~/celeryapp# celery -A myapp worker -l info -Q machine2/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which isabsolutely not recommended!Please specify a different user using the -u option.User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@workgroup2.hzg.com v3.1.18 (Cipater)---- **** ----- --- * *** * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty-- * - **** --- - ** ---------- [config]- ** ---------- .> app: myapp:0x7f708cb8ec10- ** ---------- .> transport: amqp://guest:**@172.16.77.175:5672//- ** ---------- .> results: amqp://guest@172.16.77.175//- *** --- * --- .> concurrency: 1 (prefork)-- ******* ---- --- ***** ----- [queues] -------------- .> machine2 exchange=machine2(direct) key=machine2 [tasks] . myapp.agent.add . myapp.agent.getl . myapp.agent.mul . myapp.agent.writefile . myapp.agent.xsum[2015-10-18 15:08:52,114: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//[2015-10-18 15:08:52,144: INFO/MainProcess] mingle: searching for neighbors[2015-10-18 15:08:53,174: INFO/MainProcess] mingle: sync with 1 nodes[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
在任务发布节点发布一个计算任务给docker1:
root@workgroup0:~/celeryapp# lsdefault.etcd hots.sh hotswap.py myapp myapp1tmp people.db resp sora test.pyroot@workgroup0:~/celeryapp# pythonPython 2.7.6 (default, Mar 22 2014, 22:59:56) [GCC 4.8.2] on linux2Type "help", "copyright", "credits" or "license" for more information.>>> from myapp.agent import add>>> res = add.apply_async(args=[122,34],queue='machine1',routing_key='machine1')>>> res.get(){u'value': u'156', u'node_name': u'docker1'}
用get()可以看到来自docker1的返回,再看看docker1的显示:
[2015-10-18 15:11:51,217: INFO/MainProcess] Task myapp.agent.add[c487a9a2-e5cc-462b-a131-784b363a1952] succeeded in 0.03602907s: {'value': '156', 'node_name': 'docker1'}
至于docker2,一点没动:
[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
发布一个任务给docker2:
>>> res = add.apply_async(args=[1440,900],queue='machine2',routing_key='machine2')>>> res.get(){u'value': u'2340', u'node_name': u'docker2'}>>>
以上就是使用celery怎么实现集群管理,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
任务
节点
集群
管理
例子
更多
知识
篇文章
部分
实用
代码
函数
地址
备注
就是
工作会
文章
架构
看吧
知识点
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
长沙中信软件开发公司吗
天水海力网络技术有限公司
web软件开发原型工具有哪些
万国觉醒服务器战力排名
腾讯云服务器在北京还是上海好
服务器所有网站404
nba2k15 服务器
北京瑞奥风网络技术中心
神湾租房软件开发
电视台做软件开发
服务器如何给别人管理员
编辑框数据保存数据库
摄像头检测数据库
筑牢网络安全防线试题
常熟综合网络技术包括什么
学籍客户端 更新数据库异常
中控考勤数据库
28岁改行学软件开发
大连软件开发外企
ug里的国标模型数据库
80年代网络数据库
sql数据库服务器密码
nba2k15 服务器
展厅2台服务器做多通道融合
sql数据库并发处理能力
利用mql创建一个数据库
四六级报名未连接上服务器
红河软件开发学费
数据库下载官网
网络安全面临哪些巨大挑战