FTP/SFTP:文件传输自动化

Python 办公自动化专题 · 实现远程文件传输的完全自动化

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, FTP, SFTP, paramiko, ftplib, 文件传输, 文件同步, Python自动化

一、文件传输协议概述

文件传输是网络运维和自动化办公中最基础也最常见的需求之一。无论是将本地的备份数据上传到远程服务器,还是从远程服务器拉取日志文件进行分析,都离不开高效、可靠的文件传输协议。在众多传输协议中,FTP(File Transfer Protocol)、SFTP(SSH File Transfer Protocol)和SCP(Secure Copy Protocol)是三种最常用的方式,但它们在工作机制、安全性和适用场景上存在显著差异。

FTP是最早出现的文件传输协议之一,它采用客户端-服务器架构,默认使用21号端口进行控制连接,而数据连接则根据模式不同采用不同端口。FTP支持主动模式(PORT)和被动模式(PASV)两种工作方式。在主动模式下,服务器主动连接客户端的指定端口来传输数据;在被动模式下,服务器开放一个随机端口,由客户端发起数据连接。由于主动模式在客户端有防火墙时容易失败,现代FTP客户端大多默认使用被动模式。

SFTP并不是FTP的安全版本,而是基于SSH协议(Secure Shell)的文件传输子系统。它使用22号端口,所有通信数据均经过加密传输,包括认证信息、命令和文件内容。与FTP相比,SFTP只需一个端口即可完成所有操作,不需要额外的数据连接,因而更容易穿越防火墙。SCP也是一种基于SSH的传输方式,但相比SFTP只支持简单的文件上传和下载,不支持目录列表、删除等文件管理操作。

在选择传输协议时,核心考量因素包括:安全性(是否加密传输)、防火墙友好性(端口需求)、功能丰富度(是否支持目录管理、权限设置等)、以及性能表现。对于内网环境且对安全要求不高的场景,FTP仍可胜任;对于互联网传输或涉及敏感数据的场景,应优先选择SFTP或FTPS(FTP over TLS)。在自动化办公场景中,Python的ftplib库可以方便地操作FTP/FTPS,而paramiko库则为SFTP提供了强大支持。

协议对比总结:安全方面 SFTP > FTPS > FTP;防火墙友好性 SFTP > FTP(被动模式)> FTP(主动模式);功能丰富度 SFTP >= FTP >> SCP;认证方式 SFTP支持密钥对认证,FTP仅支持密码认证。

二、ftplib基础

Python标准库中的ftplib模块提供了FTP协议客户端实现,无需安装任何第三方包即可连接FTP服务器、执行文件和目录操作。ftplib的核心是FTP类,通过它我们可以建立连接、登录认证、获取目录列表、上传和下载文件等。下面我们从最基础的连接和登录开始,一步步掌握ftplib的使用。

建立FTP连接通常有两种方式:一种是分别调用FTP()创建对象再调用connect()和login()方法;另一种是直接传入主机地址,由构造函数自动连接。对于匿名FTP服务器,可以省略用户名和密码;对于需要认证的服务器,则需提供正确的凭证。连接成功后,我们可以使用getwelcome()方法获取服务器的欢迎信息,使用pwd()查看当前工作目录。

from ftplib import FTP import os # 连接FTP服务器 ftp = FTP() ftp.connect('ftp.example.com', 21) # 连接主机和端口 ftp.login('username', 'password') # 登录认证 print(f"欢迎信息: {ftp.getwelcome()}") print(f"当前目录: {ftp.pwd()}") # 获取目录列表 files = ftp.nlst() # 仅获取文件名列表 print(f"文件列表: {files}") # 更详细的目录信息 ftp.dir() # 打印详细目录列表 # 切换目录 ftp.cwd('/pub/data') # 创建目录 ftp.mkd('backup') # 重命名 ftp.rename('old.txt', 'new.txt') # 删除文件 ftp.delete('temp.txt') # 删除空目录 ftp.rmd('empty_dir') ftp.quit()

文件的上传和下载是ftplib最核心的功能。上传文件使用storbinary()方法(二进制模式)或storlines()方法(ASCII模式),下载文件使用retrbinary()或retrlines()。对于二进制文件(如压缩包、图片、视频),必须使用binary模式,以确保数据传输的完整性。对于文本文件,可以使用ASCII模式,它会自动进行换行符转换。

from ftplib import FTP ftp = FTP('ftp.example.com') ftp.login('user', 'pass') ftp.cwd('/upload') # 上传文件(二进制模式) with open('local_file.zip', 'rb') as f: ftp.storbinary('STOR remote_file.zip', f) print("上传完成") # 下载文件(二进制模式) with open('downloaded.zip', 'wb') as f: ftp.retrbinary('RETR remote_file.zip', f.write) print("下载完成") # 上传文本文件(ASCII模式) with open('report.txt', 'r') as f: ftp.storlines('STOR report.txt', f) ftp.quit()

要点:使用storbinary时,STOR命令后的文件名是服务器端保存的文件名;retrbinary的回调函数接收数据块作为参数,通常传入文件对象的write方法。使用完毕后务必调用quit()或close()释放连接。

三、ftplib进阶

在掌握了ftplib的基础操作后,我们需要应对更复杂的实际场景:大文件传输可能因网络波动中断、文件类型不同需要不同的传输模式、需要实时显示传输进度、以及安全合规要求使用加密传输等。本节将深入探讨这些进阶用法。

被动模式(PASV)是突破防火墙限制的关键。现代FTP服务器通常默认启用被动模式,但有时需要手动设置。通过set_pasv(True)可以启用被动模式,这样数据连接由客户端发起,避免了服务器主动连接客户端时被防火墙拦截的问题。对于超大文件(如数百MB甚至GB级别),应使用分块读取方式传输,避免一次性将整个文件读入内存。

from ftplib import FTP import os ftp = FTP('ftp.example.com') ftp.login('user', 'pass') ftp.set_pasv(True) # 启用被动模式 # 带进度回调的上传 def upload_with_progress(local_path, remote_path): file_size = os.path.getsize(local_path) transferred = 0 def callback(data): nonlocal transferred transferred += len(data) pct = (transferred / file_size) * 100 print(f"\r上传进度: {pct:.1f}% ({transferred}/{file_size} bytes)", end='') with open(local_path, 'rb') as f: ftp.storbinary(f'STOR {remote_path}', f, 8192, callback) print("\n上传完成") upload_with_progress('large_file.iso', 'backup.iso') ftp.quit()

断点续传是大文件传输中极其重要的功能。当传输因网络中断而失败时,断点续传允许从中断位置继续传输,无需重新开始。实现原理是:在上传前先检查服务器端是否已存在同名文件并获取其大小,然后从该位置开始读取本地文件;下载时则根据本地已下载的文件大小设置REST偏移量。

from ftplib import FTP def resume_upload(ftp, local_path, remote_path): """断点续传-上传""" try: server_size = ftp.size(remote_path) except: server_size = 0 local_size = os.path.getsize(local_path) if server_size >= local_size: print("文件已完整存在,跳过") with open(local_path, 'rb') as f: f.seek(server_size) # 跳到已上传位置之后 ftp.storbinary(f'STOR {remote_path}', f) def resume_download(ftp, remote_path, local_path): """断点续传-下载""" local_size = os.path.getsize(local_path) if os.path.exists(local_path) else 0 server_size = ftp.size(remote_path) if local_size >= server_size: print("文件已完整存在,跳过") with open(local_path, 'ab') as f: ftp.retrbinary(f'RETR {remote_path}', f.write, rest=local_size) ftp = FTP('ftp.example.com') ftp.login('user', 'pass') resume_download(ftp, 'bigdata.csv', 'bigdata.csv') ftp.quit()

FTP over TLS(FTPS)是FTP的加密版本,通过TLS/SSL加密整个会话。Python的ftplib提供了FTP_TLS类来支持FTPS。连接时需要通过SSL上下文配置证书验证策略,可以设置为不验证(适合自签名证书的内部服务器)或严格验证(生产环境推荐)。FTPS有两种模式:隐式TLS(默认使用990端口,连接即加密)和显式TLS(先建立普通连接,再通过AUTH TLS命令升级为加密连接)。

from ftplib import FTP_TLS import ssl # FTPS - 显式TLS ftps = FTP_TLS('ftp.example.com') ftps.login('user', 'pass') ftps.prot_p() # 启用数据通道加密 ftps.set_pasv(True) # 传输文件(自动加密) ftps.storbinary('STOR secret.txt', open('secret.txt', 'rb')) ftps.quit() # 自定义SSL上下文(跳过证书验证,仅内网使用) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE ftps = FTP_TLS('192.168.1.100') ftps.ssl_version = ssl.PROTOCOL_TLS ftps.context = ssl_context ftps.login('admin', 'password') ftps.prot_p()

四、paramiko SFTP

paramiko是Python中最流行的SSH协议实现库,它提供了SSH客户端和SFTP客户端的完整功能。与ftplib不同,paramiko需要单独安装(pip install paramiko),但它在安全性、功能丰富度和跨平台支持方面远超ftplib。SFTP基于SSH协议,所有通信都经过加密,支持密码认证和更安全的密钥对认证。

使用paramiko进行SFTP操作的基本流程是:先建立SSH连接,然后通过SSH客户端打开SFTP会话。建立SSH连接时可以指定主机名、端口、用户名和密码(或密钥文件)。连接成功后调用open_sftp()获取SFTP客户端对象,之后就可以进行文件传输、目录操作和权限管理等。使用完毕后应依次关闭SFTP客户端和SSH连接。

import paramiko # 创建SSH客户端 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器(密码认证) ssh.connect( hostname='sftp.example.com', port=22, username='admin', password='secret' ) # 打开SFTP会话 sftp = ssh.open_sftp() # 列出目录 print(sftp.listdir('/home/admin')) print(sftp.listdir_attr('/home/admin')) # 带文件属性 # 获取文件属性 attr = sftp.stat('/home/admin/report.pdf') print(f"大小: {attr.st_size} bytes, 修改时间: {attr.st_mtime}") # 创建/删除目录 sftp.mkdir('/home/admin/new_folder') sftp.rmdir('/home/admin/empty_folder') # 删除文件 sftp.remove('/home/admin/temp.txt') # 重命名/移动 sftp.rename('/home/admin/old.txt', '/home/admin/new.txt') sftp.close() ssh.close()

密钥认证是SFTP的推荐认证方式,它比密码认证更安全且适合自动化脚本。首先需要在客户端生成SSH密钥对(ssh-keygen),然后将公钥添加到服务器的~/.ssh/authorized_keys文件中。paramiko支持RSA、ECDSA、Ed25519等多种密钥类型。如果私钥有密码保护,可以在连接时通过password参数提供密钥密码。

import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # SSH密钥认证 private_key = paramiko.RSAKey.from_private_key_file( 'C:/Users/me/.ssh/id_rsa', password='key_password' # 如果私钥有密码 ) ssh.connect( hostname='sftp.example.com', port=22, username='admin', pkey=private_key ) sftp = ssh.open_sftp() # 上传文件 sftp.put('local_file.txt', '/remote/path/file.txt') # 下载文件 sftp.get('/remote/path/file.txt', 'local_file.txt') # 设置文件权限 sftp.chmod('/remote/path/file.txt', 0o644) # rw-r--r-- # 获取文件 sftp.get('/home/admin/report.pdf', 'report.pdf') sftp.close() ssh.close()

提示:在自动化脚本中,使用密钥认证可以避免在代码中硬编码密码。建议将私钥文件放在安全的路径,并设置严格的文件权限。对于Windows环境,密钥文件通常存放在 %USERPROFILE%\.ssh\ 目录下。

五、paramiko进阶

在实际应用中,我们往往需要传输多个文件、同步整个目录、实时监控传输进度,并在网络异常时自动重试。paramiko提供了足够的灵活性来构建这些高级功能。本节将介绍多文件传输、目录同步、进度监控、连接池管理以及异常处理与自动重连等进阶技巧。

多文件传输和目录同步是文件传输自动化的核心需求。通过结合os模块的文件遍历功能和sftp.put/get方法,可以批量传输文件。传输进度监控通过为put/get方法提供callback参数实现,该回调函数接收已传输字节数和总字节数两个参数,可用于计算传输速度和预估剩余时间。

import paramiko import os import time def upload_dir(sftp, local_dir, remote_dir): """递归上传整个目录""" for root, dirs, files in os.walk(local_dir): # 计算远程路径 rel_path = os.path.relpath(root, local_dir) remote_path = remote_dir.replace('\\', '/') if rel_path != '.': remote_path += '/' + rel_path.replace('\\', '/') # 确保远程目录存在 try: sftp.stat(remote_path) except FileNotFoundError: sftp.mkdir(remote_path) # 上传文件 for file in files: local_file = os.path.join(root, file) remote_file = remote_path + '/' + file sftp.put(local_file, remote_file) def download_with_progress(sftp, remote_path, local_path): """带进度监控的下载""" file_size = sftp.stat(remote_path).st_size start_time = time.time() def callback(transferred, total): pct = (transferred / total) * 100 elapsed = time.time() - start_time speed = transferred / elapsed / 1024 if elapsed > 0 else 0 print(f"\r进度: {pct:.1f}% | 速度: {speed:.0f} KB/s", end='') sftp.get(remote_path, local_path, callback=callback) print(f"\n下载完成,耗时: {time.time() - start_time:.1f}s") # 使用示例 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect('host', username='user', password='pass') sftp = ssh.open_sftp() upload_dir(sftp, './data', '/backup/data') download_with_progress(sftp, '/remote/bigfile.iso', 'bigfile.iso') sftp.close() ssh.close()

连接池管理和自动重连是生产环境脚本的关键可靠性保障。当需要频繁执行小文件传输时,每次重新建立SSH连接的开销很大。使用连接池可以复用已有连接,提高效率。同时,网络故障不可避免,健壮的脚本应该能够检测传输异常并自动重试。

import paramiko import time from functools import wraps def retry_on_failure(max_retries=3, delay=2): """传输重试装饰器""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except (paramiko.SSHException, EOFError) as e: if attempt == max_retries - 1: raise print(f"传输失败 (第{attempt+1}次),{delay}秒后重试...") time.sleep(delay) return None return wrapper return decorator class SFTPPool: """简单的SFTP连接池""" def __init__(self, host, port, username, password, pool_size=3): self.host = host self.port = port self.username = username self.password = password self.pool_size = pool_size self._connections = [] def get_connection(self): if self._connections: return self._connections.pop() return self._create_connection() def return_connection(self, sftp): if len(self._connections) < self.pool_size: self._connections.append(sftp) else: sftp.close() def _create_connection(self): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(self.host, port=self.port, username=self.username, password=self.password) return ssh.open_sftp() def close_all(self): for conn in self._connections: conn.close() self._connections = [] @retry_on_failure(max_retries=3) def safe_transfer(sftp, local_path, remote_path): sftp.put(local_path, remote_path) print(f"传输成功: {local_path} -> {remote_path}") # 使用连接池 pool = SFTPPool('host', 22, 'user', 'pass') sftp = pool.get_connection() safe_transfer(sftp, 'data.csv', '/upload/data.csv') pool.return_connection(sftp) pool.close_all()

六、目录同步

目录同步是文件传输自动化的高级应用,它实现的是本地目录和远程目录之间的内容一致。一个成熟的目录同步工具不仅要能复制文件,还应该支持增量同步(只传输变化的部分)、文件过滤(按通配符或正则表达式排除不需要的文件)、冲突处理(当本地和远程都修改了同一个文件时的处理策略)以及双工同步(双向同步,即本地和远程互为源和目标)。

增量同步算法是目录同步的核心。其基本原理是:遍历源目录中的所有文件,计算每个文件的MD5哈希值或检查文件的修改时间与大小,然后与目标目录中的对应文件进行比较。只有当文件在目标目录中不存在、或大小不同、或修改时间不同、或MD5值不同时,才执行传输。这样可以极大减少不必要的网络传输,尤其适用于大规模目录的定期同步。

import paramiko import os import hashlib from pathlib import PurePosixPath def md5_file(filepath): """计算文件MD5""" h = hashlib.md5() with open(filepath, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): h.update(chunk) return h.hexdigest() def sync_local_to_remote(sftp, local_dir, remote_dir, exclude=None): """本地到远程的增量同步""" exclude = exclude or [] synced = 0 for root, dirs, files in os.walk(local_dir): rel = os.path.relpath(root, local_dir).replace('\\', '/') rpath = remote_dir + ('/' + rel if rel != '.' else '') # 创建远程目录 try: sftp.stat(rpath) except FileNotFoundError: sftp.mkdir(rpath) print(f"创建目录: {rpath}") for file in files: if any(excl in file for excl in exclude): continue local_path = os.path.join(root, file) remote_path = rpath + '/' + file local_md5 = md5_file(local_path) # 检查远程文件是否需要更新 needs_upload = True try: with sftp.open(remote_path, 'rb') as rf: remote_data = rf.read() remote_md5 = hashlib.md5(remote_data).hexdigest() if remote_md5 == local_md5: needs_upload = False except FileNotFoundError: pass if needs_upload: sftp.put(local_path, remote_path) synced += 1 print(f"同步: {local_path} -> {remote_path}") print(f"同步完成,共传输 {synced} 个文件") # 使用示例 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect('host', username='user', password='pass') sftp = ssh.open_sftp() sync_local_to_remote( sftp, './website', '/var/www/html', exclude=['.tmp', '.log', '__pycache__'] ) sftp.close() ssh.close()

更高级的双工同步需要考虑双向的变更检测和冲突解决。当本地和远程的同一个文件都被修改时,需要制定冲突解决策略:通常有"本地优先"、"远程优先"、"保留最新"和"手动处理"四种策略。在自动化办公场景中,推荐使用"保留最新"策略,即比较文件的最后修改时间,保留较新的版本并覆盖较旧的版本。

import paramiko import os import hashlib def bidirectional_sync(sftp, local_dir, remote_dir, strategy='newest'): """双向同步""" conflicts = [] # 获取远程文件列表 remote_files = {} def list_remote(sftp, path, prefix=''): for entry in sftp.listdir_attr(path): full_path = path + '/' + entry.filename rel_path = prefix + '/' + entry.filename if prefix else entry.filename if entry.st_mode & 0o40000: # 目录 list_remote(sftp, full_path, rel_path) else: remote_files[rel_path] = { 'size': entry.st_size, 'mtime': entry.st_mtime } list_remote(sftp, remote_dir) # 遍历本地文件 for root, dirs, files in os.walk(local_dir): for file in files: local_path = os.path.join(root, file) rel_path = os.path.relpath(local_path, local_dir).replace('\\', '/') local_mtime = os.path.getmtime(local_path) if rel_path not in remote_files: # 仅本地存在,上传 remote_path = remote_dir + '/' + rel_path sftp.put(local_path, remote_path) print(f"新增到远程: {rel_path}") else: # 两边都存在,比较修改时间 remote_mtime = remote_files[rel_path]['mtime'] local_size = os.path.getsize(local_path) remote_size = remote_files[rel_path]['size'] if local_mtime > remote_mtime and local_size != remote_size: # 本地更新,上传 sftp.put(local_path, remote_dir + '/' + rel_path) print(f"本地更新 -> 远程: {rel_path}") elif remote_mtime > local_mtime and local_size != remote_size: # 远程更新,下载 sftp.get(remote_dir + '/' + rel_path, local_path) print(f"远程更新 -> 本地: {rel_path}") print("双向同步完成") # 使用示例 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect('host', username='user', password='pass') sftp = ssh.open_sftp() bidirectional_sync(sftp, './docs', '/shared/docs') sftp.close() ssh.close()

七、定时任务整合

文件传输自动化的最终目标是无人值守运行。将文件传输任务与定时调度框架整合,可以实现定期备份、定时采集、自动分发等功能。Python生态中有两个优秀的定时任务库:schedule(轻量级,适合简单场景)和APScheduler(功能强大,适合生产环境)。

schedule库的API非常简洁,使用装饰器或链式调用的方式即可定义定时任务。它支持按固定间隔执行、在特定时间执行、按周几执行等多种模式。然而schedule是单线程的,且不具备持久化和容错能力,不适合需要任务持久化的场景。APScheduler则提供了更完整的任务调度功能,支持多种触发器(cron、interval、date)、任务持久化(保存到数据库)、任务合并与错过执行等高级特性。

import schedule import time import paramiko def backup_logs(): """定时备份日志文件""" ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect('backup-server', username='admin', password='pass') sftp = ssh.open_sftp() local_logs = 'C:/app/logs/' remote_dir = '/backup/logs/' + time.strftime('%Y%m%d') try: sftp.mkdir(remote_dir) except: pass for f in os.listdir(local_logs): if f.endswith('.log'): sftp.put( os.path.join(local_logs, f), remote_dir + '/' + f ) print(f"备份日志: {f}") sftp.close() ssh.close() # 每天凌晨2点执行备份 schedule.every().day.at("02:00").do(backup_logs) # 同时每30分钟检查一次增量变更 schedule.every(30).minutes.do(incremental_sync) while True: schedule.run_pending() time.sleep(60) # 每分钟检查一次

APScheduler提供了更强大的调度能力。在需要精确的cron表达式、任务持久化、或跨多个项目共享调度配置时,APScheduler是更好的选择。它还支持任务错过执行策略,例如当服务器在计划执行时间处于关机状态时,重启后可以自动补执行错过的任务。

from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger import paramiko import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def sync_files(): """SFTP文件同步任务""" logger.info("开始文件同步...") try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect('host', username='user', password='pass') sftp = ssh.open_sftp() # 执行同步... logger.info("文件同步完成") sftp.close() ssh.close() except Exception as e: logger.error(f"同步失败: {e}") def collect_data(): """定时数据采集""" # 从多个远程服务器拉取数据文件 servers = [ {'host': 'server1', 'file': '/data/daily.csv'}, {'host': 'server2', 'file': '/data/stats.json'}, ] for svr in servers: try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(svr['host'], username='user', password='pass') sftp = ssh.open_sftp() sftp.get(svr['file'], f'./collected/{svr["host"]}_{time.strftime("%Y%m%d")}') sftp.close() ssh.close() except Exception as e: logger.error(f"采集失败 {svr['host']}: {e}") scheduler = BlockingScheduler() # 每天凌晨3点同步 scheduler.add_job(sync_files, CronTrigger(hour=3, minute=0)) # 每工作日早8点采集数据 scheduler.add_job(collect_data, CronTrigger( day_of_week='mon-fri', hour=8, minute=0 )) # 每15分钟执行增量检查 scheduler.add_job(incremental_check, 'interval', minutes=15) logger.info("调度器启动...") scheduler.start()

提示:定时任务脚本推荐使用日志模块记录每次执行的结果,便于排查问题。同时建议在任务入口处添加try-except捕获所有异常,避免单个任务失败导致整个调度器停止。

八、安全与可靠性

在生产环境中运行文件传输自动化,安全性和可靠性是不可或缺的两个维度。安全性关注传输过程中的数据保密性、认证安全性和主机身份验证;可靠性关注传输的完整性校验、断线恢复和异常处理。只有同时做好这两方面,才能构建出值得信赖的文件传输系统。

安全性方面,首先应优先使用SFTP或FTPS而非明文FTP,确保传输通道加密。对于SFTP,SSH主机密钥验证至关重要:首次连接时服务器的公钥指纹会被记录到known_hosts文件中,后续连接时客户端会验证服务器提供的公钥是否与记录一致,从而防止中间人攻击。paramiko提供了HostKeys类来管理known_hosts。密钥管理建议使用SSH密钥对替代密码认证,且定期轮换密钥。

import paramiko # 严格的主机密钥验证(推荐生产环境使用) ssh = paramiko.SSHClient() # 从known_hosts文件加载已知主机密钥 known_hosts = paramiko.HostKeys() known_hosts.load('C:/Users/me/.ssh/known_hosts') ssh.set_missing_host_key_policy(paramiko.RejectPolicy()) # 或者使用AutoAddPolicy自动添加新主机(仅测试环境) # ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect('sftp.example.com', username='admin', password='pass') # 跳板机连接(堡垒机场景) def connect_via_jump(host, username, password, jump_host, jump_user): """通过跳板机连接目标服务器""" jump_ssh = paramiko.SSHClient() jump_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) jump_ssh.connect(jump_host, username=jump_user) # 建立隧道 transport = jump_ssh.get_transport() dest_addr = (host, 22) local_addr = ('localhost', 10022) channel = transport.open_channel( 'direct-tcpip', dest_addr, local_addr ) # 通过隧道连接目标 target_ssh = paramiko.SSHClient() target_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) target_ssh.connect('localhost', port=10022, username=username, password=password, sock=channel) return target_ssh

传输可靠性通过文件完整性校验来保障。最常见的做法是在传输完成后计算文件的MD5或SHA256哈希值,与源文件的哈希值进行比对。如果哈希值不一致,说明传输过程中数据发生了损坏,需要重新传输。对于超大文件,可以分块计算哈希,或者使用CRC32等更轻量的校验算法。paramiko的SFTPClient在传输时内置了数据包校验,但应用层的完整性校验仍然是我们应该自己实现的额外安全层。

import paramiko import hashlib def transfer_with_verify(sftp, local_path, remote_path): """传输后自动校验文件完整性""" # 上传前计算本地MD5 local_md5 = hashlib.md5() with open(local_path, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): local_md5.update(chunk) local_hash = local_md5.hexdigest() print(f"本地文件 MD5: {local_hash}") # 上传文件 sftp.put(local_path, remote_path) print("上传完成,开始校验...") # 下载远程文件计算MD5 remote_md5 = hashlib.md5() with sftp.open(remote_path, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): remote_md5.update(chunk) remote_hash = remote_md5.hexdigest() print(f"远程文件 MD5: {remote_hash}") # 比对 if local_hash == remote_hash: print("校验通过:文件完整") return True else: print("校验失败:文件损坏") return False # 带重试和校验的完整传输函数 def reliable_transfer(sftp, local_path, remote_path, max_retries=3): for i in range(max_retries): try: if transfer_with_verify(sftp, local_path, remote_path): return True except Exception as e: print(f"第{i+1}次传输异常: {e}") time.sleep(2 ** i) # 指数退避 print("传输失败,已达最大重试次数") return False

九、实战案例

理论学习最终要服务于实际应用。本节通过三个完整的实战案例,展示如何将前面所学的技术整合起来,解决真实的业务需求。每个案例都包含了完整的代码示例和使用说明,你可以直接参考或根据实际环境进行调整。

案例一:定时备份到远程服务器

数据库备份是企业IT运维的常规需求。下面的脚本实现了定期将本地数据库导出文件备份到远程SFTP服务器,并自动清理超过30天的旧备份。脚本结合了mysqldump命令执行、文件压缩、SFTP上传和定时调度等功能,是一个完整的数据备份解决方案。

import paramiko import os import time import subprocess import logging from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filename='backup.log' ) logger = logging.getLogger(__name__) DB_CONFIG = { 'host': 'localhost', 'user': 'db_user', 'password': 'db_pass', 'name': 'my_database' } SFTP_CONFIG = { 'host': 'backup.example.com', 'port': 22, 'user': 'backup_user', 'pass': 'backup_pass', 'remote_dir': '/backups/database/' } def create_backup(): """创建数据库备份""" timestamp = time.strftime('%Y%m%d_%H%M%S') local_file = f'db_backup_{timestamp}.sql.gz' # 导出并压缩 cmd = (f'mysqldump -h {DB_CONFIG["host"]} -u {DB_CONFIG["user"]} ' f'-p{DB_CONFIG["password"]} {DB_CONFIG["name"]} | gzip > {local_file}') result = subprocess.run(cmd, shell=True, capture_output=True) if result.returncode != 0: logger.error(f"备份失败: {result.stderr.decode()}") return None file_size = os.path.getsize(local_file) logger.info(f"备份文件创建成功: {local_file} ({file_size/1024/1024:.2f} MB)") return local_file def upload_to_remote(local_file): """上传到远程服务器""" try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(SFTP_CONFIG['host'], port=SFTP_CONFIG['port'], username=SFTP_CONFIG['user'], password=SFTP_CONFIG['pass']) sftp = ssh.open_sftp() # 确保远程目录存在 try: sftp.stat(SFTP_CONFIG['remote_dir']) except FileNotFoundError: sftp.mkdir(SFTP_CONFIG['remote_dir']) remote_path = SFTP_CONFIG['remote_dir'] + os.path.basename(local_file) sftp.put(local_file, remote_path) logger.info(f"上传成功: {remote_path}") sftp.close() ssh.close() return True except Exception as e: logger.error(f"上传失败: {e}") return False def clean_old_backups(): """清理远程服务器上超过30天的备份""" ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(SFTP_CONFIG['host'], port=SFTP_CONFIG['port'], username=SFTP_CONFIG['user'], password=SFTP_CONFIG['pass']) sftp = ssh.open_sftp() cutoff = time.time() - 30 * 86400 for attr in sftp.listdir_attr(SFTP_CONFIG['remote_dir']): if attr.st_mtime < cutoff: file_path = SFTP_CONFIG['remote_dir'] + attr.filename sftp.remove(file_path) logger.info(f"清理旧备份: {attr.filename}") sftp.close() ssh.close() def backup_job(): """备份任务主流程""" logger.info("开始执行备份任务...") local_file = create_backup() if local_file and upload_to_remote(local_file): clean_old_backups() os.remove(local_file) # 删除本地临时文件 logger.info("备份任务结束") scheduler = BlockingScheduler() scheduler.add_job(backup_job, CronTrigger(hour=2, minute=0)) scheduler.start()

案例二:多服务器文件分发

在微服务架构或集群部署中,经常需要将配置文件、版本包或静态资源分发到多台服务器。手动逐台操作效率低且容易出错,自动化分发脚本可以大大提高效率和准确性。下面案例实现了多服务器并发分发、传输校验和结果报告功能。

import paramiko import os import hashlib from concurrent.futures import ThreadPoolExecutor, as_completed SERVERS = [ {'host': 'web1.example.com', 'user': 'admin', 'pass': 'pass1'}, {'host': 'web2.example.com', 'user': 'admin', 'pass': 'pass2'}, {'host': 'web3.example.com', 'user': 'admin', 'pass': 'pass3'}, {'host': 'app1.example.com', 'user': 'admin', 'pass': 'pass4'}, ] def distribute_file(local_path, remote_path, server): """向单台服务器分发文件""" result = {'server': server['host'], 'success': False, 'error': ''} try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(server['host'], username=server['user'], password=server['pass']) sftp = ssh.open_sftp() # 确保远程目录存在 remote_dir = os.path.dirname(remote_path) try: sftp.stat(remote_dir) except FileNotFoundError: parts = remote_dir.strip('/').split('/') path = '' for part in parts: path += '/' + part try: sftp.stat(path) except FileNotFoundError: sftp.mkdir(path) # 上传 sftp.put(local_path, remote_path) # 校验 local_md5 = hashlib.md5(open(local_path, 'rb').read()).hexdigest() remote_md5 = hashlib.md5() with sftp.open(remote_path, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): remote_md5.update(chunk) if remote_md5.hexdigest() == local_md5: result['success'] = True print(f"[OK] {server['host']}: 分发并校验成功") else: result['error'] = 'MD5校验不匹配' sftp.close() ssh.close() except Exception as e: result['error'] = str(e) print(f"[FAIL] {server['host']}: {e}") return result def batch_distribute(local_path, remote_path): """多服务器并发分发""" print(f"开始分发文件: {local_path} -> 共 {len(SERVERS)} 台服务器") results = [] with ThreadPoolExecutor(max_workers=5) as executor: futures = { executor.submit(distribute_file, local_path, remote_path, svr): svr for svr in SERVERS } for future in as_completed(futures): results.append(future.result()) # 报告 success = sum(1 for r in results if r['success']) print(f"\n分发完成: 成功 {success}/{len(SERVERS)}") for r in results: status = "OK" if r['success'] else f"FAIL({r['error']})" print(f" {r['server']}: {status}") # 使用 batch_distribute('./config/app_v2.0.properties', '/opt/app/config.properties')

案例三:数据采集文件自动下载

在数据分析工作中,经常需要从远程服务器上的数据采集系统定时获取数据文件。下面的脚本实现了从多个数据源服务器自动下载CSV/JSON格式的数据文件,合并后导入本地数据库,并记录每次采集的元数据。

import paramiko import os import pandas as pd from datetime import datetime DATA_SOURCES = [ { 'host': 'sensor1.example.com', 'remote_dir': '/data/readings/', 'file_pattern': 'sensor_*.csv', 'local_dir': './collected/sensor1/' }, { 'host': 'sensor2.example.com', 'remote_dir': '/data/metrics/', 'file_pattern': 'metrics_*.json', 'local_dir': './collected/sensor2/' } ] def download_data_files(source): """从数据源下载文件""" host = source['host'] remote_dir = source['remote_dir'] local_dir = source['local_dir'] os.makedirs(local_dir, exist_ok=True) downloaded = [] try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(host, username='collector', password='collect_pass') sftp = ssh.open_sftp() # 获取远程文件列表 for attr in sftp.listdir_attr(remote_dir): remote_file = remote_dir + attr.filename local_file = os.path.join(local_dir, attr.filename) # 跳过已下载的文件 if os.path.exists(local_file): local_size = os.path.getsize(local_file) if local_size == attr.st_size: continue # 下载 sftp.get(remote_file, local_file) downloaded.append({ 'file': attr.filename, 'size': attr.st_size, 'mtime': datetime.fromtimestamp(attr.st_mtime) }) print(f"已下载: {attr.filename} ({attr.st_size/1024:.1f} KB)") sftp.close() ssh.close() except Exception as e: print(f"连接失败 {host}: {e}") return downloaded def merge_and_import(): """合并CSV文件并生成汇总报告""" all_data = [] for source in DATA_SOURCES: local_dir = source['local_dir'] if not os.path.exists(local_dir): continue for f in os.listdir(local_dir): if f.endswith('.csv'): df = pd.read_csv(os.path.join(local_dir, f)) df['source'] = source['host'] all_data.append(df) if all_data: merged = pd.concat(all_data, ignore_index=True) merged.to_csv('./merged_data/daily_merged.csv', index=False) print(f"合并完成: {len(merged)} 条记录") # 主流程 all_files = [] for source in DATA_SOURCES: files = download_data_files(source) all_files.extend(files) print(f"本次采集: 共 {len(all_files)} 个文件") merge_and_import()

总结:文件传输自动化是Python办公自动化的重要分支。从基础的FTP/SFTP操作,到增量同步、定时调度、安全校验,再到完整的备份分发解决方案,每个环节都有丰富的应用场景。建议从简单的单文件传输开始实践,逐步构建完整的企业级文件传输自动化系统。