Python 的 signal 模块提供了与 Unix 信号机制交互的接口,允许开发者注册自定义的信号处理器(handler),当进程收到特定信号时执行预定义的逻辑。信号是一种异步事件通知机制 —— 操作系统可以随时向进程发送信号,打断其正常执行流程。熟练掌握 signal 模块,对于编写健壮的后端服务、守护进程、命令行工具以及超时控制系统至关重要。
信号(Signal)是 Unix 操作系统提供的一种进程间通信(IPC)机制,其本质是一条由内核发往进程的简短异步通知。每个信号用一个正整数标识(Signal Number),并定义了对应的默认行为。
| 信号名称 | 信号编号 | 默认行为 | 触发场景 |
|---|---|---|---|
| SIGINT | 2 | 终止进程 | 用户按下 Ctrl+C |
| SIGTERM | 15 | 终止进程 | kill 命令默认发送 |
| SIGHUP | 1 | 终止进程 | 终端断开、守护进程重载配置 |
| SIGKILL | 9 | 终止进程(不可捕获) | kill -9 |
| SIGSTOP | 19 | 暂停进程(不可捕获) | Ctrl+Z |
| SIGALRM | 14 | 终止进程 | alarm() 定时器到期 |
| SIGPIPE | 13 | 终止进程 | 向已关闭的管道写入数据 |
| SIGUSR1 | 10 | 终止进程 | 应用程序自定义 |
| SIGUSR2 | 12 | 终止进程 | 应用程序自定义 |
SIGKILL (9) 和 SIGSTOP (19) 是仅有的两个不可捕获、不可忽略、不可阻塞的信号。这意味着任何进程都无法阻止自己被杀掉或被暂停。这也是 kill -9 作为"最终手段"的原因 —— 即使进程处于死锁状态,SIGKILL 也能强制终止它。其他信号均可以通过 Python 注册自定义处理器来改变默认行为。
signal.signal(signum, handler) 是 signal 模块的核心函数,用于将某个信号与一个处理器函数绑定。处理器函数接收两个参数:信号编号(signum)和当前栈帧(frame)。
import signal import time def handler(signum, frame): print(f"收到信号: {signum}") # 注册 SIGINT (Ctrl+C) 的自定义处理器 signal.signal(signal.SIGINT, handler) print("按 Ctrl+C 触发自定义处理器") # 死循环等待信号 while True: time.sleep(1)
| 处理器值 | 含义 | 典型用途 |
|---|---|---|
| signal.SIG_DFL | 恢复为默认行为(由操作系统决定) | 取消之前的自定义处理器 |
| signal.SIG_IGN | 忽略该信号(相当于空处理器) | 忽略 SIGPIPE 避免 BrokenPipeError |
| callable(signum, frame) | 自定义处理器函数 | 优雅关闭、清理资源、记录日志 |
在生产服务中最常见的信号使用场景是优雅关闭:当收到 SIGTERM 或 SIGINT 时,不直接退出,而是先完成正在处理的请求、关闭数据库连接、释放资源,然后再退出。
import signal import time import sys class GracefulShutdown: def __init__(self): self.running = True # 注册多个信号到同一个处理器 signal.signal(signal.SIGINT, self.shutdown) signal.signal(signal.SIGTERM, self.shutdown) signal.signal(signal.SIGHUP, self.reload_config) def shutdown(self, signum, frame): print(f"\n收到信号 {signum},开始优雅关闭...") self.running = False # 在实际场景中,此处应等待任务队列清空、 # 关闭连接池、刷新缓冲区等 def reload_config(self, signum, frame): print("收到 SIGHUP,重新加载配置...") # 重新读取配置文件 def run(self): print("服务运行中。按 Ctrl+C 停止。") while self.running: time.sleep(1) print("清理完成,服务退出。") sys.exit(0) server = GracefulShutdown() server.run()
signal.alarm(time) 是 Unix 系统提供的定时器机制:设置在 time 秒后向自身进程发送 SIGALRM 信号。它只生效一次(非周期性),常用于为 I/O 操作设置超时。
signal.alarm() 与 time.sleep() 在 Python 中存在已知交互问题:在某些平台上,sleep 可能被 SIGALRM 打断并提前返回。此外,alarm 的精度为秒级,不适合毫秒级的超时需求。
import signal import time class TimeoutError(Exception): """自定义超时异常""" pass def timeout_handler(signum, frame): raise TimeoutError("操作超时") def risky_operation(seconds): """模拟一个可能卡住的操作""" time.sleep(seconds) return "操作完成" # 设置 3 秒超时 signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(3) try: result = risky_operation(5) # 这个操作需要 5 秒,会超时 print(result) except TimeoutError as e: print(f"捕获超时: {e}") finally: signal.alarm(0) # 务必取消闹钟!
当进程处于 time.sleep() 或 select.select() 等阻塞调用中时收到信号,系统调用会被中断(EINTR),Python 会自动重试大部分系统调用。但在一些底层场景(如直接调用 C 扩展),可能需要显式处理 InterruptedError 异常。
import signal import time import errno def handler(signum, frame): print("信号到达,中断 sleep") signal.signal(signal.SIGALRM, handler) signal.alarm(2) try: time.sleep(10) except InterruptedError: # Python 3.5+ 中 sleep 不会抛出 InterruptedError, # 但某些 C 扩展可能仍会触发 print("sleep 被信号中断") finally: signal.alarm(0)
Python 3.5 之后引入了 PEP 475,使大部分系统调用在收到 EINTR 后自动重试,开发者无需手动处理 InterruptedError。这对于 time.sleep()、os.read()、select.select() 等常用函数均适用。
Python 的信号机制有一个严格的安全约束:signal.signal() 只能在主线程(Main Thread)中调用。在子线程中调用会抛出 ValueError。同样,只有主线程能接收信号。
Python 的信号处理是在解释器层面实现的:当信号到达时,CPython 解释器在两条字节码指令之间检查"待处理信号"队列,然后调用注册的处理器。这个机制依赖于 GIL(全局解释器锁),而子线程不拥有 GIL 的主控权。如果在子线程中执行信号处理器,会导致竞态条件和不可预测的内存状态。因此 Python 的设计者强制将信号处理限制在主线程。
import signal import threading def bad(): # 在子线程中注册信号处理器 —— 抛出 ValueError! signal.signal(signal.SIGINT, lambda s, f: None) t = threading.Thread(target=bad) t.start() # ValueError: signal only works in main thread t.join()
import signal import threading import time class SignalRelay: """在主线程注册信号,通过 Event 通知其他线程""" def __init__(self): self.stop_event = threading.Event() # 主线程中注册 signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) def signal_handler(self, signum, frame): print(f"信号 {signum} 到达,通知所有工作线程") self.stop_event.set() def worker(self): while not self.stop_event.is_set(): print("工作线程运行中...") if self.stop_event.wait(timeout=1): break print("工作线程已停止") relay = SignalRelay() threads = [threading.Thread(target=relay.worker) for _ in range(3)] for t in threads: t.start() for t in threads: t.join() print("所有线程已退出")
在多线程程序中,正确的信号处理模式是:主线程专门负责接收信号,然后通过线程安全的同步原语(threading.Event、queue.Queue、threading.Condition 等)通知其他工作线程。工作线程不直接接触信号 API。
signal.setitimer(which, seconds, interval) 提供了比 signal.alarm() 更精细的定时器控制。它支持三种定时器模式,精度达到微秒级,且可以设置周期性触发。
| 常量 | 计时方式 | 触发信号 | 典型用途 |
|---|---|---|---|
| signal.ITIMER_REAL | 实际时间(墙上时钟) | SIGALRM | 普通定时任务 |
| signal.ITIMER_VIRTUAL | 进程用户态 CPU 时间 | SIGVTALRM | 代码执行时间分析 |
| signal.ITIMER_PROF | 用户态 + 内核态 CPU 时间 | SIGPROF | profiling 性能分析 |
import signal import time counter = 0 def timer_handler(signum, frame): global counter counter += 1 print(f"[{counter}] 定时器触发,信号: {signum}") # 注册 SIGALRM 处理器 signal.signal(signal.SIGALRM, timer_handler) # 0.1 秒后首次触发,之后每 0.5 秒触发一次 signal.setitimer(signal.ITIMER_REAL, 0.1, 0.5) try: print("定时器已启动 (按 Ctrl+C 停止)") time.sleep(3) # 等待约 6 次触发 finally: # 关闭定时器:将间隔设为 0 signal.setitimer(signal.ITIMER_REAL, 0, 0) print(f"定时器已关闭,共触发 {counter} 次")
import signal signal.setitimer(signal.ITIMER_REAL, 5, 1) remaining, interval = signal.getitimer(signal.ITIMER_REAL) print(f"距下次触发: {remaining} 秒,间隔: {interval} 秒") # 输出: 距下次触发: 4.987... 秒,间隔: 1.0 秒 # 停止定时器 signal.setitimer(signal.ITIMER_REAL, 0, 0)
SIGPIPE 是管道编程中最常见又最容易被忽视的信号。当一个进程向一个读取端已关闭的管道写入数据时,内核会向写入进程发送 SIGPIPE 信号。该信号的默认行为是终止进程,如果你的程序被意外终止,很可能是 SIGPIPE 导致的。
将 SIGPIPE 设置为 SIG_IGN,让系统调用直接返回错误码,Python 将抛出 BrokenPipeError 异常。
import signal import sys import time # 忽略 SIGPIPE 信号 signal.signal(signal.SIGPIPE, signal.SIG_IGN) # 此时写入关闭的管道会抛出 BrokenPipeError try: for i in range(100000): print(f"第 {i} 行数据", flush=True) time.sleep(0.01) except BrokenPipeError: # 标准输出已关闭(如 head 命令已读取完毕) sys.stderr.write("\n管道已关闭,退出\n") sys.exit(0)
import signal import sys def sigpipe_handler(signum, frame): """自定义 SIGPIPE 处理器:记录日志后退出""" sys.stderr.write("检测到管道关闭,正在清理...\n") # 执行必要的清理操作 sys.exit(0) signal.signal(signal.SIGPIPE, sigpipe_handler)
需要特别注意的是:在 subprocess 模块创建的子进程中,SIGPIPE 的处理方式可能被继承。如果父进程忽略了 SIGPIPE,子进程也可能继承这一行为,导致意外的错误处理。建议在子进程启动前重置信号处理器。
signal 模块的核心 API 是 POSIX 标准的一部分,因此信号处理在 Linux、macOS 和其他类 Unix 系统上表现一致。但 Windows 上的信号支持非常有限。
| 信号 | Windows 支持 | 备注 |
|---|---|---|
| SIGINT (Ctrl+C) | 支持 | 可注册自定义处理器 |
| SIGTERM | 部分支持 | 可通过 os.kill() 发送 |
| SIGBREAK | 支持 | Windows 特有,Ctrl+Break |
| SIGALRM | 不支持 | AttributeError |
| SIGHUP | 不支持 | AttributeError |
| SIGPIPE | 不支持 | Windows 使用不同的管道机制 |
| SIGUSR1/SIGUSR2 | 不支持 | 应用程序自定义信号 |
| setitimer | 不支持 | Windows 无此系统调用 |
import signal import sys def setup_signal_handlers(): """跨平台的信号处理器注册函数""" # SIGINT 在所有平台都支持 signal.signal(signal.SIGINT, shutdown_handler) if sys.platform != "win32": # 仅 Unix 平台可用的信号 signal.signal(signal.SIGTERM, shutdown_handler) signal.signal(signal.SIGHUP, reload_handler) try: # macOS 和 Linux 都支持 signal.signal(signal.SIGUSR1, usr1_handler) except AttributeError: pass else: # Windows 特有信号 signal.signal(signal.SIGBREAK, shutdown_handler)
import signal def has_signal(name): """检查当前平台是否支持某个信号""" return hasattr(signal, name) for sig in ["SIGALRM", "SIGHUP", "SIGPIPE", "SIGUSR1"]: print(f"{sig}: {'支持' if has_signal(sig) else '不支持'}")
守护进程(Daemon)是长期在后台运行的服务进程,信号处理是守护进程的核心基础设施之一。典型的守护进程信号处理策略如下:
import signal import time import sys import os import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" ) logger = logging.getLogger("daemon") class DaemonApp: """一个完整的守护进程信号处理骨架""" def __init__(self): self.running = True self.restart_flag = False def setup_signals(self): """初始化所有信号处理器""" signal.signal(signal.SIGINT, self.handle_signal) signal.signal(signal.SIGTERM, self.handle_signal) signal.signal(signal.SIGHUP, self.handle_signal) signal.signal(signal.SIGUSR1, self.handle_signal) signal.signal(signal.SIGUSR2, self.handle_signal) def handle_signal(self, signum, frame): """统一的信号处理器""" if signum in (signal.SIGINT, signal.SIGTERM): logger.info(f"收到终止信号 ({signum}),准备退出") self.running = False elif signum == signal.SIGHUP: logger.info("收到 SIGHUP,重新加载配置") # 重新读取配置文件 self.reload_config() elif signum == signal.SIGUSR1: logger.info("收到 SIGUSR1,输出调试信息") self.print_status() elif signum == signal.SIGUSR2: logger.info("收到 SIGUSR2,触发热重启") self.restart_flag = True self.running = False def reload_config(self): # 在生产中此处读取配置文件 logger.info("配置已重新加载") def print_status(self): logger.info("当前状态: 正在运行") def run(self): """主循环""" logger.info("守护进程启动完成") while self.running: # 执行主要业务逻辑 time.sleep(1) logger.info("守护进程正在退出...") daemon = DaemonApp() daemon.setup_signals() daemon.run() # 在此执行清理 logger.info("守护进程已完全退出")
# 启动守护进程 $ python daemon_app.py & # 查看进程 PID $ echo $! 12345 # 发送各种信号 $ kill -SIGUSR1 12345 # 输出调试状态 $ kill -SIGHUP 12345 # 重新加载配置 $ kill -SIGUSR2 12345 # 热重启 $ kill 12345 # 向进程发送 SIGTERM(默认) $ kill -SIGINT 12345 # 模拟 Ctrl+C $ kill -9 12345 # 强制杀死(不可忽略)
信号处理器是异步执行的,如果处理器修改了主程序正在使用的共享变量,可能导致竞态条件。
# 不安全 —— 可能在整数操作中途触发信号 counter = 0 def handler(signum, frame): global counter counter += 1 # Python 中 'counter += 1' 不是原子操作 # 更安全的做法:使用 atomic 标志或 threading.Event stop_requested = False def safe_handler(signum, frame): global stop_requested stop_requested = True # 简单赋值在 CPython 中是原子的
在信号处理器中调用非 async-signal-safe 的函数(如 print()、logging、time.sleep())可能导致死锁。Python 的 print() 虽然看似安全,但在信号处理中应避免复杂操作。
在信号处理器中,你应该只做最轻量的事情:设置一个标志位、写入一个管道描述符、或者触发一个 threading.Event。绝不要在信号处理器中获取锁、分配内存、或调用可能阻塞的函数。
如果第三方库内部使用了 signal.alarm() 或 setitimer(),你的代码可能会意外覆盖这些设置。最佳做法是在使用 alarm 的前后保存和恢复定时器状态。
import signal # 保存当前 alarm 状态 old_alarm = signal.alarm(0) # 取消并获取剩余时间 try: signal.alarm(5) # 执行需要超时控制的代码 do_something() finally: signal.alarm(0) # 取消本 alarm if old_alarm: signal.alarm(int(old_alarm)) # 恢复原始 alarm
Python 3.5 之后,asyncio 事件循环提供了 loop.add_signal_handler(signum, callback, ...) 方法,将 Unix 信号集成到事件循环中,比直接使用 signal.signal() 更安全且更符合异步编程范式。
import asyncio import signal async def main(): """ asyncio 风格的信号处理: 信号回调在事件循环上下文中执行,避免了线程安全问题 """ loop = asyncio.get_running_loop() # 注册信号 —— 不需要担心线程安全问题 for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler( sig, lambda s=sig: print(f"asyncio 收到信号 {s}") ) # 模拟长时间运行 try: while True: await asyncio.sleep(1) finally: # 移除信号处理器 for sig in (signal.SIGINT, signal.SIGTERM): loop.remove_signal_handler(sig) asyncio.run(main())
import asyncio import signal async def graceful_shutdown(): """asyncio 风格的优雅关闭""" print("正在优雅关闭...") await asyncio.sleep(1) # 模拟清理操作 print("已清理完成") async def main(): loop = asyncio.get_running_loop() shutdown_event = asyncio.Event() def signal_handler(): print("信号到达,准备关闭") shutdown_event.set() loop.add_signal_handler(signal.SIGINT, signal_handler) loop.add_signal_handler(signal.SIGTERM, signal_handler) await shutdown_event.wait() await graceful_shutdown() asyncio.run(main())
Python 中实现异步行为的机制有多种,各自适用场景不同:
信号处理的核心哲学可以概括为"通知而非执行"。信号处理器不是业务逻辑的执行场所,而是事件的通知渠道。正确的信号处理应该在整个系统中形成一条清晰的事件链路:
OS 信号 → Python 信号处理器 → 标志位/Event → 主循环/工作线程 → 清理退出
遵循这一链路设计,可以写出既健壮又可维护的信号处理代码。