Appearance
后台任务与定时
有些操作不需要在返回响应前做完。比如用户注册后发欢迎邮件——发邮件要几秒,让用户等这几秒才看到"注册成功"不划算。后台任务就是"先返回响应,再慢慢做这些事"。定时任务则是"每隔一段时间自动执行",比如每天凌晨清理过期数据。
一、BackgroundTasks——响应后执行
FastAPI 内置 BackgroundTasks,用法是在函数参数里声明,返回响应后框架自动执行:
python
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def send_welcome_email(email: str):
"""模拟发邮件——实际会调 SMTP 或邮件 API。"""
print(f"发送欢迎邮件到 {email}")
# 实际代码:smtp.sendmail(...) 或 requests.post(mail_api, ...)
@app.post("/register")
def register(email: str, background_tasks: BackgroundTasks):
# 1. 先创建用户、返回响应
user_id = 42
# 2. 把发邮件丢到后台,不阻塞响应
background_tasks.add_task(send_welcome_email, email)
return {"user_id": user_id, "message": "注册成功"}background_tasks.add_task(函数, 参数) 注册一个后台任务。FastAPI 先把响应发回给客户端,然后在同一个进程里执行 send_welcome_email。用户看到"注册成功"时,邮件可能还在发送中——但用户不需要等。
适合什么场景
- 发通知(邮件、消息、Webhook)
- 写日志、记录审计
- 清理临时文件
- 更新缓存
不适合什么场景
BackgroundTasks 在同一个进程里同步执行,重启进程时没执行完的任务会丢。任务量大、执行时间长、不能丢的场景,要用真正的任务队列(Celery、Redis Queue),把任务存到 Redis/RabbitMQ 里,专门的 worker 进程消费。
二、任务状态——查询执行结果
后台任务执行需要时间,前端可能想知道"做到哪了"。一种做法是创建任务时返回一个任务 ID,前端轮询任务状态:
python
import uuid
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
# 内存里的任务状态(生产环境用数据库或 Redis)
tasks_status: dict[str, dict] = {}
def process_report(task_id: str, params: dict):
"""生成报表——耗时操作。"""
tasks_status[task_id]["status"] = "running"
try:
# 实际的报表生成逻辑...
result = {"rows": 100, "file": "report_2024.xlsx"}
tasks_status[task_id]["status"] = "done"
tasks_status[task_id]["result"] = result
except Exception as exc:
tasks_status[task_id]["status"] = "failed"
tasks_status[task_id]["error"] = str(exc)
@app.post("/reports")
def create_report(background_tasks: BackgroundTasks):
task_id = str(uuid.uuid4())
tasks_status[task_id] = {"status": "pending"}
background_tasks.add_task(process_report, task_id, {"type": "monthly"})
return {"task_id": task_id, "status": "pending"}
@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
task = tasks_status.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="任务不存在")
return task前端流程:
- POST
/reports创建报表任务,拿到task_id - 每隔几秒 GET
/tasks/{task_id}看状态 - 状态变成
done时,从result里拿报表地址
这种"提交任务 → 轮询状态"的模式,在运维平台的项目实战里会用到——批量巡检、报表生成这类耗时操作都是这个套路。
三、定时任务
每隔一段时间自动执行的任务,比如每天清理过期 Token、每小时统计访问量。Python 里用 apscheduler:
bash
uv add apschedulerpython
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = BackgroundScheduler()
def cleanup_expired_tokens():
"""清理过期的 Token 记录。"""
print("清理过期 Token...")
# db.query(Token).filter(Token.expire_at < datetime.now()).delete()
# db.commit()
# 每天凌晨 2 点执行
scheduler.add_job(
cleanup_expired_tokens,
CronTrigger(hour=2, minute=0),
id="cleanup_tokens",
)
# 每 5 分钟执行
scheduler.add_job(
lambda: print("每 5 分钟检查一次"),
"interval",
minutes=5,
id="health_check",
)
scheduler.start()CronTrigger(hour=2, minute=0) 跟 cron 表达式一样——每天 2:00 执行。"interval", minutes=5 是固定间隔——每 5 分钟一次。
在 FastAPI 里管理调度器生命周期
python
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# 应用启动时
scheduler.start()
yield
# 应用关闭时
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)lifespan 管理应用的生命周期——启动时开调度器,关闭时停。不 shutdown 直接退进程,正在执行的任务会被中断。
定时任务 vs cron
系统 cron 也能做定时——写个脚本,crontab -e 加一条。apscheduler 的区别是:任务定义在应用代码里,跟应用共享数据库连接和配置,不用额外管理。但应用重启时调度器也重启,不跨进程持久化。任务量大或可靠性要求高时,仍然推荐系统 cron 或 Celery beat。