一、概述
观察者模式(Observer Pattern)是行为型设计模式中最重要、应用最广泛的模式之一。它定义了对象之间的一对多依赖关系,当一个对象(称为"主题"Subject或"可观察对象"Observable)的状态发生变化时,所有依赖它的对象(称为"观察者"Observer)都会自动收到通知并更新。
在Python编程中,观察者模式几乎无处不在:GUI框架中的事件处理、Web框架中的信号机制、异步编程中的回调函数、消息队列中的发布-订阅模型,本质上都是观察者模式的不同变体。深入理解这一模式,对于构建松耦合、可扩展的系统至关重要。
核心思想
- 松耦合:主题不需要知道观察者的具体实现细节,只需知道它们实现了统一的接口
- 广播通信:一个状态变化可以同时通知多个订阅者
- 动态订阅:观察者可以在运行时随时注册或注销
- 事件驱动:系统响应外部状态变化而非轮询检查
┌─────────────────┐ ┌──────────────────┐
│ Subject │◄─────────│ Observer │
│ (接口/抽象类) │ 观察 │ (接口/抽象类) │
├─────────────────┤ ├──────────────────┤
│ +attach(o) │ │ +update(data) │
│ +detach(o) │ └────────┬─────────┘
│ +notify() │ │
└────────┬────────┘ │
│ implements │ implements
┌────────┴────────┐ ┌────────┴─────────┐
│ ConcreteSubject│ │ ConcreteObserver │
├─────────────────┤ ├──────────────────┤
│ -state │─────────>│ +update(data) │
│ +get_state() │ 通知 │ │
│ +set_state() │ └──────────────────┘
└─────────────────┘
上图展示了观察者模式的经典UML结构。主题(Subject)维护一个观察者列表,并提供注册(attach)和注销(detach)方法。当主题状态变化时,调用notify()遍历所有观察者并逐个调用其update()方法。
二、经典观察者模式:接口实现
我们先从最经典的接口版本开始。Python没有内置的接口关键字,但可以通过抽象基类(ABC)或鸭子类型来实现观察者模式。
2.1 使用ABC模块定义接口
from abc import ABC, abstractmethod
from typing import Any, List
class Observer(ABC):
"""观察者抽象接口"""
@abstractmethod
def update(self, data: Any) -> None:
...
class Subject(ABC):
"""主题抽象接口"""
@abstractmethod
def attach(self, observer: Observer) -> None:
...
@abstractmethod
def detach(self, observer: Observer) -> None:
...
@abstractmethod
def notify(self) -> None:
...
2.2 具体实现
class WeatherStation(Subject):
"""具体主题:气象站"""
def __init__(self) -> None:
self._observers: List[Observer] = []
self._temperature: float = 0.0
self._humidity: float = 0.0
def attach(self, observer: Observer) -> None:
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
self._observers.remove(observer)
def notify(self) -> None:
data = {
"temperature": self._temperature,
"humidity": self._humidity,
}
for observer in self._observers:
observer.update(data)
def set_measurements(self, temp: float, humidity: float) -> None:
self._temperature = temp
self._humidity = humidity
self.notify()
class DisplayScreen(Observer):
"""具体观察者:显示屏幕"""
def __init__(self, name: str) -> None:
self._name = name
def update(self, data: dict) -> None:
print(f"[{self._name}] 温度: {data['temperature']}°C, "
f"湿度: {data['humidity']}%")
class AlertSystem(Observer):
"""具体观察者:告警系统"""
def __init__(self, threshold: float = 35.0) -> None:
self._threshold = threshold
def update(self, data: dict) -> None:
if data["temperature"] > self._threshold:
print(f"⚠ 高温告警!当前温度: {data['temperature']}°C")
2.3 使用示例
station = WeatherStation()
screen1 = DisplayScreen("大厅屏幕")
screen2 = DisplayScreen("办公室屏幕")
alarm = AlertSystem(threshold=30.0)
station.attach(screen1)
station.attach(screen2)
station.attach(alarm)
station.set_measurements(28.5, 65.0)
# 输出:
# [大厅屏幕] 温度: 28.5°C, 湿度: 65.0%
# [办公室屏幕] 温度: 28.5°C, 湿度: 65.0%
station.set_measurements(35.2, 70.0)
# 输出:
# [大厅屏幕] 温度: 35.2°C, 湿度: 70.0%
# [办公室屏幕] 温度: 35.2°C, 湿度: 70.0%
# ⚠ 高温告警!当前温度: 35.2°C
设计分析
经典模式的优点在于接口清晰、类型安全,观察者和主题之间的契约通过抽象基类明确约定。但在Python实践中,这种"过度工程"的写法往往显得笨重。Python推崇的"鸭子类型"和"一等函数"特性,让我们可以用更轻量的方式实现同样的功能。
三、使用 weakref 避免内存泄漏
经典实现中最大的隐患是循环引用导致的内存泄漏。当主题持有观察者的强引用,而观察者又持有主题的引用时,GC无法回收这些对象。更隐蔽的情况是:观察者注册后忘记注销,导致对象永远无法被回收。
常见的内存泄漏场景
- GUI中注册了回调但窗口关闭时未注销
- 长生命周期的事件总线持有短生命周期对象的引用
- 缓存系统中旧观察者未被清理
- 单元测试中注册的模拟对象泄漏到生产环境
3.1 使用 weakref.ref
weakref.ref 创建一个对对象的弱引用,不会增加引用计数。当对象被销毁后,弱引用返回 None。
import weakref
from typing import Any, Set
class WeakObserver:
"""支持弱引用的观察者基类"""
def __call__(self, data: Any) -> None:
raise NotImplementedError
class WeakSubject:
"""使用弱引用管理观察者"""
def __init__(self) -> None:
# 存储弱引用,自动清理失效的引用
self._observers: Set[weakref.ref] = set()
def attach(self, observer: WeakObserver) -> None:
ref = weakref.ref(observer, self._auto_cleanup)
self._observers.add(ref)
def detach(self, observer: WeakObserver) -> None:
# 查找并移除与给定对象对应的弱引用
for ref in set(self._observers):
if ref() is observer:
self._observers.discard(ref)
break
def _auto_cleanup(self, ref: weakref.ref) -> None:
"""观察者被GC回收时的自动清理回调"""
self._observers.discard(ref)
print(f"[GC] 已自动清理失效观察者 ({len(self._observers)} 个活跃)")
def notify(self, data: Any) -> None:
dead_refs = []
for ref in self._observers:
observer = ref()
if observer is not None:
observer(data)
else:
dead_refs.append(ref)
# 清理本次遍历中发现但尚未清理的失效引用
for ref in dead_refs:
self._observers.discard(ref)
3.2 使用 weakref.WeakSet
weakref.WeakSet 是弱引用集合的封装,自动处理元素的添加和移除,API更简洁。
import weakref
from typing import Any
class EventBus:
"""使用 WeakSet 的事件总线"""
def __init__(self) -> None:
self._handlers = weakref.WeakSet()
def register(self, handler: Any) -> None:
self._handlers.add(handler)
def unregister(self, handler: Any) -> None:
self._handlers.discard(handler)
def emit(self, event: str, **kwargs: Any) -> None:
for handler in list(self._handlers):
handler(event, **kwargs)
class DataProcessor:
"""数据处理器,可能被随时销毁"""
def __init__(self, name: str) -> None:
self.name = name
def __call__(self, event: str, **kwargs) -> None:
print(f"[{self.name}] 收到事件 '{event}': {kwargs}")
def __del__(self):
print(f"[~] {self.name} 被销毁")
# 使用示例
bus = EventBus()
p1 = DataProcessor("处理器A")
p2 = DataProcessor("处理器B")
bus.register(p1)
bus.register(p2)
bus.emit("data_ready", count=42)
# 输出:
# [处理器A] 收到事件 'data_ready': {'count': 42}
# [处理器B] 收到事件 'data_ready': {'count': 42}
# 删除一个处理器
del p1
bus.emit("shutdown", reason="completed")
# 输出:
# [~] 处理器A 被销毁
# [处理器B] 收到事件 'shutdown': {'reason': 'completed'}
# 处理器A 不会再收到事件
weakref 最佳实践
- WeakSet 比手动管理 weakref.ref 更简洁,推荐在大多数场景使用
- 弱引用不能用于 lambda 和普通函数(因为它们没有 __weakref__ 属性),此时可用 WeakMethod 包装绑定方法
- 接收弱引用的对象必须支持弱引用——自定义类默认支持,内置类型如 list、dict 不支持
- 始终在事件通知循环中对失效引用做二次清理,防止回调查函数清理时出现竞态
3.3 weakref.WeakMethod 处理绑定方法
import weakref
from typing import Any, Set
class SafeEventEmitter:
"""安全的事件发射器,支持绑-定方法作为观察者"""
def __init__(self) -> None:
self._callbacks: Set[Any] = set()
def connect(self, callback: Any) -> None:
"""连接回调,自动处理绑定方法"""
if hasattr(callback, '__self__'):
# 绑定方法 → 使用 WeakMethod
ref = weakref.WeakMethod(callback)
else:
# 普通函数或可调用对象 → 使用 ref
ref = weakref.ref(callback, self._cleanup)
self._callbacks.add(ref)
def disconnect(self, callback: Any) -> None:
for ref in set(self._callbacks):
cb = ref()
if cb is callback or cb is None:
self._callbacks.discard(ref)
def _cleanup(self, ref: Any) -> None:
self._callbacks.discard(ref)
def emit(self, *args: Any, **kwargs: Any) -> None:
for ref in set(self._callbacks):
cb = ref()
if cb is not None:
cb(*args, **kwargs)
class Service:
def __init__(self, name: str):
self.name = name
def on_event(self, msg: str) -> None:
print(f"[{self.name}] {msg}")
emitter = SafeEventEmitter()
svc = Service("worker")
emitter.connect(svc.on_event) # 绑定方法自动用 WeakMethod 包装
emitter.emit("hello") # [worker] hello
del svc # 对象销毁后,弱引用自动失效
emitter.emit("world") # 无输出,不会崩溃
四、事件系统设计
将观察者模式封装为通用的事件系统,是实际项目中最常见的做法。我们来实现一个功能完备的 EventEmitter。
4.1 通用 EventEmitter
import weakref
from collections import defaultdict
from typing import Any, Callable, Dict, List, Set
class EventEmitter:
"""通用事件发射器"""
def __init__(self) -> None:
# event_name -> set of weak references to callbacks
self._events: Dict[str, Set[Any]] = defaultdict(set)
def on(self, event: str, callback: Callable) -> None:
"""注册事件监听器"""
if hasattr(callback, '__self__'):
ref = weakref.WeakMethod(callback)
else:
ref = weakref.ref(callback, lambda r: self._cleanup(event, r))
self._events[event].add(ref)
def off(self, event: str, callback: Callable) -> None:
"""移除事件监听器"""
for ref in set(self._events.get(event, [])):
cb = ref()
if cb is callback or cb is None:
self._events[event].discard(ref)
def once(self, event: str, callback: Callable) -> None:
"""注册一次性监听器,触发后自动移除"""
def wrapper(*args: Any, **kwargs: Any) -> None:
self.off(event, wrapper)
callback(*args, **kwargs)
self.on(event, wrapper)
def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
"""发射事件"""
for ref in set(self._events.get(event, [])):
cb = ref()
if cb is not None:
cb(*args, **kwargs)
def _cleanup(self, event: str, ref: Any) -> None:
self._events[event].discard(ref)
def listeners(self, event: str) -> int:
"""获取指定事件的监听器数量"""
return sum(1 for ref in self._events.get(event, []) if ref())
def remove_all_listeners(self, event: str = None) -> None:
"""移除所有监听器"""
if event:
self._events[event].clear()
else:
self._events.clear()
# 使用示例
emitter = EventEmitter()
def on_data(msg: str) -> None:
print(f"收到数据: {msg}")
emitter.on("data", on_data)
emitter.once("start", lambda: print("只执行一次"))
emitter.emit("data", "你好")
emitter.emit("start")
emitter.emit("start") # 不会执行,因已自动移除
emitter.emit("data", "世界")
4.2 类型安全事件
使用 dataclass 和 Protocol 让事件具有类型安全性,在IDE中获得更好的代码补全和类型检查。
from dataclasses import dataclass, field
from typing import Protocol, Any, Dict, List
# 定义事件数据类型
@dataclass
class OrderCreated:
order_id: str
user_id: str
amount: float
items: List[str] = field(default_factory=list)
@dataclass
class PaymentCompleted:
order_id: str
transaction_id: str
paid_at: str
@dataclass
class OrderShipped:
order_id: str
tracking_number: str
# 定义事件处理器协议
class EventHandler(Protocol):
def handle(self, event: Any) -> None:
...
class TypedEventBus:
"""类型安全的事件总线"""
def __init__(self) -> None:
self._handlers: Dict[type, List[EventHandler]] = {}
def subscribe(self, event_type: type, handler: EventHandler) -> None:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
def publish(self, event: Any) -> None:
event_type = type(event)
for handler in self._handlers.get(event_type, []):
handler.handle(event)
# 具体处理器
class EmailNotifier:
def handle(self, event: Any) -> None:
if isinstance(event, OrderCreated):
print(f"[邮件] 发送订单确认: {event.order_id}, "
f"金额 ¥{event.amount:.2f}")
elif isinstance(event, OrderShipped):
print(f"[邮件] 发送发货通知: {event.order_id}, "
f"运单号: {event.tracking_number}")
class InventoryManager:
def handle(self, event: Any) -> None:
if isinstance(event, PaymentCompleted):
print(f"[库存] 订单 {event.order_id} 已付款,扣减库存")
# 使用示例
bus = TypedEventBus()
bus.subscribe(OrderCreated, EmailNotifier())
bus.subscribe(OrderShipped, EmailNotifier())
bus.subscribe(PaymentCompleted, InventoryManager())
bus.publish(OrderCreated(
order_id="ORD-2026-0001",
user_id="U-10086",
amount=299.00,
items=["Python进阶教程", "设计模式实战"],
))
bus.publish(PaymentCompleted(
order_id="ORD-2026-0001",
transaction_id="TXN-9a8b7c",
paid_at="2026-05-05 22:50:40",
))
五、__call__ 可调用对象作为回调
Python中任何实现了 __call__ 方法的对象都是"可调用对象"。这使得我们可以将带有状态的对象当作函数来使用,这是实现观察者回调的一种优雅方式。
5.1 可调用类作为观察者
from typing import Any, Callable, Dict, List
class CallbackObserver:
"""可调用观察者基类"""
def __init__(self, name: str) -> None:
self.name = name
self.call_count = 0 # 维护状态
def __call__(self, data: Any) -> None:
self.call_count += 1
self._process(data)
def _process(self, data: Any) -> None:
raise NotImplementedError
def __repr__(self) -> str:
return f"<{self.name}: 已处理 {self.call_count} 次>"
class Logger(CallbackObserver):
"""日志记录器"""
def _process(self, data: Any) -> None:
print(f"[日志] {self.name}: {data}")
class MetricsCollector(CallbackObserver):
"""指标收集器"""
def __init__(self, name: str) -> None:
super().__init__(name)
self.values: List[float] = []
def _process(self, data: Any) -> None:
if isinstance(data, (int, float)):
self.values.append(data)
avg = sum(self.values) / len(self.values)
print(f"[指标] {self.name}: avg={avg:.2f}, count={len(self.values)}")
class Subject:
def __init__(self) -> None:
self._observers: List[Callable] = []
def attach(self, observer: Callable) -> None:
self._observers.append(observer)
def notify(self, data: Any) -> None:
for obs in self._observers:
obs(data)
# 使用
subject = Subject()
logger = Logger("数据日志")
metrics = MetricsCollector("响应时间")
subject.attach(logger)
subject.attach(metrics)
subject.notify(42)
subject.notify(58)
subject.notify(67)
print(logger) # <数据日志: 已处理 3 次>
print(metrics) # <响应时间: 已处理 3 次>
5.2 函数与闭包作为回调
对于无状态或轻量级的场景,直接使用函数或闭包更为简洁。
from typing import Any, Dict, List, Callable
class CallbackManager:
"""基于函数的回调管理器"""
def __init__(self) -> None:
self._callbacks: Dict[str, List[Callable]] = {}
def register(self, event: str, fn: Callable) -> None:
self._callbacks.setdefault(event, []).append(fn)
def unregister(self, event: str, fn: Callable) -> None:
self._callbacks[event].remove(fn)
def trigger(self, event: str, **data: Any) -> None:
for fn in self._callbacks.get(event, []):
fn(**data)
# 普通函数
def send_notification(user: str, message: str) -> None:
print(f"通知 {user}: {message}")
# 闭包(带状态的函数)
def make_rate_limiter(max_calls: int):
calls = 0
def wrapper(**data: Any) -> None:
nonlocal calls
calls += 1
if calls <= max_calls:
print(f"[限流] 放行 ({calls}/{max_calls}): {data}")
else:
print(f"[限流] 拒绝 ({calls}>{max_calls})")
return wrapper
cm = CallbackManager()
cm.register("user.login", send_notification)
cm.register("user.login", make_rate_limiter(3))
cm.trigger("user.login", user="Alice", message="欢迎回来")
cm.trigger("user.login", user="Bob", message="欢迎回来")
cm.trigger("user.login", user="Charlie", message="欢迎回来")
cm.trigger("user.login", user="David", message="欢迎回来")
选择指南
| 方案 |
适用场景 |
优点 |
缺点 |
| 接口类 |
大型项目、团队协作 |
类型安全、契约明确 |
样板代码多 |
| 可调用对象 |
需要维护状态的回调 |
兼具函数接口和对象状态 |
略显笨重 |
| 普通函数 |
简单回调场景 |
最简洁、零开销 |
无法维护状态 |
| 闭包 |
轻量级带状态场景 |
简洁、封装性好 |
调试困难、弱引用不兼容 |
| functools.partial |
固定部分参数 |
灵活组合 |
弱引用不兼容 |
六、观察者模式在主流框架中的应用
6.1 Django Signals
Django内置的信号(Signals)系统是观察者模式在Web框架中的典型应用。它允许解耦的应用组件在事件发生时相互通知。
# Django 示例 - 纯概念演示
from django.dispatch import Signal, receiver
from django.db.models.signals import post_save
# 1. 定义自定义信号
order_created = Signal()
# 2. 接收器(观察者)
@receiver(order_created)
def send_order_confirmation(sender, **kwargs):
order = kwargs['order']
print(f"发送订单确认邮件: {order.id}")
@receiver(order_created)
def update_inventory(sender, **kwargs):
order = kwargs['order']
print(f"更新库存: 订单 {order.id}")
# 3. 发送信号(主题通知)
order_created.send(sender=OrderModel, order=order_instance)
# Django信号底层实现的核心简化版
import weakref
from collections import defaultdict
class Signal:
def __init__(self):
self._receivers = defaultdict(set)
def connect(self, receiver, sender=None, weak=True):
key = (id(sender) if sender else None, id(receiver))
if weak:
ref = weakref.ref(receiver)
self._receivers[key] = ref
else:
self._receivers[key] = receiver
def send(self, sender=None, **kwargs):
responses = []
for (sender_id, _), receiver in list(self._receivers.items()):
cb = receiver() if isinstance(receiver, weakref.ref) else receiver
if cb is not None:
response = cb(sender=sender, **kwargs)
responses.append((cb, response))
return responses
6.2 PyQt5 / PySide6 Signals & Slots
Qt框架的信号与槽(Signals and Slots)机制是观察者模式在GUI编程中的经典实现,它解决了线程安全通信的问题。
# PyQt5 示例 - 纯概念演示
from PyQt5.QtCore import QObject, pyqtSignal
class DownloadManager(QObject):
"""下载管理器 - 主题"""
# 定义信号
progress = pyqtSignal(int) # 进度信号
finished = pyqtSignal(str) # 完成信号
error = pyqtSignal(Exception) # 错误信号
def __init__(self):
super().__init__()
def download(self, url: str):
# 模拟下载过程
for i in range(101):
self.progress.emit(i) # 发射进度信号
self.finished.emit(url) # 发射完成信号
class ProgressBar(QObject):
"""进度条 - 观察者"""
def __init__(self):
super().__init__()
@property
def value(self):
return self._value
@value.setter
def value(self, v):
self._value = v
print(f"进度条更新: {v}%")
class StatusBar(QObject):
"""状态栏 - 另一个观察者"""
def show_message(self, msg: str):
print(f"状态栏: {msg}")
manager = DownloadManager()
progress = ProgressBar()
status = StatusBar()
# 连接信号与槽
manager.progress.connect(progress.setter("value"))
manager.finished.connect(status.show_message)
manager.download("https://example.com/file.zip")
Qt信号的线程安全设计值得学习:跨线程的信号发射会自动通过事件循环排队,确保UI更新在主线程执行。这种设计模式可以推广到任何需要跨线程通信的Python应用。
七、发布-订阅模式 vs 经典观察者模式
虽然两者经常被混用,但它们之间存在关键区别:
核心区别
- 经典观察者模式:观察者直接订阅主题,两者彼此知道对方的存在。通信是直接的。
- 发布-订阅模式:引入一个"消息代理/事件通道"作为中间层,发布者和订阅者完全不知道彼此的存在。通信通过代理转发。
# 经典观察者模式
Subject ──→ Observer A
├──→ Observer B
└──→ Observer C
(Subject 直接持有观察者引用)
# 发布-订阅模式
Publisher → [Event Bus] → Subscriber A
→ Subscriber B
→ Subscriber C
(双方不感知对方)
7.1 发布-订阅模式实现
import json
import weakref
from typing import Any, Callable, Dict, List, Optional
class PubSubBroker:
"""消息代理 - 发布订阅模式的核心"""
def __init__(self) -> None:
self._subscribers: Dict[str, List[Any]] = {}
self._history: Dict[str, List[Any]] = {} # 消息历史(可选)
def subscribe(self, channel: str, callback: Callable) -> None:
if channel not in self._subscribers:
self._subscribers[channel] = []
if hasattr(callback, '__self__'):
ref = weakref.WeakMethod(callback)
else:
ref = weakref.ref(callback)
self._subscribers[channel].append(ref)
def unsubscribe(self, channel: str, callback: Callable) -> None:
for ref in self._subscribers.get(channel, []):
cb = ref()
if cb is callback or cb is None:
self._subscribers[channel].remove(ref)
def publish(self, channel: str, message: Any) -> None:
"""发布消息到频道"""
# 保存到历史
self._history.setdefault(channel, []).append(message)
# 通知所有订阅者
dead_refs = []
for ref in self._subscribers.get(channel, []):
cb = ref()
if cb is not None:
cb(channel, message)
else:
dead_refs.append(ref)
for ref in dead_refs:
self._subscribers[channel].remove(ref)
def get_history(self, channel: str, limit: int = 10) -> List[Any]:
return self._history.get(channel, [])[-limit:]
def subscriber_count(self, channel: str) -> int:
return len([r for r in self._subscribers.get(channel, []) if r()])
# 使用示例
broker = PubSubBroker()
# 不同的模块/服务发布和订阅完全独立
def analytics_service(channel: str, msg: Any) -> None:
print(f"[分析服务] 频道 '{channel}': {msg}")
def notification_service(channel: str, msg: Any) -> None:
print(f"[通知服务] 频道 '{channel}': {msg}")
def audit_logger(channel: str, msg: Any) -> None:
print(f"[审计日志] 频道 '{channel}': {msg}")
# 订阅
broker.subscribe("order.created", analytics_service)
broker.subscribe("order.created", notification_service)
broker.subscribe("payment.*", audit_logger)
broker.subscribe("user.registered", notification_service)
# 发布(发布者不知道谁在收听)
broker.publish("order.created", {"id": "ORD-001", "amount": 299})
broker.publish("payment.completed", {"txn": "TXN-001", "status": "success"})
# 发布者可以随时发布,即使没有订阅者
broker.publish("system.health", {"status": "ok"})
| 维度 |
经典观察者模式 |
发布-订阅模式 |
| 耦合度 |
中等(主题知道观察者) |
极低(完全解耦) |
| 通信方式 |
直接调用 |
通过消息代理中转 |
| 消息过滤 |
由主题控制 |
支持频道/主题匹配(如通配符) |
| 扩展性 |
适合小型系统 |
适合大型分布式系统 |
| 消息持久化 |
不支持 |
可支持消息历史/重放 |
| 典型应用 |
GUI事件、简单回调 |
消息队列(Redis Pub/Sub)、事件驱动架构 |
八、回调注册与撤销管理
在实际项目中,观察者的生命周期管理是一个不容忽视的问题。良好的取消注册机制和上下文管理器可以显著提升代码的健壮性。
8.1 上下文管理器自动注销
from contextlib import contextmanager
from typing import Any, Callable, Generator, List
class Observable:
"""支持上下文管理器自动注销的可观察对象"""
def __init__(self) -> None:
self._observers: List[Callable] = []
def attach(self, observer: Callable) -> None:
self._observers.append(observer)
def detach(self, observer: Callable) -> None:
self._observers.remove(observer)
@contextmanager
def observe(self, observer: Callable) -> Generator:
"""在 with 块内自动注册,退出时自动注销"""
self.attach(observer)
try:
yield
finally:
self.detach(observer)
def notify(self, data: Any) -> None:
for obs in self._observers:
obs(data)
# 使用 context manager 确保不会泄漏
source = Observable()
def debug_handler(data):
print(f"[debug] {data}")
# 方式1: 手动管理(容易忘记 detach)
source.attach(debug_handler)
# ... 使用 ...
source.detach(debug_handler)
# 方式2: 上下文管理器(推荐)
with source.observe(debug_handler):
source.notify("临时监控数据")
# 退出 with 块后自动注销
8.2 返回注销令牌(Token)模式
类似于 React.useEffect 的清理函数或RxJS的Subscription,返回一个"注销令牌"让调用方可以在任何时候取消订阅。
from typing import Any, Callable, Dict, Set
from uuid import uuid4
class TokenBasedEventBus:
"""基于令牌注册/注销的事件总线"""
def __init__(self) -> None:
self._handlers: Dict[str, Dict[str, Callable]] = {}
def on(self, event: str, handler: Callable) -> str:
"""注册处理器,返回注销令牌"""
token = uuid4().hex
self._handlers.setdefault(event, {})[token] = handler
return token
def off(self, token: str) -> bool:
"""通过令牌注销处理器"""
for event in list(self._handlers.keys()):
if token in self._handlers[event]:
del self._handlers[event][token]
if not self._handlers[event]:
del self._handlers[event]
return True
return False
def emit(self, event: str, **data: Any) -> None:
for handler in self._handlers.get(event, {}).values():
handler(**data)
def listener_count(self, event: str = None) -> int:
if event:
return len(self._handlers.get(event, {}))
return sum(len(v) for v in self._handlers.values())
def clear(self) -> None:
"""清空所有处理器"""
self._handlers.clear()
# 使用示例
bus = TokenBasedEventBus()
def on_user_login(**kw):
print(f"用户登录: {kw}")
# 注册获取令牌,后续用令牌注销
token = bus.on("user.login", on_user_login)
bus.emit("user.login", user_id="U001")
# 稍后不需要了,用令牌精确注销
bus.off(token)
bus.emit("user.login", user_id="U002") # 不再输出
8.3 优先级与排序
import bisect
from typing import Any, Callable, List, Tuple
class PriorityEventBus:
"""支持优先级的观察者管理器"""
def __init__(self) -> None:
# (priority, id, callback) 三元组,按优先级排序
self._handlers: List[Tuple[int, int, Callable]] = []
self._counter = 0
def register(self, callback: Callable, priority: int = 0) -> int:
"""
注册回调,priority越大越先执行
返回handler_id用于后续注销
"""
self._counter += 1
handler_id = self._counter
# 使用负优先级确保高优先级的排在前面
item = (-priority, handler_id, callback)
bisect.insort(self._handlers, item)
return handler_id
def unregister(self, handler_id: int) -> bool:
for i, (_, hid, _) in enumerate(self._handlers):
if hid == handler_id:
self._handlers.pop(i)
return True
return False
def emit(self, *args: Any, **kwargs: Any) -> None:
for _, _, cb in list(self._handlers):
cb(*args, **kwargs)
bus = PriorityEventBus()
bus.register(lambda: print("[1] 低优先级"), priority=1)
bus.register(lambda: print("[3] 高优先级"), priority=3)
bus.register(lambda: print("[2] 中优先级"), priority=2)
bus.emit()
# 输出:
# [3] 高优先级
# [2] 中优先级
# [1] 低优先级
九、异步事件通知(asyncio 版本)
在异步编程中,观察者模式的实现需要考虑事件循环的协作。我们来实现一个完整的异步版本。
9.1 异步观察者模式基础
import asyncio
from typing import Any, Callable, List
class AsyncSubject:
"""异步主题"""
def __init__(self) -> None:
self._observers: List[Callable] = []
def attach(self, observer: Callable) -> None:
self._observers.append(observer)
def detach(self, observer: Callable) -> None:
self._observers.remove(observer)
async def notify(self, data: Any) -> None:
# 并发执行所有异步观察者
tasks = [self._safe_call(obs, data) for obs in self._observers]
await asyncio.gather(*tasks)
async def _safe_call(self, observer: Callable, data: Any) -> None:
try:
result = observer(data)
if asyncio.iscoroutine(result):
await result
except Exception as e:
print(f"[错误] 观察者异常: {e}")
# 使用
async def logger(data):
await asyncio.sleep(0.1) # 模拟IO
print(f"[异步日志] {data}")
async def metrics(data):
await asyncio.sleep(0.05)
print(f"[异步指标] 收到数据长度: {len(str(data))}")
async def main():
subject = AsyncSubject()
subject.attach(logger)
subject.attach(metrics)
await subject.notify("异步观察者模式测试")
asyncio.run(main())
9.2 完整异步事件系统
import asyncio
import logging
from typing import Any, Callable, Dict, Set
from weakref import WeakMethod, ref
logger = logging.getLogger(__name__)
class AsyncEventEmitter:
"""完整的异步事件发射器"""
def __init__(self, loop: asyncio.AbstractEventLoop = None) -> None:
self._loop = loop or asyncio.get_event_loop()
self._handlers: Dict[str, Set[Any]] = {}
self._error_handlers: Set[Callable] = set()
def on(self, event: str, handler: Callable) -> None:
if event not in self._handlers:
self._handlers[event] = set()
if hasattr(handler, '__self__'):
ref_ = WeakMethod(handler)
else:
ref_ = ref(handler)
self._handlers[event].add(ref_)
def off(self, event: str, handler: Callable) -> None:
for ref_ in set(self._handlers.get(event, [])):
if ref_() is handler:
self._handlers[event].discard(ref_)
async def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
"""异步发射事件,并发执行所有处理器"""
tasks = []
for ref_ in set(self._handlers.get(event, [])):
handler = ref_()
if handler is None:
self._handlers[event].discard(ref_)
continue
tasks.append(self._safe_dispatch(event, handler, *args, **kwargs))
if tasks:
await asyncio.gather(*tasks)
async def emit_sequential(self, event: str, *args: Any, **kwargs: Any) -> None:
"""按注册顺序依次执行"""
for ref_ in list(self._handlers.get(event, [])):
handler = ref_()
if handler is None:
self._handlers[event].discard(ref_)
continue
await self._safe_dispatch(event, handler, *args, **kwargs)
async def _safe_dispatch(self, event: str, handler: Callable,
*args, **kwargs) -> None:
try:
result = handler(*args, **kwargs)
if asyncio.iscoroutine(result):
await result
except Exception as e:
logger.exception(f"事件 '{event}' 处理器异常: {e}")
def on_error(self, handler: Callable) -> None:
"""注册全局错误处理器"""
self._error_handlers.add(handler)
def remove_all_listeners(self, event: str = None) -> int:
"""移除事件监听器,返回移除数量"""
if event:
count = len(self._handlers.get(event, []))
self._handlers[event] = set()
else:
count = sum(len(v) for v in self._handlers.values())
self._handlers.clear()
return count
# 使用示例
async def example():
emitter = AsyncEventEmitter()
# 普通函数
def sync_handler(data):
print(f"[同步] {data}")
# 异步函数
async def async_handler(data):
await asyncio.sleep(0.1)
print(f"[异步] {data}")
emitter.on("data", sync_handler)
emitter.on("data", async_handler)
print("--- 并发执行 ---")
await emitter.emit("data", "hello")
print("--- 顺序执行 ---")
await emitter.emit_sequential("data", "world")
asyncio.run(example())
9.3 异步发布-订阅
import asyncio
from asyncio import Queue
from typing import Any, Dict, List
class AsyncPubSub:
"""基于 asyncio.Queue 的异步发布订阅"""
def __init__(self) -> None:
self._subscribers: Dict[str, List[Queue]] = {}
def subscribe(self, topic: str) -> Queue:
"""订阅主题,返回一个 Queue 用于接收消息"""
if topic not in self._subscribers:
self._subscribers[topic] = []
queue: Queue = Queue()
self._subscribers[topic].append(queue)
return queue
async def publish(self, topic: str, message: Any) -> None:
"""发布消息到主题"""
for queue in self._subscribers.get(topic, []):
await queue.put(message)
def unsubscribe(self, topic: str, queue: Queue) -> None:
"""取消订阅"""
if topic in self._subscribers:
self._subscribers[topic].remove(queue)
async def worker(name: str, topic: str, pubsub: AsyncPubSub):
"""工作协程:订阅主题并处理消息"""
queue = pubsub.subscribe(topic)
print(f"[{name}] 开始订阅 '{topic}'")
try:
while True:
message = await queue.get()
print(f"[{name}] 收到: {message}")
if message == "STOP":
break
except asyncio.CancelledError:
pass
finally:
pubsub.unsubscribe(topic, queue)
print(f"[{name}] 已取消订阅")
async def main():
pubsub = AsyncPubSub()
# 启动多个订阅者
task1 = asyncio.create_task(worker("订阅者A", "news", pubsub))
task2 = asyncio.create_task(worker("订阅者B", "news", pubsub))
await asyncio.sleep(0.1)
# 发布消息
await pubsub.publish("news", "第一条新闻")
await pubsub.publish("news", "第二条新闻")
await pubsub.publish("news", "STOP")
await asyncio.gather(task1, task2)
asyncio.run(main())
十、企业级实战:完整的事件驱动框架
将以上所有技术点综合起来,我们构建一个可用于生产环境的事件驱动框架。这个框架融合了弱引用管理、优先级排序、异步支持、错误处理和监控能力。
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional, Set
from weakref import WeakMethod, ref
logger = logging.getLogger(__name__)
@dataclass
class Event:
"""事件数据类"""
type: str
data: Any
timestamp: float = field(default_factory=time.time)
id: str = ""
class EventHandler:
"""事件处理器的包装器,支持优先级和统计"""
def __init__(self, callback: Callable, priority: int = 0,
name: str = "") -> None:
self.callback = callback
self.priority = priority
self.name = name or getattr(callback, '__name__', str(id(callback)))
self.invoked_count = 0
self.total_time = 0.0
async def __call__(self, event: Event) -> None:
start = time.time()
try:
result = self.callback(event)
if asyncio.iscoroutine(result):
await result
finally:
self.invoked_count += 1
self.total_time += time.time() - start
@property
def avg_time(self) -> float:
return self.total_time / max(self.invoked_count, 1)
class EnterpriseEventBus:
"""企业级事件总线"""
def __init__(self) -> None:
self._handlers: Dict[str, List[EventHandler]] = {}
self._middlewares: List[Callable] = []
self._metrics: Dict[str, int] = {}
def register(self, event_type: str, callback: Callable,
priority: int = 0, name: str = "") -> None:
if event_type not in self._handlers:
self._handlers[event_type] = []
handler = EventHandler(callback, priority, name)
self._handlers[event_type].append(handler)
self._handlers[event_type].sort(
key=lambda h: h.priority, reverse=True)
def unregister(self, event_type: str, callback: Callable) -> bool:
for handler in self._handlers.get(event_type, []):
if handler.callback is callback:
self._handlers[event_type].remove(handler)
return True
return False
def use(self, middleware: Callable) -> None:
"""注册中间件"""
self._middlewares.append(middleware)
async def dispatch(self, event: Event) -> None:
"""分发事件"""
self._metrics[event.type] = self._metrics.get(event.type, 0) + 1
# 执行中间件
for mw in self._middlewares:
await mw(event)
# 通知处理器
for handler in self._handlers.get(event.type, []):
await handler(event)
def get_stats(self, event_type: str = None) -> Dict:
"""获取事件统计信息"""
if event_type:
handlers = self._handlers.get(event_type, [])
return {
"event_count": self._metrics.get(event_type, 0),
"handler_count": len(handlers),
"handlers": [{
"name": h.name,
"invoked": h.invoked_count,
"avg_time_ms": round(h.avg_time * 1000, 2),
} for h in handlers],
}
return {
"total_events": sum(self._metrics.values()),
"total_handlers": sum(len(v) for v in self._handlers.values()),
"event_types": list(self._handlers.keys()),
}
# 使用示例
async def demo_enterprise_bus():
bus = EnterpriseEventBus()
# 注册处理器
bus.register("user.created", lambda e: print(
f"发送欢迎邮件: {e.data['email']}"), priority=1)
bus.register("user.created", lambda e: print(
f"初始化用户存储: {e.data['uid']}"), priority=2)
# 分发事件
await bus.dispatch(Event(
type="user.created",
data={"uid": "U-001", "email": "alice@example.com"},
))
# 查看统计
print(bus.get_stats("user.created"))
asyncio.run(demo_enterprise_bus())
十一、核心要点总结
- 观察者模式本质:定义一对多依赖关系,状态变化时自动通知所有订阅者。核心目标是降低发布者和订阅者之间的耦合度。
- weakref 是必须的:在生产环境中使用观察者模式,务必通过 weakref.WeakSet 或 weakref.WeakMethod 管理观察者引用,否则极易造成内存泄漏。
- __call__ 让对象可回调:实现 __call__ 方法的对象可以像函数一样被调用,同时保留对象的状态,这是Python中实现观察者的独特优势。
- 经典 vs 发布-订阅:经典模式适合单一系统内的直接通知;发布-订阅通过中间代理实现完全解耦,适合分布式系统和跨模块通信。
- 生命周期管理:始终提供便捷的注销机制——上下文管理器(@contextmanager)或令牌模式(返回注销函数/令牌),确保观察者在不需要时能被正确清理。
- 异步支持:在asyncio程序中应使用 asyncio.gather 并发执行观察者,注意同步/异步函数的兼容性处理(使用 asyncio.iscoroutine 检测)。
- 框架学习:Django Signals 和 Qt Signals 是观察者模式在真实框架中的优秀实践,值得深入学习其实现源码。
- 优先级机制:在复杂的业务场景中,为观察者设置优先级可以精确控制执行顺序,避免依赖隐式的注册顺序。
- 错误隔离:单个观察者的异常不应影响其他观察者,始终使用 try/except 包装每个观察者的调用。
- 性能监控:在企业级应用中,添加调用计数和耗时统计可以帮助定位性能瓶颈和排查问题。
十二、进一步思考
实战练习建议
- 实现一个文件监控器:使用观察者模式监听目录变化,当文件被创建、修改或删除时通知不同的处理器(日志记录、自动备份、索引更新)。
- 改造现有代码:找一段你项目中紧密耦合的if-else/switch逻辑,用观察者模式重构为可扩展的事件驱动架构。
- 对比不同框架:深入研究 Django Signals 和 PyQt Signals 的源码实现,对比它们在弱引用管理和线程安全方面的异同。
- 实现协程版发布订阅:基于 asyncio.Queue 实现一个支持背压(backpressure)的发布-订阅系统,处理慢消费者问题。
- 结合类型系统:使用 typing.Protocol 定义观察者接口,结合 dataclass 定义事件类型,打造类型安全的观察者框架。
扩展阅读
- 《设计模式:可复用面向对象软件的基础》—— GoF 四人组,观察者模式章节
- Python官方文档:weakref 模块——深入理解弱引用的实现原理
- Django源码:django/dispatch/dispatcher.py——生产级信号调度器的参考实现
- RxPY 库——观察者模式在响应式编程中的高级应用
- 《Python源码剖析》——理解Python对象模型中与观察者模式相关的语言特性(__call__、属性描述符等)