Appearance
任务调度与 Webhook
资产的增删改查做完了。运维平台还有个核心功能——对资产执行操作(检查端口、重启服务),这就是任务管理。任务通常耗时(SSH 到目标机器、等命令返回),要用后台执行 + 状态轮询的模式(16 篇讲过)。任务完成后还可以通过 Webhook 通知外部系统。
一、任务接口
1 创建任务
python
# backend/app/routers/tasks.py
from fastapi import APIRouter, Depends, BackgroundTasks
from sqlalchemy.orm import Session
router = APIRouter()
@router.post("", response_model=TaskResponse)
def create_task(
task: TaskCreate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
user = Depends(get_operator_user), # 需要操作权限
):
# 1. 先查资产是否存在
asset = db.query(Asset).filter(Asset.id == task.asset_id).first()
if not asset:
raise HTTPException(status_code=404, detail="资产不存在")
# 2. 创建任务记录,状态 pending
new_task = Task(
asset_id=task.asset_id,
user_id=user.id,
action=task.action,
status="pending",
)
db.add(new_task)
db.commit()
db.refresh(new_task)
# 3. 把实际执行丢到后台
background_tasks.add_task(execute_task, new_task.id)
return new_task接口立刻返回任务 ID,不等任务执行完——任务通过 BackgroundTasks 在后台跑(16 篇)。前端拿到任务 ID 后轮询状态。
2 查询任务状态
python
@router.get("/{task_id}", response_model=TaskResponse)
def get_task(task_id: int, db: Session = Depends(get_db), user=Depends(get_current_user)):
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="任务不存在")
return task
@router.get("", response_model=list[TaskResponse])
def list_tasks(
status: str | None = None,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
query = db.query(Task)
if status:
query = query.filter(Task.status == status)
return query.order_by(Task.id.desc()).limit(50).all()前端轮询 GET /tasks/{id} 看状态从 pending → running → done/failed。
二、任务执行
execute_task 在后台执行实际操作——这里以"检查端口"为例:
python
# backend/app/routers/tasks.py
import socket
from datetime import datetime
def execute_task(task_id: int):
"""后台执行任务。独立函数,不依赖请求上下文。"""
db = SessionLocal() # 后台任务自己创建数据库会话
try:
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
return
asset = db.query(Asset).filter(Asset.id == task.asset_id).first()
# 更新状态为 running
task.status = "running"
db.commit()
# 执行实际操作
if task.action == "check_port":
result = check_tcp_port(asset.ip, asset.port)
task.result = result
task.status = "done" if "open" in result else "failed"
else:
task.result = f"未知操作: {task.action}"
task.status = "failed"
task.completed_at = datetime.now()
db.commit()
# 任务完成后发 Webhook 通知
send_webhook(task)
except Exception as exc:
task.status = "failed"
task.result = str(exc)
db.commit()
finally:
db.close()
def check_tcp_port(host: str, port: int, timeout: int = 3) -> str:
"""检查 TCP 端口是否可达。"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
result = sock.connect_ex((host, port))
return f"port {port} open" if result == 0 else f"port {port} closed (errno={result})"
finally:
sock.close()后台任务要自己创建数据库会话(SessionLocal())——不能复用请求的会话,因为请求已经结束了,会话已关闭。任务执行完在 finally 里关闭自己的会话。
三、前端任务页面
前端提交任务后轮询状态:
vue
<!-- src/views/TaskListView.vue -->
<script setup>
import { ref, onMounted, onUnmounted } from "vue";
import { request } from "../api/client";
const tasks = ref([]);
let timer = null;
async function loadTasks() {
tasks.value = await request("/tasks");
}
function createTask(assetId, action) {
request("/tasks", {
method: "POST",
body: JSON.stringify({ asset_id: assetId, action }),
}).then(() => loadTasks());
}
onMounted(() => {
loadTasks();
// 每 5 秒刷新一次,看任务状态变化
timer = setInterval(loadTasks, 5000);
});
onUnmounted(() => {
clearInterval(timer); // 离开页面时停掉轮询
});
</script>
<template>
<table>
<thead>
<tr><th>ID</th><th>资产</th><th>操作</th><th>状态</th><th>结果</th></tr>
</thead>
<tbody>
<tr v-for="task in tasks" :key="task.id">
<td>{{ task.id }}</td>
<td>{{ task.asset_id }}</td>
<td>{{ task.action }}</td>
<td>
<span :class="task.status">{{ task.status }}</span>
</td>
<td>{{ task.result }}</td>
</tr>
</tbody>
</table>
</template>setInterval(loadTasks, 5000) 每 5 秒请求一次任务列表,页面自动刷新显示最新状态。onUnmounted 里清掉定时器——不然离开页面后定时器还在跑,白白发请求。
四、Webhook 通知
任务完成后,可能要通知外部系统——钉钉机器人、企业微信、另一个 API。Webhook 就是"事件发生时主动 POST 一个通知到指定 URL"。
1 配置 Webhook URL
python
# backend/app/config.py
class Settings(BaseSettings):
webhook_url: str | None = None # 不配就不发通知
webhook_token: str | None = None # 可选的认证 Token2 发送 Webhook
python
# backend/app/routers/tasks.py
import httpx # 异步 HTTP 客户端,比 requests 更适合 FastAPI
async def send_webhook(task: Task):
"""任务完成后发送 Webhook 通知。"""
if not settings.webhook_url:
return # 没配 Webhook URL 就跳过
payload = {
"event": "task_completed",
"task_id": task.id,
"action": task.action,
"status": task.status,
"result": task.result,
"completed_at": task.completed_at.isoformat() if task.completed_at else None,
}
headers = {}
if settings.webhook_token:
headers["Authorization"] = f"Bearer {settings.webhook_token}"
try:
async with httpx.AsyncClient() as client:
response = await client.post(
settings.webhook_url,
json=payload,
headers=headers,
timeout=10,
)
logger.info("webhook sent status=%s task_id=%s", response.status_code, task.id)
except Exception as exc:
# Webhook 发送失败不影响任务本身——记录日志就行
logger.warning("webhook failed task_id=%s error=%s", task.id, exc)Webhook 发送失败不影响任务本身——任务已经执行完了,通知发不出去只是外部系统不知道。用 try/except 兜住,失败记日志,不抛异常。
execute_task 里调用时要改成异步(因为 send_webhook 是 async):
python
import asyncio
# execute_task 函数末尾
asyncio.run(send_webhook(task))3 接收端示例
钉钉机器人的 Webhook URL 配到 settings.webhook_url,任务完成时钉钉群里就会收到一条通知。或者对接自己的 API 做后续处理——这就是"事件驱动"的基本模式:A 完成后主动通知 B,而不是 B 轮询 A。