← 返回并发编程目录
← 返回学习笔记首页
专题: Python并发编程系统学习
关键词: Python, 并发编程, Celery, 任务队列, Broker, worker, 分布式任务, 异步
一、Celery简介
什么是Celery
Celery是一个基于Python开发的分布式任务队列框架,专注于处理实时任务和定时调度任务。它采用生产者-消费者模式,将任务的创建与执行分离,允许将耗时的同步操作转化为异步任务,从而提升系统的吞吐量和响应速度。
适用场景
耗时任务: 发送邮件、生成报表、图片处理、视频转码等需要长时间执行的操作
定时任务: 数据备份、日志清理、每日统计、监控告警等周期性执行的工作
异步消息处理: Web请求中不需要立即返回结果的操作,如用户注册后的欢迎邮件
任务解耦: 将复杂业务拆分为多个独立任务,提高系统的可维护性和扩展性
架构三要素
Celery的架构由三个核心组件组成,理解这三个组件是掌握Celery的基础。
Broker(消息代理): 负责接收生产者发送的任务消息并将其存储到队列中,供Worker消费。常见的Broker有RabbitMQ、Redis、Amazon SQS等。
Worker(工作进程): 实际执行任务的消费者进程。Worker从Broker中拉取任务消息并执行,执行完成后将结果存入Backend。
Backend(结果后端): 用于存储任务执行完成后的返回结果。通过配置result backend,可以在任务执行后获取返回值或异常信息。常见的Backend有Redis、数据库、Elasticsearch等。
核心思想: Celery将任务的定义 、分发 和执行 三者解耦,使开发者可以专注于业务逻辑,而无需关心底层的并发和通信细节。
二、快速开始
安装Celery
使用pip安装Celery库,根据选择的Broker安装对应的依赖。
# 安装Celery核心库
pip install celery
# 使用Redis作为Broker时
pip install celery[redis]
# 使用RabbitMQ作为Broker时
pip install celery[rabbitmq]
定义第一个任务
创建一个名为tasks.py的文件,定义Celery应用和第一个任务。
from celery import Celery
# 创建Celery应用实例
# broker参数指定消息代理的地址
app = Celery('tasks' , broker='redis://localhost:6379/0' )
@app.task
def add (x, y):
return x + y
启动Worker并调用任务
在终端中启动Worker进程,然后在Python中调用任务。
# 终端启动Worker(需在tasks.py所在目录执行)
celery -A tasks worker --loglevel=info
# 在Python交互环境中调用任务
from tasks import add
result = add.delay(4 , 5 )
print (result.id) # 输出任务ID,用于后续查询结果
注意: delay()方法是apply_async()的简写形式,两者效果相同。delay()不支持传递执行参数,而apply_async()可以指定countdown、eta、queue等高级参数。
三、Broker配置
RabbitMQ vs Redis对比
Broker是Celery的核心依赖,选择合适的Broker对生产环境至关重要。
特性
RabbitMQ
Redis
消息可靠性
高(支持消息确认、持久化)
中(可能存在数据丢失风险)
性能
高,适合大规模消息
高,适合轻量级场景
功能丰富度
丰富(交换机、路由、死信队列)
基础(列表实现队列)
运维复杂度
较高(需管理Erlang环境)
低(Redis通用性高)
推荐场景
生产环境、对可靠性要求高的场景
开发测试、小型项目
连接池配置
生产环境中需要合理配置连接池参数,避免连接耗尽或资源浪费。
# 使用Redis作为Broker时配置连接池
app.conf.broker_url = 'redis://:password@localhost:6379/0'
app.conf.broker_pool_limit = 10 # 连接池最大连接数
app.conf.broker_connection_retry_on_startup = True
app.conf.broker_connection_max_retries = 5
消息持久化
确保Broker重启后任务消息不丢失,需要开启消息持久化功能。
# RabbitMQ中使用持久化队列
app.conf.task_default_delivery_mode = 2 # 2=persistent, 1=non-persistent
# Redis中默认不持久化,可通过配置开启
app.conf.broker_transport_options = {
'visibility_timeout' : 3600 , # 超时时间(秒)
'max_retries' : 3 ,
}
四、调用任务与结果获取
调用方式
Celery提供了多种调用任务的方式,满足不同的业务场景。
delay(): 最简单的调用方式,直接传入参数,立即异步执行
apply_async(): 更灵活的调用方式,支持设置执行参数
send_task(): 通过任务名称调用,适用于跨服务调用场景
# delay方式(简写)
result = add.delay(4 , 5 )
# apply_async方式(完整参数)
result = add.apply_async(
args=(4 , 5 ),
countdown=10 , # 延迟10秒执行
expires=3600 , # 任务过期时间(秒)
queue='high_priority' , # 指定队列
task_id='custom-id' # 自定义任务ID
)
结果获取(Result Backend)
配置result backend后,可以通过AsyncResult获取任务的执行结果。
from celery.result import AsyncResult
# 配置result backend
app.conf.result_backend = 'redis://localhost:6379/1'
# 定义任务
@app.task
def long_task (n):
import time
time.sleep(n)
return f"任务执行完毕,耗时{n}秒"
# 调用并获取结果
result = long_task.delay(5 )
task_id = result.id
# 在其他地方查询结果
async_result = AsyncResult(task_id, app=app)
if async_result.ready():
print (async_result.get()) # 获取结果(阻塞)
print (async_result.status) # 任务状态: SUCCESS/FAILURE/PENDING
print (async_result.successful()) # 是否成功
最佳实践: get()方法是阻塞调用,用于需要等待结果的场景。在Web应用中应避免直接调用get(),而是采用异步轮询或WebSocket推送方式获取结果。
任务状态跟踪
Celery任务的生命周期包含多个状态,了解这些状态有助于调试和监控。
PENDING: 任务等待执行
RECEIVED: Worker已接收任务
STARTED: 任务开始执行
SUCCESS: 任务执行成功
FAILURE: 任务执行失败
RETRY: 任务即将重试
REVOKED: 任务已被撤销
五、任务路由与队列
多队列分离不同任务
在生产环境中,通常需要将不同类型的任务分发到不同的队列,以便进行资源隔离和优先级管理。
# 配置任务路由
app.conf.task_routes = {
'tasks.send_email' : {'queue' : 'email' },
'tasks.process_image' : {'queue' : 'image' },
'tasks.generate_report' : {'queue' : 'report' },
}
# 也可以通过装饰器直接指定
@app.task(queue='email')
def send_welcome_email (user_email):
pass
启动Worker时绑定指定队列
不同的Worker进程可以监听不同的队列,实现资源隔离。
# 启动只处理email队列的Worker
celery -A tasks worker -Q email --loglevel=info
# 启动处理image和report队列的Worker
celery -A tasks worker -Q image,report --loglevel=info
# Worker默认监听名为"celery"的队列
celery -A tasks worker --loglevel=info
任务优先级
通过设置优先级,确保重要的任务得到优先处理。
# 定义时设置优先级(数值越小优先级越高)
result = add.apply_async(
args=(4 , 5 ),
priority=1 # 高优先级
)
# 低优先级任务
result = add.apply_async(
args=(1 , 2 ),
priority=10 # 低优先级
)
设计原则: 建议将不同类型、不同重要性的任务路由到不同的队列,并为每个队列启动独立的Worker进程。这样即使某个队列的任务大量积压,也不会影响其他队列的正常处理。
六、定时任务Celery Beat
什么是Celery Beat
Celery Beat是一个定时任务调度器,它周期性地将任务发送到Broker,由Worker执行。Beat和Worker可以运行在同一个进程中,也可以分离运行。
Crontab调度
使用crontab表达式可以精确控制任务的执行时间,语法与Linux crontab一致。
from celery.schedules import crontab
app.conf.beat_schedule = {
'daily-report' : {
'task' : 'tasks.generate_daily_report' ,
'schedule' : crontab(hour=8 , minute=0 ), # 每天8:00执行
'args' : ('daily' ,),
},
'weekly-cleanup' : {
'task' : 'tasks.cleanup_temp_files' ,
'schedule' : crontab(
hour=3 , minute=0 ,
day_of_week='monday' # 每周一3:00执行
),
},
}
Interval定时
对于需要固定间隔执行的任务,使用interval调度更加直观。
from celery.schedules import timedelta
app.conf.beat_schedule = {
'health-check' : {
'task' : 'tasks.health_check' ,
'schedule' : timedelta(seconds=30 ), # 每30秒执行一次
},
'sync-data' : {
'task' : 'tasks.sync_data' ,
'schedule' : timedelta(minutes=15 ), # 每15分钟执行一次
},
}
启动Beat
启动Beat调度器需要单独运行beat进程。
# 启动Beat调度器
celery -A tasks beat --loglevel=info
# 同时启动Worker和Beat(开发环境常用)
celery -A tasks worker --beat --loglevel=info
动态添加定时任务
在运行时通过数据库动态管理定时任务,适合需要频繁调整调度策略的场景。
# 使用django-celery-beat扩展实现动态管理
from django_celery_beat.models import PeriodicTask, CrontabSchedule
# 创建crontab调度
schedule, _ = CrontabSchedule.objects.get_or_create(
hour=9 , minute=0 ,
)
# 创建定时任务
PeriodicTask.objects.create(
name='send-morning-email' ,
task='tasks.send_morning_email' ,
crontab=schedule,
args='[]' ,
)
七、工作流(Canvas原语)
什么是Canvas原语
Celery提供了强大的工作流原语,用于将多个任务组合成复杂的执行流程。这是Celery区别于其他任务队列的核心特性之一。
chain链式调用
将多个任务串联执行,前一个任务的输出作为后一个任务的输入。
from celery import chain
# 定义任务
@app.task
def process_order (order_id):
return {'order_id' : order_id, 'status' : 'processed' }
@app.task
def send_notification (order_data):
print (f"通知: 订单{order_data['order_id']}已处理" )
return order_data
# 链式执行:先处理订单,再发送通知
result = chain(process_order.s(1001 ), send_notification.s())()
# 等价于:result = (process_order.s(1001) | send_notification.s())()
group并行调用
将多个任务并行执行,所有任务完成后返回结果列表。
from celery import group
@app.task
def fetch_page (url):
import requests
return {'url' : url, 'length' : len (requests.get(url).text)}
# 并行抓取多个页面
urls = ['http://example.com' , 'http://python.org' , 'http://celeryproject.org' ]
result = group(fetch_page.s(url) for url in urls)()
print (result.get()) # 等待所有任务完成后获取结果列表
chord回调
chord是group的增强版,在所有并行任务完成后执行一个回调任务。
from celery import chord
@app.task
def sum_results (results):
return sum (results)
# 先并行计算,再汇总结果
result = chord(
[add.s(i, i) for i in range (10 )], # 并行计算0+0, 1+1, ..., 9+9
body=sum_results.s() # 计算完成后汇总
)()
print (result.get()) # 输出90
signature签名
Signature是任务的"签名",将任务及其参数封装为可传递的对象,是构建工作流的基础。
# 使用.s()方法创建签名
sig1 = add.s(2 , 3 ) # 部分参数
sig2 = add.s(2 ) # 只有一个参数,后续通过pipe传入
sig3 = add.s(2 , 3 , 4 ) # 错误的用法,add只接受两个参数
# 使用signature类
from celery import signature
task_sig = signature('tasks.add' , args=(2 , 3 ), countdown=10 )
result = task_sig.delay()
工作流选择指南: 需要对多个结果进行聚合时用group+chord;任务有明确的先后依赖关系时用chain;需要动态组合任务时使用signature。合理的任务编排可以大幅提升系统的处理效率。
八、监控与管理
Flower监控面板
Flower是基于Web的Celery监控工具,提供实时的Worker状态、任务历史、队列深度等信息。
# 安装Flower
pip install flower
# 启动Flower监控
celery -A tasks flower --port=5555
# 启用认证
celery -A tasks flower --port=5555 \
--basic_auth=admin:password123
启动后访问 http://localhost:5555 即可查看监控面板,包括任务成功率、执行时间分布、Worker负载等关键指标。
Worker状态查看
通过Celery命令行工具可以实时查看Worker的状态和统计信息。
# 查看所有活跃的Worker
celery -A tasks status
# 查看Worker的详细信息
celery -A tasks inspect active
celery -A tasks inspect reserved
celery -A tasks inspect registered
# 查看所有队列中的消息数量
celery -A tasks inspect active_queues
任务重试与过期
生产环境中任务可能因各种原因失败,合理的重试策略和任务过期机制至关重要。
# 定义自动重试的任务
@app.task (
bind=True ,
max_retries=3 ,
default_retry_delay=60 , # 首次重试间隔(秒)
autoretry_for=(Exception,), # 遇到异常自动重试
)
def unreliable_task (self):
import random
if random.random() < 0.7 :
raise ConnectionError("网络连接失败" )
return "成功"
# 手动重试(指数退避)
@app.task (bind=True , max_retries=5 )
def api_call (self, url):
try :
return call_external_api(url)
except Exception as e:
countdown = 2 ** self.request.retries # 指数退避:2, 4, 8, 16, 32秒
raise self.retry(exc=e, countdown=countdown)
生产环境部署最佳实践
进程管理: 使用Supervisor或systemd管理Worker进程,确保异常退出后自动重启
并发数设置: 根据CPU核心数和任务类型设置--concurrency参数,CPU密集型任务建议不超过核心数
任务超时: 为每个任务设置合理的软超时(soft_time_limit)和硬超时(time_limit)
优先级管理: 使用多队列+优先级策略,确保关键任务优先执行
日志收集: 将Worker日志收集到集中式日志系统(如ELK),便于问题排查
优雅关闭: Worker关闭时设置--timeout,确保正在执行的任务有足够时间完成
推荐命令: celery -A tasks worker -Q high,default --loglevel=warning --concurrency=4 --max-tasks-per-child=1000 --time-limit=3600 --soft-time-limit=3000
九、核心要点总结
Celery核心要点:
Celery是Python生态中最成熟的分布式任务队列框架,架构由Broker、Worker、Backend三部分组成
选择合适的Broker至关重要:RabbitMQ适合生产环境,Redis适合开发和小型项目
使用多队列和任务路由可以实现资源隔离和优先级管理
Canvas原语(chain/group/chord)提供了强大的任务编排能力
Celery Beat支持crontab和interval两种定时调度方式
生产环境中必须配置任务重试、超时、过期等容错机制
Flower是推荐的监控工具,提供实时的Worker和任务状态信息
使用Supervisor管理Worker进程是生产环境的标配