Skip to content

网络请求与并发

脚本跟外部系统打交道时,大部分时间都在等——等 HTTP 响应、等 SSH 命令执行完、等数据库返回结果。目标一多,串行等下来就很慢。并发的作用是把这些等待重叠起来,整批任务的耗时从"所有目标加起来"变成"最慢的那个"。

并发不是简单地把任务同时丢出去。并发数上限、单个任务超时、失败结果收集、重试间隔这些都要一起处理——没有限制的批量脚本,可能反过来把要检查的服务压垮。

一、HTTP 请求

requests 是最常用的 HTTP 客户端:

bash
uv add requests
python
import requests

response = requests.get("https://example.com/health", timeout=5)
print(response.status_code)
print(response.text)

timeout=5 不能省。没超时的请求可能一直卡住,定时任务会越积越多,最后把进程数或连接数耗光。

POST JSON 和带 Token 认证:

python
import os
import requests

# POST JSON
response = requests.post(
    "https://example.com/api/report",
    json={"hostname": "web01", "status": "ok"},
    timeout=5,
)

# Token 认证
headers = {"Authorization": f"Bearer {os.environ['API_TOKEN']}"}
response = requests.get("https://example.com/api/hosts", headers=headers, timeout=5)
print(response.json())

response.json() 把 JSON 响应解析成字典/列表;服务端返回的不是 JSON 会抛异常。

封装一个带错误处理的检查函数,批量脚本里反复用:

python
import requests


def check_url(target, timeout=5):
    """检查一个 URL,返回统一结构的结果。"""
    try:
        response = requests.get(target["url"], timeout=timeout)
        ok = response.status_code == 200
        message = f"status_code={response.status_code}"
    except requests.RequestException as exc:
        ok = False
        message = str(exc)

    return {"name": target["name"], "url": target["url"], "ok": ok, "message": message}

批量任务里,结果结构统一很重要——不管动作是 HTTP、SSH 还是数据库查询,最后都整理成 {"target": ..., "ok": ..., "message": ...} 这种格式,后面生成报告、写 CSV、发通知都好处理。

二、SSH 批量执行

paramiko 通过 SSH 执行远端命令:

bash
uv add paramiko
python
import paramiko


def run_ssh(host, username, password, command, timeout=10):
    """SSH 到目标机器执行命令,返回 (退出码, stdout, stderr)。"""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        client.connect(hostname=host, username=username, password=password, timeout=5)
        stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
        exit_code = stdout.channel.recv_exit_status()
        return exit_code, stdout.read().decode(), stderr.read().decode()
    finally:
        client.close()

AutoAddPolicy() 跳过主机指纹校验。受控测试环境(虚拟机、隔离网段)里影响有限;生产环境里应该维护 known_hosts,拒绝指纹不匹配的连接——不然 DNS 被篡改或 IP 被冒用时,可能连到一台中间人机器上。

批量执行时,一台失败记录原因,继续处理其他机器,最后汇总:

python
hosts = ["192.168.10.11", "192.168.10.12", "192.168.10.13"]
results = []

for host in hosts:
    try:
        code, out, err = run_ssh(host, "root", "password", "uptime")
        results.append({"host": host, "ok": code == 0, "stdout": out.strip()})
    except Exception as exc:
        results.append({"host": host, "ok": False, "error": str(exc)})

for r in results:
    print(r)

整批任务一台失败就中断,反而会丢掉剩余目标的状态——批量脚本的核心是"全部跑完,结果各自记录"。

三、数据库查询

运维脚本经常连数据库查状态。pymysql 连 MySQL:

bash
uv add pymysql
python
import pymysql

conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="password",
    database="mysql",
    connect_timeout=5,
    read_timeout=10,
    cursorclass=pymysql.cursors.DictCursor,  # 结果转字典
)

try:
    with conn.cursor() as cursor:
        cursor.execute("SHOW GLOBAL STATUS LIKE 'Threads_connected'")
        row = cursor.fetchone()
        print(row["Variable_name"], row["Value"])
finally:
    conn.close()

运维脚本执行 SQL 要区分只读查询和变更操作。只读巡检限制超时避免慢查询影响业务;批量变更要有清晰的输入、日志和回滚方式。

四、线程池

目标一多,串行等网络就很慢。concurrent.futures.ThreadPoolExecutor 是标准库线程池,适合 HTTP、SSH、数据库这类等待型任务:

python
from concurrent.futures import ThreadPoolExecutor, as_completed


def check_host(host):
    # 模拟网络等待,真实脚本里是 HTTP/SSH/数据库请求
    import time
    time.sleep(1)
    return {"host": host, "ok": True}


hosts = ["web01", "web02", "db01", "cache01"]

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(check_host, h): h for h in hosts}

    for future in as_completed(futures):
        host = futures[future]
        try:
            result = future.result()
            print(result)
        except Exception as exc:
            # 单个失败也写进结果,整批继续
            print({"host": host, "ok": False, "error": str(exc)})

max_workers 控制同时跑几个任务。线程里的异常不会自动冒出来,要调 future.result() 才会重新抛——所以每个 future 都要 try/except,不然某个目标失败会被静默吞掉。

as_completed() 谁先完成就先返回,输出顺序跟输入顺序可能不同。报告需要保持顺序时,结果里带序号,最后排序。

五、超时和重试

超时分两层:底层网络库超时(requests 的 timeout、SSH 的 timeout)和线程池等待超时(future.result(timeout=...))。

python
# 线程池等待超时
try:
    result = future.result(timeout=10)
except TimeoutError:
    result = {"ok": False, "message": "task timeout"}

future.result(timeout=10) 只控制主线程等多久,不能强行杀掉已经卡住的线程。底层 HTTP、SSH 还是要设自己的超时,不然线程继续卡在系统调用里,池子被占满。

重试适合短暂网络波动,不适合掩盖长期故障:

python
import time
import requests


def get_with_retry(url, retries=3, timeout=5):
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, timeout=timeout)
            if response.status_code == 200:
                return response
        except requests.RequestException:
            pass
        time.sleep(attempt)  # 退避:第 1 次等 1 秒,第 2 次等 2 秒

    raise RuntimeError(f"failed after {retries} retries: {url}")

重试会放大请求量——100 个目标每个重试 2 次,最坏变成 300 次请求。重试次数和并发数要一起算,别把要检查的服务压垮了。401、403、400 这类确定性错误不该重试,重试也没用。

六、asyncio 简介

前面用的线程池,是操作系统帮你切换线程。asyncio 是另一条路——单线程里用 async/await 语法手动标记"这里可以切换去干别的",适合大量网络等待的场景。

先理解两个关键词:

  • async def 定义一个协程函数,调用它不会立刻执行,而是返回一个协程对象
  • await 表示"在这里等结果,等的时候可以去跑别的协程"
python
import asyncio
import aiohttp


async def check(session, url):
    async with session.get(url) as response:
        return url, response.status


async def main():
    urls = ["https://example.com/health", "http://127.0.0.1:3000/api/health"]
    async with aiohttp.ClientSession() as session:
        # asyncio.gather 同时发起多个协程,等它们全部完成
        results = await asyncio.gather(*[check(session, url) for url in urls])
        print(results)


# asyncio.run 启动事件循环,跑 main 协程
asyncio.run(main())

asyncio.gather() 同时启动多个协程并发等待。asyncio.run() 是入口,启动事件循环(负责在协程之间切换的那个调度器)。

异步代码里不能用普通的阻塞函数(比如 requests.gettime.sleep)——它们会卡住整个事件循环,其他协程也没机会跑。必须用支持 async 的库(aiohttp 替代 requestsasyncio.sleep 替代 time.sleep)。

运维脚本里线程池够用就别上 asyncio。asyncio 写法跟同步代码差异大,排查也麻烦,目标数量到几百上千、且对延迟敏感时再考虑。这一节先混个眼熟,遇到再深挖。

七、多进程

CPU 密集型工作(压缩大量文件、统计大日志)用多进程,绕开 Python 的 GIL:

python
from concurrent.futures import ProcessPoolExecutor


def count_lines(path):
    count = 0
    with open(path, encoding="utf-8", errors="ignore") as f:
        for _ in f:
            count += 1
    return path, count


paths = ["/var/log/messages", "/var/log/secure"]

with ProcessPoolExecutor(max_workers=2) as executor:
    for path, count in executor.map(count_lines, paths):
        print(path, count)

多进程入口要放在 if __name__ == "__main__": 下面,不然 Windows 上子进程导入模块时会再次创建进程,无限递归。运维脚本里多进程用得少——大部分运维任务是等待型(IO 密集),线程池更合适。

八、批量 HTTP 健康检查

把参数、并发、错误处理、结果汇总合到一个完整脚本:

新增文件:scripts/batch_health_check.py

python
#!/usr/bin/env python3
"""批量检查 HTTP 健康接口,支持并发。"""

import argparse
import json
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import requests


def parse_args():
    parser = argparse.ArgumentParser(description="batch http health check")
    parser.add_argument("--targets", required=True, help="targets json file")
    parser.add_argument("--workers", type=int, default=5, help="concurrent workers")
    parser.add_argument("--timeout", type=int, default=5, help="request timeout")
    return parser.parse_args()


def load_targets(path):
    return json.loads(Path(path).read_text(encoding="utf-8"))


def check_one(target, timeout):
    started = time.time()
    try:
        resp = requests.get(target["url"], timeout=timeout)
        ok = resp.status_code == 200
        message = f"status_code={resp.status_code}"
    except requests.RequestException as exc:
        ok = False
        message = str(exc)

    return {
        "name": target["name"],
        "url": target["url"],
        "ok": ok,
        "message": message,
        "cost_ms": int((time.time() - started) * 1000),
    }


def main():
    args = parse_args()
    targets = load_targets(args.targets)

    results = []
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        future_map = {
            executor.submit(check_one, t, args.timeout): t for t in targets
        }
        for future in as_completed(future_map):
            target = future_map[future]
            try:
                results.append(future.result())
            except Exception as exc:
                results.append({
                    "name": target["name"], "ok": False, "message": str(exc),
                })

    print(json.dumps(results, ensure_ascii=False, indent=2))

    # 任意目标失败,整体返回非零,cron/CI/监控能识别
    return 1 if any(not r["ok"] for r in results) else 0


if __name__ == "__main__":
    sys.exit(main())

目标文件 targets.json:

json
[
  {"name": "api", "url": "https://example.com/health"},
  {"name": "grafana", "url": "http://127.0.0.1:3000/api/health"}
]
bash
uv run python scripts/batch_health_check.py --targets targets.json --workers 5

这个脚本结构是批量运维脚本的通用骨架:读目标清单 → 线程池并发检查 → 统一结果结构 → JSON 输出 → 退出码反映整体成败。换成 SSH 检查、数据库巡检,骨架不变,只换 check_one 函数里的逻辑。