一、消息队列的核心作用
消息队列(Message Queue, MQ)是分布式系统和并发编程中的关键组件,其核心价值体现在以下五个方面:
解耦(Decoupling):生产者和消费者之间通过消息队列间接通信,双方无需感知对方的存在。生产者只管向队列发送消息,消费者只管从队列取消息,彼此独立演进。当新增或下线一个消费者时,生产者和已有消费者完全不受影响。
异步处理(Asynchronous Processing):将耗时的同步操作转化为异步消息,显著降低请求响应时间。例如用户注册场景,将发送邮件、短信通知等操作放入消息队列异步执行,核心注册逻辑可在毫秒级完成。
削峰填谷(Peak Shaving):应对突发流量时,消息队列充当缓冲层。瞬间涌入的请求先进入队列,后端消费者按自身处理能力慢慢消费,避免系统被流量尖峰冲垮。这在秒杀、抢购等场景中尤为关键。
可靠通信(Reliable Communication):消息队列提供持久化、确认机制、重试等保障,确保消息不丢失、不重复(或至少一次)。即使消费者短暂宕机,消息也会在队列中等待,恢复后继续处理。
最终一致性(Eventual Consistency):在微服务和分布式系统中,消息队列是实现最终一致性的重要工具。通过可靠事件投递和补偿机制,多个服务间的数据状态最终达到一致,避免了分布式事务的复杂性和性能开销。
选择消息队列本质上是在一致性、可用性、吞吐量和运维复杂度之间做权衡。没有银弹,只有最适合当前业务场景的方案。
二、RabbitMQ详解
2.1 AMQP协议与核心概念
RabbitMQ是AMQP(Advanced Message Queuing Protocol)协议的标准实现,是一个成熟、稳定的消息代理(Message Broker)。其核心模型包含以下几个组件:
- Producer(生产者):发送消息的应用程序
- Consumer(消费者):接收消息的应用程序
- Exchange(交换机):消息路由器,接收生产者消息并根据路由规则分发到队列
- Queue(队列):存储消息的缓冲区,消费者从队列拉取消息
- Binding(绑定):定义Exchange和Queue之间的路由关系
2.2 Exchange路由类型
RabbitMQ最强大的特性是其灵活的路由能力,主要通过四种Exchange类型实现:
Direct Exchange(直连交换机):消息的路由键(Routing Key)与队列绑定的路由键完全匹配时,消息被路由到该队列。适用于点对点通信,如将日志按级别分发到不同队列。
Fanout Exchange(扇形交换机):忽略路由键,将消息广播到所有绑定到该交换机的队列。适用于发布-订阅场景,如广播配置更新通知到所有服务实例。
Topic Exchange(主题交换机):路由键支持通配符匹配(*匹配一个单词,#匹配零个或多个单词)。支持灵活的多条件路由,如按地域+业务类型路由消息到不同处理中心。
Headers Exchange(头交换机):不依赖路由键,而是根据消息头的键值对进行匹配。适用于需要多维度路由规则的复杂场景。
2.3 消息确认机制
RabbitMQ提供了完整的消息可靠性保障:
- Publisher Confirm:生产者发送消息后,Broker返回confirm确认消息已到达交换机/队列
- Consumer Ack:消费者处理完消息后手动发送ack,通知Broker可以删除消息;若消费者异常断开而未ack,消息重新入队
- 消息持久化:将Exchange、Queue和消息都标记为durable,确保重启后消息不丢失
- 死信队列(DLQ):处理失败的消息可转入死信队列,方便后续排查和重试
2.4 Python客户端:pika
pika是RabbitMQ官方推荐的Python客户端,提供了一个干净、异步兼容的API。下面是一个简单的生产者-消费者示例。
import pika
import json
# 生产者:发送消息到RabbitMQ
def send_message():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# 声明队列(幂等操作,队列已存在则忽略)
channel.queue_declare(queue='task_queue', durable=True)
message = json.dumps({
'task_id': 123,
'payload': 'process_order',
'timestamp': '2026-05-06T10:00:00'
})
# 发送消息,标记为持久化(delivery_mode=2)
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 持久化
)
)
print(f" [x] Sent {message}")
connection.close()
# 消费者:消费消息并手动确认
def consume_messages():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
# 每次只取一条消息,处理完再取下一条(公平分发)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
data = json.loads(body)
print(f" [x] Received task: {data['task_id']}")
# 模拟任务处理
import time
time.sleep(1)
# 手动确认消息处理完成
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f" [x] Done task: {data['task_id']}")
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
三、Redis消息队列
Redis作为内存数据库,也提供了多种消息队列的实现方式。根据需求复杂度的不同,可选择不同的数据结构和方案。
3.1 List实现简单队列(LPUSH/BRPOP)
基于Redis List数据结构实现最简单的消息队列:生产者使用LPUSH向列表左侧推入消息,消费者使用BRPOP从右侧阻塞弹出消息。
- 优点:实现极度简单,性能极高,支持阻塞获取
- 缺点:没有消息确认机制,消费者崩溃可能导致消息丢失;没有消费者组的支持
- 适用场景:简单的任务队列、日志收集、对可靠性要求不高的场景
3.2 Pub/Sub(发布/订阅)
Redis Pub/Sub实现了经典的发布-订阅模式:生产者向频道(Channel)发布消息,所有订阅该频道的消费者实时收到消息。
- 优点:一对多广播,实时性强,延迟极低(微秒级)
- 缺点:消息不持久化,消费者离线则丢失消息;没有消息堆积能力(消费者跟不上时会丢弃消息)
- 适用场景:实时通知、聊天室、在线状态广播、缓存失效通知
3.3 Stream(Redis 5.0+)
Redis Stream是Redis 5.0引入的完整消息队列解决方案,弥补了之前方案的不足:
- 持久化:消息以追加写方式存储在内存(可配置RDB/AOF持久化到磁盘)
- 消费者组(Consumer Group):支持多个消费者组成一个组,消息在组内负载均衡消费,类似Kafka的消费者组
- 消息确认(ACK):消费者处理完消息后需发送XACK确认,未确认的消息可被重新消费
- 消息回溯:支持通过消息ID范围查询历史消息,适合故障排查和数据重放
- 阻塞读取:XREAD和XREADGROUP支持阻塞方式获取新消息
Redis Stream可以说是Redis在消息队列领域的一次重要进化,它将Redis从简单的缓存/队列角色提升到了具备企业级消息能力的中间件。对于中小规模的业务系统,Stream往往是一个"够用且轻量"的选择。
3.4 Python客户端:redis-py操作Stream
import redis
import json
import time
import threading
# 生产者:向Stream添加消息
def stream_producer():
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
stream_key = 'order_stream'
# 创建消费者组(若不存在)
try:
r.xgroup_create(stream_key, 'order_group', id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass # 组已存在
for i in range(10):
message = {
'order_id': f'ORD-{i:05d}',
'amount': 100 + i * 10,
'timestamp': time.time()
}
# XADD:向Stream追加消息
msg_id = r.xadd(stream_key, message, maxlen=10000)
print(f"Produced: {msg_id} -> {message}")
time.sleep(0.5)
# 消费者:从消费者组读取消息
def stream_consumer(consumer_id):
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
stream_key = 'order_stream'
group_name = 'order_group'
while True:
try:
# XREADGROUP:从消费者组阻塞读取消息
results = r.xreadgroup(
group_name, consumer_id,
{stream_key: '>'},
count=1, block=2000
)
if results:
for stream_name, messages in results:
for msg_id, msg_data in messages:
print(f"Consumer {consumer_id} got: "
f"{msg_data['order_id']}")
# 处理完成后确认消息
r.xack(stream_key, group_name, msg_id)
else:
time.sleep(0.1)
except Exception as e:
print(f"Error: {e}")
time.sleep(1)
# 启动生产者
t = threading.Thread(target=stream_producer, daemon=True)
t.start()
# 启动两个消费者
for cid in ['consumer-1', 'consumer-2']:
threading.Thread(target=stream_consumer, args=(cid,), daemon=True).start()
time.sleep(15)
四、Kafka详解
4.1 分布式日志架构
Apache Kafka最初由LinkedIn开发,其设计哲学与传统消息队列截然不同。Kafka本质上是一个分布式提交日志(Distributed Commit Log),消息被顺序追加到日志文件中,消费者通过偏移量(Offset)在日志中移动,自主控制消费进度。
核心设计理念:
- 持久化日志:所有消息写入磁盘,利用操作系统页缓存和顺序I/O实现接近内存的速度
- 水平扩展:通过Partition分区实现并行处理,分区可在多台Broker间分布
- 高吞吐:批量发送、批量压缩、零拷贝传输,单机可达每秒百万级消息
4.2 核心概念
- Topic(主题):消息的逻辑分类,类似Kafka中的表
- Partition(分区):Topic的物理分片,每个分区是一个有序的、不可变的消息序列
- Offset(偏移量):分区内每条消息的唯一序号,消费者通过偏移量定位消息
- Producer(生产者):向Topic发送消息的客户端
- Consumer(消费者):从Topic拉取消息的客户端
- Consumer Group(消费者组):一组消费者共同消费一个Topic,每个分区只能被组内一个消费者消费,实现负载均衡
- Broker(代理节点):Kafka服务器节点,负责存储和转发消息
- ISR(In-Sync Replicas):与Leader保持同步的副本集合,保证数据可靠性
4.3 高吞吐的秘密
Kafka实现高吞吐的几项关键技术:
- 顺序写入:消息追加到分区日志尾部,利用磁盘顺序I/O(比随机I/O快数个数量级)
- 零拷贝(Zero-Copy):消费者读取消息时使用sendfile系统调用,数据直接从磁盘文件缓存拷贝到网卡,绕过用户空间,减少CPU上下文切换
- 批量处理:生产者批量发送、Broker批量存储、消费者批量拉取,减少网络往返次数
- 压缩:支持gzip/snappy/lz4/zstd等压缩算法,批量压缩效率极高
- 页缓存利用:消息写入后先进入操作系统页缓存,Consumer可直接从页缓存读取,无需等待刷盘
4.4 Python客户端:kafka-python与confluent-kafka
Python生态中有两个主流的Kafka客户端:
- kafka-python:纯Python实现,API友好,适合开发和测试环境
- confluent-kafka:基于librdkafka的C扩展,性能极高,适合生产环境
# confluent-kafka 生产者示例
from confluent_kafka import Producer, Consumer
import json
# ---------- 生产者 ----------
def delivery_report(err, msg):
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
producer_conf = {
'bootstrap.servers': 'localhost:9092',
'acks': 'all', # 所有副本都确认才返回
'compression.type': 'snappy', # 启用压缩
'batch.size': 16384, # 批次大小(字节)
'linger.ms': 10, # 等待更多消息组批的时间
}
producer = Producer(producer_conf)
topic = 'order_events'
for i in range(100):
data = {'order_id': i, 'status': 'created', 'user_id': f'user_{i % 10}'}
producer.produce(
topic,
key=str(i % 10), # 相同key的消息进入同一分区,保证顺序
value=json.dumps(data),
callback=delivery_report
)
# 轮询回调,触发消息发送和确认
producer.poll(0)
# 等待所有消息发送完成
producer.flush()
# ---------- 消费者 ----------
consumer_conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order_processor',
'auto.offset.reset': 'earliest', # 从最早的消息开始消费
'enable.auto.commit': True, # 自动提交偏移量
'auto.commit.interval.ms': 5000,
}
consumer = Consumer(consumer_conf)
consumer.subscribe(['order_events'])
try:
while True:
msg = consumer.poll(timeout=1.0) # 轮询等待消息
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
order = json.loads(msg.value())
print(f"Received order {order['order_id']} "
f"from partition {msg.partition()} "
f"at offset {msg.offset()}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
五、核心维度对比表
以下从十余个核心维度对三大消息队列进行系统对比,帮助理解各自的优劣势。
| 对比维度 |
RabbitMQ |
Redis (Stream) |
Kafka |
| 消息模型 |
Exchange + Queue,灵活路由 |
List / Pub/Sub / Stream多种 |
Topic + Partition,日志模型 |
| 吞吐量 |
万级/秒(中等) |
十万级/秒(较高) |
百万级/秒(极高) |
| 延迟 |
微秒~毫秒级 |
微秒级(极低) |
毫秒级(批量会略高) |
| 持久化 |
可配置,队列+消息级别 |
RDB/AOF,Stream持久化 |
强制持久化到磁盘 |
| 消息路由 |
极强(4种Exchange+绑定) |
弱(需应用层实现) |
弱(仅Topic级别) |
| 顺序保证 |
单队列内有序 |
单Stream内有序 |
单Partition内有序 |
| 消息回溯 |
不支持(消费后删除) |
支持(根据ID范围查询) |
强(根据Offset/时间戳回溯) |
| 消费者组 |
竞争消费模式 |
Stream消费者组 |
原生Consumer Group |
| 消息确认 |
Consumer Ack + Publisher Confirm |
Stream XACK机制 |
自动/手动提交Offset |
| 消息堆积 |
有限(内存+磁盘限制) |
受内存限制大 |
极强(磁盘顺序读写) |
| 运维复杂度 |
中等(Erlang VM) |
低(Redis原生态) |
高(ZooKeeper/KRaft集群) |
| 客户端成熟度 |
所有主流语言 |
redis-py/lettuce等 |
confluent-kafka/kafka-python |
| 社区生态 |
成熟稳定,插件丰富 |
活跃(Redis生态) |
极为活跃(Confluent生态) |
| 典型应用 |
任务调度、微服务通信 |
实时通知、简单任务队列 |
日志采集、流处理、事件溯源 |
一句话总结:RabbitMQ胜在路由灵活,Redis胜在轻量低延迟,Kafka胜在高吞吐和持久化。没有绝对的好坏,只有是否适合你的场景。
六、选型决策树
面对实际业务需求,可以按照以下决策路径选择最合适的消息队列:
6.1 按业务场景选型
场景一:简单任务队列
如果你的需求是"一个生产者,一个消费者,需要解耦和异步处理"(如发送邮件、生成报表),优先级为:Redis List/Stream > RabbitMQ > Kafka。Redis实现最简单,部署成本最低,对于中小规模任务完全够用。
场景二:复杂路由需求
如果消息需要根据多种条件路由到不同的队列(如根据订单类型+地域+优先级分发到不同的处理中心),RabbitMQ是最佳选择。其四种Exchange类型提供了无与伦比的路由灵活性,而Kafka和Redis在此场景下需要在应用层实现复杂的路由逻辑。
场景三:大数据流处理
如果需要处理海量数据流(如用户行为日志、点击流、IoT传感器数据),Kafka是不二之选。其百万级吞吐、持久化到磁盘、与Flink/Spark等流处理框架的深度集成,使其成为数据管道的事实标准。
场景四:实时通知/广播
对于实时性要求极高且可以容忍少量消息丢失的场景(如在线状态通知、缓存失效广播),Redis Pub/Sub以极低的延迟和简单的API成为首选。
场景五:金融级可靠性
对于要求消息不丢不重、需要事务性保证的场景(如订单处理、支付回调),RabbitMQ的Publisher Confirm + Consumer Ack机制提供了最成熟可靠的消息保障。
6.2 按团队因素选型
- 运维能力:团队Kafka运维经验不足?优先选RabbitMQ或Redis,Kafka的集群运维复杂度较高
- 已有基础设施:团队已经在用Redis做缓存?再引入Redis Stream几乎零额外成本
- 技术栈一致性:Python/Go/Java混用的团队,RabbitMQ的多语言客户端支持最成熟
- 未来扩展性:预计流量会增长10倍?优先考虑Kafka的扩展能力,避免后续迁移
选型建议:不要为了Kafka而Kafka。如果你的消息量每秒不到一万条、没有复杂的流处理需求,RabbitMQ或Redis Stream通常更合适。Kafka的强悍能力在高吞吐场景下才能充分发挥,低吞吐场景下反而增加了不必要的复杂度。
七、Python客户端实战
下面通过三个完整的实战示例,展示如何在Python中使用这三大消息队列。
7.1 pika连接RabbitMQ(带连接池和重试)
import pika
import time
from functools import wraps
def retry_connect(max_retries=3, delay=2):
"""重试装饰器,处理RabbitMQ连接失败"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except pika.exceptions.AMQPConnectionError as e:
last_exception = e
print(f"Connection attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(delay)
raise last_exception
return wrapper
return decorator
class RabbitMQClient:
"""RabbitMQ客户端封装,支持连接管理和自动重连"""
def __init__(self, host='localhost', port=5672,
username='guest', password='guest'):
self.host = host
self.port = port
self.credentials = pika.PlainCredentials(username, password)
self.connection = None
self.channel = None
@retry_connect(max_retries=3)
def connect(self):
"""建立连接并创建通道"""
parameters = pika.ConnectionParameters(
host=self.host,
port=self.port,
credentials=self.credentials,
heartbeat=600,
blocked_connection_timeout=300,
)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
print("Connected to RabbitMQ successfully")
def declare_exchange(self, exchange_name, exchange_type='direct',
durable=True):
"""声明交换机"""
self.channel.exchange_declare(
exchange=exchange_name,
exchange_type=exchange_type,
durable=durable
)
def declare_queue(self, queue_name, durable=True):
"""声明队列"""
self.channel.queue_declare(queue=queue_name, durable=durable)
def bind_queue(self, queue_name, exchange_name, routing_key=''):
"""绑定队列到交换机"""
self.channel.queue_bind(
queue=queue_name,
exchange=exchange_name,
routing_key=routing_key
)
def publish(self, exchange, routing_key, message,
persistent=True):
"""发布消息"""
properties = pika.BasicProperties(
delivery_mode=2 if persistent else 1
)
self.channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=message,
properties=properties
)
def consume(self, queue_name, callback, prefetch_count=1):
"""消费消息"""
self.channel.basic_qos(prefetch_count=prefetch_count)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=False
)
print(f" [*] Waiting for messages on {queue_name}")
self.channel.start_consuming()
def close(self):
"""关闭连接"""
if self.connection and self.connection.is_open:
self.connection.close()
# 使用示例
client = RabbitMQClient()
client.connect()
client.declare_exchange('order_exchange', 'direct')
client.declare_queue('order_queue')
client.bind_queue('order_queue', 'order_exchange', 'order.create')
client.publish('order_exchange', 'order.create',
'{"order_id": 1001, "action": "create"}')
client.close()
7.2 redis-py操作Stream(带消费者组管理)
import redis
import json
import time
import uuid
class RedisStreamMQ:
"""基于Redis Stream的消息队列封装"""
def __init__(self, host='localhost', port=6379, db=0):
self.client = redis.Redis(
host=host, port=port, db=db,
decode_responses=True
)
# 消息保留时间:7天(秒)
self.max_age = 7 * 24 * 3600
def create_group(self, stream_key, group_name):
"""创建消费者组"""
try:
self.client.xgroup_create(
stream_key, group_name,
id='0', mkstream=True
)
return True
except redis.exceptions.ResponseError as e:
if 'BUSYGROUP' in str(e):
return False
raise
def produce(self, stream_key, data):
"""发送消息到Stream"""
message_id = self.client.xadd(
stream_key,
data,
maxlen=100000 # 最多保留10万条消息
)
return message_id
def consume(self, stream_key, group_name, consumer_name,
batch_size=10, block=5000):
"""从消费者组消费消息"""
results = self.client.xreadgroup(
group_name, consumer_name,
{stream_key: '>'},
count=batch_size,
block=block
)
messages = []
if results:
for stream_name, entries in results:
for msg_id, msg_data in entries:
messages.append((msg_id, msg_data))
return messages
def ack(self, stream_key, group_name, msg_id):
"""确认消息已处理"""
self.client.xack(stream_key, group_name, msg_id)
def pending(self, stream_key, group_name, count=10):
"""查看未确认的消息"""
return self.client.xpending_range(
stream_key, group_name,
min='-', max='+', count=count
)
def claim(self, stream_key, group_name, consumer_name,
min_idle_time=60000):
"""转移超时的未确认消息到当前消费者"""
pending = self.pending(stream_key, group_name)
claimed = []
for msg in pending:
msg_id = msg['message_id']
idle_time = msg['times_delivered']
if idle_time > 0:
self.client.xclaim(
stream_key, group_name, consumer_name,
min_idle_time, [msg_id]
)
claimed.append(msg_id)
return claimed
def trim(self, stream_key, maxlen=10000):
"""裁剪Stream,保留最新的N条消息"""
return self.client.xtrim(stream_key, maxlen=maxlen)
def info(self, stream_key):
"""查看Stream信息"""
return self.client.xinfo_stream(stream_key)
# 使用示例
mq = RedisStreamMQ()
mq.create_group('notifications', 'notif_group')
# 发送消息
for i in range(5):
mq.produce('notifications', {
'user_id': f'U{i:04d}',
'title': f'Notification #{i}',
'content': f'This is notification {i}',
'timestamp': str(time.time())
})
# 消费消息
messages = mq.consume('notifications', 'notif_group',
'consumer-1', batch_size=5)
for msg_id, data in messages:
print(f"Consumed: {msg_id} -> {data['title']}")
mq.ack('notifications', 'notif_group', msg_id)
7.3 confluent-kafka生产消费(带错误处理和重试)
from confluent_kafka import Producer, Consumer, KafkaError
import json
import signal
import sys
class KafkaMQ:
"""Kafka消息队列客户端封装"""
def __init__(self, bootstrap_servers='localhost:9092'):
self.bootstrap_servers = bootstrap_servers
self.running = True
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, sig, frame):
print("Shutting down gracefully...")
self.running = False
def create_producer(self, **kwargs):
"""创建Kafka生产者"""
config = {
'bootstrap.servers': self.bootstrap_servers,
'acks': 'all',
'retries': 3,
'compression.type': 'snappy',
'batch.size': 32768,
'linger.ms': 5,
'enable.idempotence': True, # 幂等生产者,防止重复
}
config.update(kwargs)
return Producer(config)
def delivery_callback(self, err, msg):
"""消息发送回调"""
if err:
print(f"Failed to deliver to {msg.topic()}: {err}")
else:
print(f"Delivered to {msg.topic()} "
f"[partition={msg.partition()}, "
f"offset={msg.offset()}]")
def produce(self, producer, topic, key, value, callback=None):
"""发送单条消息"""
cb = callback or self.delivery_callback
producer.produce(
topic=topic,
key=key,
value=json.dumps(value).encode('utf-8'),
callback=cb
)
producer.poll(0)
def produce_batch(self, producer, topic, messages):
"""批量发送消息(相同key保证顺序)"""
for key, value in messages:
self.produce(producer, topic, key, value)
producer.flush()
def create_consumer(self, group_id, **kwargs):
"""创建Kafka消费者"""
config = {
'bootstrap.servers': self.bootstrap_servers,
'group.id': group_id,
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000,
'max.poll.interval.ms': 300000,
}
config.update(kwargs)
return Consumer(config)
def consume_loop(self, consumer, topics, timeout=1.0):
"""消费循环,带优雅退出"""
consumer.subscribe(topics)
try:
while self.running:
msg = consumer.poll(timeout)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Consumer error: {msg.error()}")
continue
yield msg
# 每处理100条消息手动提交一次偏移量
if msg.offset() % 100 == 0:
consumer.commit(async=False)
finally:
consumer.close()
# 使用示例
kafka = KafkaMQ()
# 生产者
producer = kafka.create_producer()
order_events = [
('user_1', {'order_id': 1001, 'action': 'create'}),
('user_1', {'order_id': 1001, 'action': 'pay'}),
('user_2', {'order_id': 1002, 'action': 'create'}),
]
for key, value in order_events:
kafka.produce(producer, 'orders', key, value)
producer.flush()
# 消费者
consumer = kafka.create_consumer('order_service')
for msg in kafka.consume_loop(consumer, ['orders']):
data = json.loads(msg.value())
print(f"Processed: order {data['order_id']} "
f"action={data['action']} "
f"key={msg.key()}")
7.4 综合对比总结
RabbitMQ + pika:适合需要复杂路由、消息确认机制完善、运维成熟的场景。连接管理、重试机制、死信队列等特性完善,是传统企业应用的首选。
Redis Stream + redis-py:适合中小规模、希望减少基础设施复杂度、团队已在用Redis的场景。Stream的消息回溯和消费者组能力使其足以应对大部分业务需求,但存储容量受内存限制。
Kafka + confluent-kafka:适合大规模数据管道和高吞吐流处理场景。confluent-kafka基于librdkafka,性能远超纯Python的kafka-python,生产环境强烈推荐使用confluent-kafka。
在实际项目中,三种消息队列并非互斥关系,而是可以互补共存。常见的架构模式是:前端请求通过RabbitMQ进行业务逻辑分发,重要事件数据通过Kafka进入数据管道供分析和流处理使用,实时通知和缓存失效通过Redis Pub/Sub广播。理解各自的优势领域,才能在系统设计时做出最合理的选型。