Skip to content

任务调度与 Webhook

资产的增删改查做完了。运维平台还有个核心功能——对资产执行操作(检查端口、重启服务),这就是任务管理。任务通常耗时(SSH 到目标机器、等命令返回),要用后台执行 + 状态轮询的模式(16 篇讲过)。任务完成后还可以通过 Webhook 通知外部系统。

一、任务接口

1 创建任务

python
# backend/app/routers/tasks.py
from fastapi import APIRouter, Depends, BackgroundTasks
from sqlalchemy.orm import Session

router = APIRouter()


@router.post("", response_model=TaskResponse)
def create_task(
    task: TaskCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    user = Depends(get_operator_user),  # 需要操作权限
):
    # 1. 先查资产是否存在
    asset = db.query(Asset).filter(Asset.id == task.asset_id).first()
    if not asset:
        raise HTTPException(status_code=404, detail="资产不存在")

    # 2. 创建任务记录,状态 pending
    new_task = Task(
        asset_id=task.asset_id,
        user_id=user.id,
        action=task.action,
        status="pending",
    )
    db.add(new_task)
    db.commit()
    db.refresh(new_task)

    # 3. 把实际执行丢到后台
    background_tasks.add_task(execute_task, new_task.id)

    return new_task

接口立刻返回任务 ID,不等任务执行完——任务通过 BackgroundTasks 在后台跑(16 篇)。前端拿到任务 ID 后轮询状态。

2 查询任务状态

python
@router.get("/{task_id}", response_model=TaskResponse)
def get_task(task_id: int, db: Session = Depends(get_db), user=Depends(get_current_user)):
    task = db.query(Task).filter(Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="任务不存在")
    return task


@router.get("", response_model=list[TaskResponse])
def list_tasks(
    status: str | None = None,
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    query = db.query(Task)
    if status:
        query = query.filter(Task.status == status)
    return query.order_by(Task.id.desc()).limit(50).all()

前端轮询 GET /tasks/{id} 看状态从 pendingrunningdone/failed

二、任务执行

execute_task 在后台执行实际操作——这里以"检查端口"为例:

python
# backend/app/routers/tasks.py
import socket
from datetime import datetime


def execute_task(task_id: int):
    """后台执行任务。独立函数,不依赖请求上下文。"""
    db = SessionLocal()  # 后台任务自己创建数据库会话

    try:
        task = db.query(Task).filter(Task.id == task_id).first()
        if not task:
            return

        asset = db.query(Asset).filter(Asset.id == task.asset_id).first()

        # 更新状态为 running
        task.status = "running"
        db.commit()

        # 执行实际操作
        if task.action == "check_port":
            result = check_tcp_port(asset.ip, asset.port)
            task.result = result
            task.status = "done" if "open" in result else "failed"
        else:
            task.result = f"未知操作: {task.action}"
            task.status = "failed"

        task.completed_at = datetime.now()
        db.commit()

        # 任务完成后发 Webhook 通知
        send_webhook(task)

    except Exception as exc:
        task.status = "failed"
        task.result = str(exc)
        db.commit()
    finally:
        db.close()


def check_tcp_port(host: str, port: int, timeout: int = 3) -> str:
    """检查 TCP 端口是否可达。"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        result = sock.connect_ex((host, port))
        return f"port {port} open" if result == 0 else f"port {port} closed (errno={result})"
    finally:
        sock.close()

后台任务要自己创建数据库会话SessionLocal())——不能复用请求的会话,因为请求已经结束了,会话已关闭。任务执行完在 finally 里关闭自己的会话。

三、前端任务页面

前端提交任务后轮询状态:

vue
<!-- src/views/TaskListView.vue -->
<script setup>
import { ref, onMounted, onUnmounted } from "vue";
import { request } from "../api/client";

const tasks = ref([]);
let timer = null;

async function loadTasks() {
  tasks.value = await request("/tasks");
}

function createTask(assetId, action) {
  request("/tasks", {
    method: "POST",
    body: JSON.stringify({ asset_id: assetId, action }),
  }).then(() => loadTasks());
}

onMounted(() => {
  loadTasks();
  // 每 5 秒刷新一次,看任务状态变化
  timer = setInterval(loadTasks, 5000);
});

onUnmounted(() => {
  clearInterval(timer);  // 离开页面时停掉轮询
});
</script>

<template>
  <table>
    <thead>
      <tr><th>ID</th><th>资产</th><th>操作</th><th>状态</th><th>结果</th></tr>
    </thead>
    <tbody>
      <tr v-for="task in tasks" :key="task.id">
        <td>{{ task.id }}</td>
        <td>{{ task.asset_id }}</td>
        <td>{{ task.action }}</td>
        <td>
          <span :class="task.status">{{ task.status }}</span>
        </td>
        <td>{{ task.result }}</td>
      </tr>
    </tbody>
  </table>
</template>

setInterval(loadTasks, 5000) 每 5 秒请求一次任务列表,页面自动刷新显示最新状态。onUnmounted 里清掉定时器——不然离开页面后定时器还在跑,白白发请求。

四、Webhook 通知

任务完成后,可能要通知外部系统——钉钉机器人、企业微信、另一个 API。Webhook 就是"事件发生时主动 POST 一个通知到指定 URL"。

1 配置 Webhook URL

python
# backend/app/config.py
class Settings(BaseSettings):
    webhook_url: str | None = None  # 不配就不发通知
    webhook_token: str | None = None  # 可选的认证 Token

2 发送 Webhook

python
# backend/app/routers/tasks.py
import httpx  # 异步 HTTP 客户端,比 requests 更适合 FastAPI


async def send_webhook(task: Task):
    """任务完成后发送 Webhook 通知。"""
    if not settings.webhook_url:
        return  # 没配 Webhook URL 就跳过

    payload = {
        "event": "task_completed",
        "task_id": task.id,
        "action": task.action,
        "status": task.status,
        "result": task.result,
        "completed_at": task.completed_at.isoformat() if task.completed_at else None,
    }

    headers = {}
    if settings.webhook_token:
        headers["Authorization"] = f"Bearer {settings.webhook_token}"

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                settings.webhook_url,
                json=payload,
                headers=headers,
                timeout=10,
            )
            logger.info("webhook sent status=%s task_id=%s", response.status_code, task.id)
    except Exception as exc:
        # Webhook 发送失败不影响任务本身——记录日志就行
        logger.warning("webhook failed task_id=%s error=%s", task.id, exc)

Webhook 发送失败不影响任务本身——任务已经执行完了,通知发不出去只是外部系统不知道。用 try/except 兜住,失败记日志,不抛异常。

execute_task 里调用时要改成异步(因为 send_webhookasync):

python
import asyncio

# execute_task 函数末尾
asyncio.run(send_webhook(task))

3 接收端示例

钉钉机器人的 Webhook URL 配到 settings.webhook_url,任务完成时钉钉群里就会收到一条通知。或者对接自己的 API 做后续处理——这就是"事件驱动"的基本模式:A 完成后主动通知 B,而不是 B 轮询 A