千家信息网

使用celery怎么实现集群管理

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,本篇文章给大家分享的是有关使用celery怎么实现集群管理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。架构:这里作为例子的celery
千家信息网最后更新 2025年02月05日使用celery怎么实现集群管理

本篇文章给大家分享的是有关使用celery怎么实现集群管理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

架构:

这里作为例子的celery app为myapp:

root@workgroup0:~/celeryapp# ls myappagent.py   celery.py   config.py   __init__.pyroot@workgroup0:~/celeryapp#

公用代码部分:

celery.py:(备注:172.16.77.175是任务发布节点的ip地址)

from __future__ import absolute_importfrom celery import Celeryapp = Celery('myapp',             broker='amqp://guest@172.16.77.175//',             backend='amqp://guest@172.16.77.175//',             include=['myapp.agent'])app.config_from_object('myapp.config')if __name__ == '__main__':  app.start()

config.py:

from __future__ import absolute_importfrom kombu import Queue,Exchangefrom datetime import timedeltaCELERY_TASK_RESULT_EXPIRES=3600CELERY_TASK_SERIALIZER='json'CELERY_ACCEPT_CONTENT=['json']CELERY_RESULT_SERIALIZER='json'CELERY_DEFAULT_EXCHANGE = 'agent'CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'CELERT_QUEUES =  (  Queue('machine1',exchange='agent',routing_key='machine1'),  Queue('machine2',exchange='agent',routing_key='machine2'),)

__init__.py:(空白)

任务发布节点的agent.py:

from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y):    return {'the value is ':str(x+y)}@app.taskdef writefile():    out=open('/tmp/data.txt','w')    out.write('hello'+'\n')    out.close()@app.taskdef mul(x,y):    return x*y@app.taskdef xsum(numbers):    return sum(numbers)@app.taskdef getl(stri):    return getlength(stri)def getlength(stri):    return len(stri)

docker1上的agent.py:

from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y):    return {'value':str(x+y),'node_name':'docker1'}                   #增加了node_name用来识别节点@app.taskdef writefile():    out=open('/tmp/data.txt','w')    out.write('hello'+'\n')    out.close()@app.taskdef mul(x,y):    return x*y@app.taskdef xsum(numbers):    return sum(numbers)@app.taskdef getl(stri):    return getlength(stri)def getlength(stri):    return len(stri)

docker2上的:

from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y):    return {'value':str(x+y),'node_name':'docker2'}@app.taskdef writefile():    out=open('/tmp/data.txt','w')    out.write('hello'+'\n')    out.close()@app.taskdef mul(x,y):    return x*y@app.taskdef xsum(numbers):    return sum(numbers)@app.taskdef getl(stri):    return getlength(stri)def getlength(stri):    return len(stri)

在这个例子中我只测试add()函数:

在docker1节点上启动worker:(用-Q指定监听的queue)

root@workgroup1:~/celeryapp# celery -A myapp worker -l info -Q machine1/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which isabsolutely not recommended!Please specify a different user using the -u option.User information: uid=0 euid=0 gid=0 egid=0  uid=uid, euid=euid, gid=gid, egid=egid,  -------------- celery@workgroup1.hzg.com v3.1.17 (Cipater)---- **** ----- --- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty-- * - **** --- - ** ---------- [config]- ** ---------- .> app:         myapp:0x7f472d73f190- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//- ** ---------- .> results:     amqp://guest@172.16.77.175//- *** --- * --- .> concurrency: 1 (prefork)-- ******* ---- --- ***** ----- [queues] -------------- .> machine1         exchange=machine1(direct) key=machine1                [tasks]  . myapp.agent.add  . myapp.agent.getl  . myapp.agent.mul  . myapp.agent.writefile  . myapp.agent.xsum[2015-10-18 15:07:51,313: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//[2015-10-18 15:07:51,340: INFO/MainProcess] mingle: searching for neighbors[2015-10-18 15:07:52,372: INFO/MainProcess] mingle: sync with 1 nodes[2015-10-18 15:07:52,374: INFO/MainProcess] mingle: sync complete[2015-10-18 15:07:52,423: WARNING/MainProcess] celery@workgroup1.hzg.com ready.

启动docker2上的worker:

root@workgroup2:~/celeryapp# celery -A myapp worker -l info -Q machine2/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which isabsolutely not recommended!Please specify a different user using the -u option.User information: uid=0 euid=0 gid=0 egid=0  uid=uid, euid=euid, gid=gid, egid=egid,  -------------- celery@workgroup2.hzg.com v3.1.18 (Cipater)---- **** ----- --- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty-- * - **** --- - ** ---------- [config]- ** ---------- .> app:         myapp:0x7f708cb8ec10- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//- ** ---------- .> results:     amqp://guest@172.16.77.175//- *** --- * --- .> concurrency: 1 (prefork)-- ******* ---- --- ***** ----- [queues] -------------- .> machine2         exchange=machine2(direct) key=machine2                [tasks]  . myapp.agent.add  . myapp.agent.getl  . myapp.agent.mul  . myapp.agent.writefile  . myapp.agent.xsum[2015-10-18 15:08:52,114: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//[2015-10-18 15:08:52,144: INFO/MainProcess] mingle: searching for neighbors[2015-10-18 15:08:53,174: INFO/MainProcess] mingle: sync with 1 nodes[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.

在任务发布节点发布一个计算任务给docker1:

root@workgroup0:~/celeryapp# lsdefault.etcd  hots.sh  hotswap.py  myapp  myapp1tmp  people.db  resp  sora  test.pyroot@workgroup0:~/celeryapp# pythonPython 2.7.6 (default, Mar 22 2014, 22:59:56) [GCC 4.8.2] on linux2Type "help", "copyright", "credits" or "license" for more information.>>> from myapp.agent import add>>> res = add.apply_async(args=[122,34],queue='machine1',routing_key='machine1')>>> res.get(){u'value': u'156', u'node_name': u'docker1'}

用get()可以看到来自docker1的返回,再看看docker1的显示:

[2015-10-18 15:11:51,217: INFO/MainProcess] Task myapp.agent.add[c487a9a2-e5cc-462b-a131-784b363a1952] succeeded in 0.03602907s: {'value': '156', 'node_name': 'docker1'}

至于docker2,一点没动:

[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.

发布一个任务给docker2:

>>> res = add.apply_async(args=[1440,900],queue='machine2',routing_key='machine2')>>> res.get(){u'value': u'2340', u'node_name': u'docker2'}>>>

以上就是使用celery怎么实现集群管理,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0