Celery分布式任务队列入门

Python并发编程专题 · 生产级的异步任务处理框架

专题:Python并发编程系统学习

关键词:Python, 并发编程, Celery, 任务队列, Broker, worker, 分布式任务, 异步

一、Celery简介

什么是Celery

Celery是一个基于Python开发的分布式任务队列框架,专注于处理实时任务和定时调度任务。它采用生产者-消费者模式,将任务的创建与执行分离,允许将耗时的同步操作转化为异步任务,从而提升系统的吞吐量和响应速度。

适用场景

架构三要素

Celery的架构由三个核心组件组成,理解这三个组件是掌握Celery的基础。

核心思想:Celery将任务的定义分发执行三者解耦,使开发者可以专注于业务逻辑,而无需关心底层的并发和通信细节。

二、快速开始

安装Celery

使用pip安装Celery库,根据选择的Broker安装对应的依赖。

# 安装Celery核心库 pip install celery # 使用Redis作为Broker时 pip install celery[redis] # 使用RabbitMQ作为Broker时 pip install celery[rabbitmq]

定义第一个任务

创建一个名为tasks.py的文件,定义Celery应用和第一个任务。

from celery import Celery # 创建Celery应用实例 # broker参数指定消息代理的地址 app = Celery('tasks', broker='redis://localhost:6379/0') @app.task def add(x, y): return x + y

启动Worker并调用任务

在终端中启动Worker进程,然后在Python中调用任务。

# 终端启动Worker(需在tasks.py所在目录执行) celery -A tasks worker --loglevel=info # 在Python交互环境中调用任务 from tasks import add result = add.delay(4, 5) print(result.id) # 输出任务ID,用于后续查询结果

注意:delay()方法是apply_async()的简写形式,两者效果相同。delay()不支持传递执行参数,而apply_async()可以指定countdown、eta、queue等高级参数。

三、Broker配置

RabbitMQ vs Redis对比

Broker是Celery的核心依赖,选择合适的Broker对生产环境至关重要。

特性 RabbitMQ Redis
消息可靠性 高(支持消息确认、持久化) 中(可能存在数据丢失风险)
性能 高,适合大规模消息 高,适合轻量级场景
功能丰富度 丰富(交换机、路由、死信队列) 基础(列表实现队列)
运维复杂度 较高(需管理Erlang环境) 低(Redis通用性高)
推荐场景 生产环境、对可靠性要求高的场景 开发测试、小型项目

连接池配置

生产环境中需要合理配置连接池参数,避免连接耗尽或资源浪费。

# 使用Redis作为Broker时配置连接池 app.conf.broker_url = 'redis://:password@localhost:6379/0' app.conf.broker_pool_limit = 10 # 连接池最大连接数 app.conf.broker_connection_retry_on_startup = True app.conf.broker_connection_max_retries = 5

消息持久化

确保Broker重启后任务消息不丢失,需要开启消息持久化功能。

# RabbitMQ中使用持久化队列 app.conf.task_default_delivery_mode = 2 # 2=persistent, 1=non-persistent # Redis中默认不持久化,可通过配置开启 app.conf.broker_transport_options = { 'visibility_timeout': 3600, # 超时时间(秒) 'max_retries': 3, }

四、调用任务与结果获取

调用方式

Celery提供了多种调用任务的方式,满足不同的业务场景。

# delay方式(简写) result = add.delay(4, 5) # apply_async方式(完整参数) result = add.apply_async( args=(4, 5), countdown=10, # 延迟10秒执行 expires=3600, # 任务过期时间(秒) queue='high_priority', # 指定队列 task_id='custom-id' # 自定义任务ID )

结果获取(Result Backend)

配置result backend后,可以通过AsyncResult获取任务的执行结果。

from celery.result import AsyncResult # 配置result backend app.conf.result_backend = 'redis://localhost:6379/1' # 定义任务 @app.task def long_task(n): import time time.sleep(n) return f"任务执行完毕,耗时{n}秒" # 调用并获取结果 result = long_task.delay(5) task_id = result.id # 在其他地方查询结果 async_result = AsyncResult(task_id, app=app) if async_result.ready(): print(async_result.get()) # 获取结果(阻塞) print(async_result.status) # 任务状态: SUCCESS/FAILURE/PENDING print(async_result.successful()) # 是否成功

最佳实践:get()方法是阻塞调用,用于需要等待结果的场景。在Web应用中应避免直接调用get(),而是采用异步轮询或WebSocket推送方式获取结果。

任务状态跟踪

Celery任务的生命周期包含多个状态,了解这些状态有助于调试和监控。

五、任务路由与队列

多队列分离不同任务

在生产环境中,通常需要将不同类型的任务分发到不同的队列,以便进行资源隔离和优先级管理。

# 配置任务路由 app.conf.task_routes = { 'tasks.send_email': {'queue': 'email'}, 'tasks.process_image': {'queue': 'image'}, 'tasks.generate_report': {'queue': 'report'}, } # 也可以通过装饰器直接指定 @app.task(queue='email') def send_welcome_email(user_email): pass

启动Worker时绑定指定队列

不同的Worker进程可以监听不同的队列,实现资源隔离。

# 启动只处理email队列的Worker celery -A tasks worker -Q email --loglevel=info # 启动处理image和report队列的Worker celery -A tasks worker -Q image,report --loglevel=info # Worker默认监听名为"celery"的队列 celery -A tasks worker --loglevel=info

任务优先级

通过设置优先级,确保重要的任务得到优先处理。

# 定义时设置优先级(数值越小优先级越高) result = add.apply_async( args=(4, 5), priority=1 # 高优先级 ) # 低优先级任务 result = add.apply_async( args=(1, 2), priority=10 # 低优先级 )

设计原则:建议将不同类型、不同重要性的任务路由到不同的队列,并为每个队列启动独立的Worker进程。这样即使某个队列的任务大量积压,也不会影响其他队列的正常处理。

六、定时任务Celery Beat

什么是Celery Beat

Celery Beat是一个定时任务调度器,它周期性地将任务发送到Broker,由Worker执行。Beat和Worker可以运行在同一个进程中,也可以分离运行。

Crontab调度

使用crontab表达式可以精确控制任务的执行时间,语法与Linux crontab一致。

from celery.schedules import crontab app.conf.beat_schedule = { 'daily-report': { 'task': 'tasks.generate_daily_report', 'schedule': crontab(hour=8, minute=0), # 每天8:00执行 'args': ('daily',), }, 'weekly-cleanup': { 'task': 'tasks.cleanup_temp_files', 'schedule': crontab( hour=3, minute=0, day_of_week='monday' # 每周一3:00执行 ), }, }

Interval定时

对于需要固定间隔执行的任务,使用interval调度更加直观。

from celery.schedules import timedelta app.conf.beat_schedule = { 'health-check': { 'task': 'tasks.health_check', 'schedule': timedelta(seconds=30), # 每30秒执行一次 }, 'sync-data': { 'task': 'tasks.sync_data', 'schedule': timedelta(minutes=15), # 每15分钟执行一次 }, }

启动Beat

启动Beat调度器需要单独运行beat进程。

# 启动Beat调度器 celery -A tasks beat --loglevel=info # 同时启动Worker和Beat(开发环境常用) celery -A tasks worker --beat --loglevel=info

动态添加定时任务

在运行时通过数据库动态管理定时任务,适合需要频繁调整调度策略的场景。

# 使用django-celery-beat扩展实现动态管理 from django_celery_beat.models import PeriodicTask, CrontabSchedule # 创建crontab调度 schedule, _ = CrontabSchedule.objects.get_or_create( hour=9, minute=0, ) # 创建定时任务 PeriodicTask.objects.create( name='send-morning-email', task='tasks.send_morning_email', crontab=schedule, args='[]', )

七、工作流(Canvas原语)

什么是Canvas原语

Celery提供了强大的工作流原语,用于将多个任务组合成复杂的执行流程。这是Celery区别于其他任务队列的核心特性之一。

chain链式调用

将多个任务串联执行,前一个任务的输出作为后一个任务的输入。

from celery import chain # 定义任务 @app.task def process_order(order_id): return {'order_id': order_id, 'status': 'processed'} @app.task def send_notification(order_data): print(f"通知: 订单{order_data['order_id']}已处理") return order_data # 链式执行:先处理订单,再发送通知 result = chain(process_order.s(1001), send_notification.s())() # 等价于:result = (process_order.s(1001) | send_notification.s())()

group并行调用

将多个任务并行执行,所有任务完成后返回结果列表。

from celery import group @app.task def fetch_page(url): import requests return {'url': url, 'length': len(requests.get(url).text)} # 并行抓取多个页面 urls = ['http://example.com', 'http://python.org', 'http://celeryproject.org'] result = group(fetch_page.s(url) for url in urls)() print(result.get()) # 等待所有任务完成后获取结果列表

chord回调

chord是group的增强版,在所有并行任务完成后执行一个回调任务。

from celery import chord @app.task def sum_results(results): return sum(results) # 先并行计算,再汇总结果 result = chord( [add.s(i, i) for i in range(10)], # 并行计算0+0, 1+1, ..., 9+9 body=sum_results.s() # 计算完成后汇总 )() print(result.get()) # 输出90

signature签名

Signature是任务的"签名",将任务及其参数封装为可传递的对象,是构建工作流的基础。

# 使用.s()方法创建签名 sig1 = add.s(2, 3) # 部分参数 sig2 = add.s(2) # 只有一个参数,后续通过pipe传入 sig3 = add.s(2, 3, 4) # 错误的用法,add只接受两个参数 # 使用signature类 from celery import signature task_sig = signature('tasks.add', args=(2, 3), countdown=10) result = task_sig.delay()

工作流选择指南:需要对多个结果进行聚合时用group+chord;任务有明确的先后依赖关系时用chain;需要动态组合任务时使用signature。合理的任务编排可以大幅提升系统的处理效率。

八、监控与管理

Flower监控面板

Flower是基于Web的Celery监控工具,提供实时的Worker状态、任务历史、队列深度等信息。

# 安装Flower pip install flower # 启动Flower监控 celery -A tasks flower --port=5555 # 启用认证 celery -A tasks flower --port=5555 \ --basic_auth=admin:password123

启动后访问 http://localhost:5555 即可查看监控面板,包括任务成功率、执行时间分布、Worker负载等关键指标。

Worker状态查看

通过Celery命令行工具可以实时查看Worker的状态和统计信息。

# 查看所有活跃的Worker celery -A tasks status # 查看Worker的详细信息 celery -A tasks inspect active celery -A tasks inspect reserved celery -A tasks inspect registered # 查看所有队列中的消息数量 celery -A tasks inspect active_queues

任务重试与过期

生产环境中任务可能因各种原因失败,合理的重试策略和任务过期机制至关重要。

# 定义自动重试的任务 @app.task( bind=True, max_retries=3, default_retry_delay=60, # 首次重试间隔(秒) autoretry_for=(Exception,), # 遇到异常自动重试 ) def unreliable_task(self): import random if random.random() < 0.7: raise ConnectionError("网络连接失败") return "成功" # 手动重试(指数退避) @app.task(bind=True, max_retries=5) def api_call(self, url): try: return call_external_api(url) except Exception as e: countdown = 2 ** self.request.retries # 指数退避:2, 4, 8, 16, 32秒 raise self.retry(exc=e, countdown=countdown)

生产环境部署最佳实践

推荐命令:celery -A tasks worker -Q high,default --loglevel=warning --concurrency=4 --max-tasks-per-child=1000 --time-limit=3600 --soft-time-limit=3000

九、核心要点总结

Celery核心要点:

  • Celery是Python生态中最成熟的分布式任务队列框架,架构由Broker、Worker、Backend三部分组成
  • 选择合适的Broker至关重要:RabbitMQ适合生产环境,Redis适合开发和小型项目
  • 使用多队列和任务路由可以实现资源隔离和优先级管理
  • Canvas原语(chain/group/chord)提供了强大的任务编排能力
  • Celery Beat支持crontab和interval两种定时调度方式
  • 生产环境中必须配置任务重试、超时、过期等容错机制
  • Flower是推荐的监控工具,提供实时的Worker和任务状态信息
  • 使用Supervisor管理Worker进程是生产环境的标配