celery
特点:简单、高可用性、快速、灵活
任务队列是一种在线程或者机器间分发任务的机制.
- 安装方式:
pip install -U Celery
- 使用
Redis
作为中间人: - 配置:
BROKER_URL = ‘redis://localhost:6379/0’
- URL 格式:
redis://password@hostname:port/db_number
- 可见性超时:
BROKEN_TRANSPORT_OPTIONS={'visibility_timeout': 3600}
#visibility_timeout
必须是整数,单位是秒
redis 的默认可见性超时时间是 1 小时
在 Redis 中存储任务的状态和返回值
CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0’
广播信息默认为所有主机可见,设置传输选项给消息加上前缀,这样就只能被活动的虚拟主机收到
BROKER_TRANSPORT_OPTIONS = {‘fanout_prefix’: True}
Redis 在某些情况会从数据库中驱除键
如果你遇到了类似这样的错误:
InconsistencyError, Probably the key ('_kombu.binding.celery') has been
removed from the Redis database.
你可以配置 Redis 服务器的 timeout 参数为 0 来避免键被驱逐。
Celery 是一个异步任务队列。你可以使用它在你的应用上下文之外执行任务。总的想法就是你的应用程序可能需要执行任何消耗资源的任务都可以交给任务队列,让你的应用程序自由和快速地响应客户端请求。
使用 Celery 运行后台任务并不像在线程中这样做那么简单。但是好处多多,Celery 具有分布式架构,使你的应用易于扩展。一个 Celery 安装有三个核心组件:
Celery 客户端: 用于发布后台作业。 Celery workers: 这些是运行后台作业的进程。Celery 支持本地和远程的 workers 消息代理: 客户端通过消息队列和 workers 进行通信,Celery 支持多种方式来实现这些队列。最常用的代理就是 RabbitMQ 和 Redis
- tasks.py
# !/usr/bin/env python
# -*- coding: utf-8 -*-
"""
在当前目录下执行 celery -A tasks worker -B --loglevel=info
"""
from celery import Celery
import time
app = Celery('tasks')
app.config_from_object('config')
@app.task
def add(x, y):
result = x + y
now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print "\n hjd now time: %s, %s + %s = %s\n" % (now_time, x, y, result)
return result
- config.py
# redis作为代理服务器
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
BROKER_URL = 'redis://127.0.0.1:6379/0'
- 执行方式,在 tasks.py 同目录下执行下面语句
celery -A tasks worker --loglevel=info
在tasks.py同目录下打开python命令行
from tasks import add
res = add.delay(4, 3)
# delay() 和 apply_async() 的返回值是一个表示任务的对象,这个对象可以用于获取任务状态。我将会在本文的后面展示如何获取任务状态等信息
flower
celery flower -A tasks:celery -l info --port=8080 --address=0.0.0.0 --url_prefix=""
发送停止信号
kill -TERM `ps axu | grep celery | grep -v grep | awk '{print $2}'`