Appearance
网络请求与并发
脚本跟外部系统打交道时,大部分时间都在等——等 HTTP 响应、等 SSH 命令执行完、等数据库返回结果。目标一多,串行等下来就很慢。并发的作用是把这些等待重叠起来,整批任务的耗时从"所有目标加起来"变成"最慢的那个"。
并发不是简单地把任务同时丢出去。并发数上限、单个任务超时、失败结果收集、重试间隔这些都要一起处理——没有限制的批量脚本,可能反过来把要检查的服务压垮。
一、HTTP 请求
requests 是最常用的 HTTP 客户端:
bash
uv add requestspython
import requests
response = requests.get("https://example.com/health", timeout=5)
print(response.status_code)
print(response.text)timeout=5 不能省。没超时的请求可能一直卡住,定时任务会越积越多,最后把进程数或连接数耗光。
POST JSON 和带 Token 认证:
python
import os
import requests
# POST JSON
response = requests.post(
"https://example.com/api/report",
json={"hostname": "web01", "status": "ok"},
timeout=5,
)
# Token 认证
headers = {"Authorization": f"Bearer {os.environ['API_TOKEN']}"}
response = requests.get("https://example.com/api/hosts", headers=headers, timeout=5)
print(response.json())response.json() 把 JSON 响应解析成字典/列表;服务端返回的不是 JSON 会抛异常。
封装一个带错误处理的检查函数,批量脚本里反复用:
python
import requests
def check_url(target, timeout=5):
"""检查一个 URL,返回统一结构的结果。"""
try:
response = requests.get(target["url"], timeout=timeout)
ok = response.status_code == 200
message = f"status_code={response.status_code}"
except requests.RequestException as exc:
ok = False
message = str(exc)
return {"name": target["name"], "url": target["url"], "ok": ok, "message": message}批量任务里,结果结构统一很重要——不管动作是 HTTP、SSH 还是数据库查询,最后都整理成 {"target": ..., "ok": ..., "message": ...} 这种格式,后面生成报告、写 CSV、发通知都好处理。
二、SSH 批量执行
paramiko 通过 SSH 执行远端命令:
bash
uv add paramikopython
import paramiko
def run_ssh(host, username, password, command, timeout=10):
"""SSH 到目标机器执行命令,返回 (退出码, stdout, stderr)。"""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=host, username=username, password=password, timeout=5)
stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
exit_code = stdout.channel.recv_exit_status()
return exit_code, stdout.read().decode(), stderr.read().decode()
finally:
client.close()AutoAddPolicy() 跳过主机指纹校验。受控测试环境(虚拟机、隔离网段)里影响有限;生产环境里应该维护 known_hosts,拒绝指纹不匹配的连接——不然 DNS 被篡改或 IP 被冒用时,可能连到一台中间人机器上。
批量执行时,一台失败记录原因,继续处理其他机器,最后汇总:
python
hosts = ["192.168.10.11", "192.168.10.12", "192.168.10.13"]
results = []
for host in hosts:
try:
code, out, err = run_ssh(host, "root", "password", "uptime")
results.append({"host": host, "ok": code == 0, "stdout": out.strip()})
except Exception as exc:
results.append({"host": host, "ok": False, "error": str(exc)})
for r in results:
print(r)整批任务一台失败就中断,反而会丢掉剩余目标的状态——批量脚本的核心是"全部跑完,结果各自记录"。
三、数据库查询
运维脚本经常连数据库查状态。pymysql 连 MySQL:
bash
uv add pymysqlpython
import pymysql
conn = pymysql.connect(
host="127.0.0.1",
port=3306,
user="root",
password="password",
database="mysql",
connect_timeout=5,
read_timeout=10,
cursorclass=pymysql.cursors.DictCursor, # 结果转字典
)
try:
with conn.cursor() as cursor:
cursor.execute("SHOW GLOBAL STATUS LIKE 'Threads_connected'")
row = cursor.fetchone()
print(row["Variable_name"], row["Value"])
finally:
conn.close()运维脚本执行 SQL 要区分只读查询和变更操作。只读巡检限制超时避免慢查询影响业务;批量变更要有清晰的输入、日志和回滚方式。
四、线程池
目标一多,串行等网络就很慢。concurrent.futures.ThreadPoolExecutor 是标准库线程池,适合 HTTP、SSH、数据库这类等待型任务:
python
from concurrent.futures import ThreadPoolExecutor, as_completed
def check_host(host):
# 模拟网络等待,真实脚本里是 HTTP/SSH/数据库请求
import time
time.sleep(1)
return {"host": host, "ok": True}
hosts = ["web01", "web02", "db01", "cache01"]
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(check_host, h): h for h in hosts}
for future in as_completed(futures):
host = futures[future]
try:
result = future.result()
print(result)
except Exception as exc:
# 单个失败也写进结果,整批继续
print({"host": host, "ok": False, "error": str(exc)})max_workers 控制同时跑几个任务。线程里的异常不会自动冒出来,要调 future.result() 才会重新抛——所以每个 future 都要 try/except,不然某个目标失败会被静默吞掉。
as_completed() 谁先完成就先返回,输出顺序跟输入顺序可能不同。报告需要保持顺序时,结果里带序号,最后排序。
五、超时和重试
超时分两层:底层网络库超时(requests 的 timeout、SSH 的 timeout)和线程池等待超时(future.result(timeout=...))。
python
# 线程池等待超时
try:
result = future.result(timeout=10)
except TimeoutError:
result = {"ok": False, "message": "task timeout"}future.result(timeout=10) 只控制主线程等多久,不能强行杀掉已经卡住的线程。底层 HTTP、SSH 还是要设自己的超时,不然线程继续卡在系统调用里,池子被占满。
重试适合短暂网络波动,不适合掩盖长期故障:
python
import time
import requests
def get_with_retry(url, retries=3, timeout=5):
for attempt in range(1, retries + 1):
try:
response = requests.get(url, timeout=timeout)
if response.status_code == 200:
return response
except requests.RequestException:
pass
time.sleep(attempt) # 退避:第 1 次等 1 秒,第 2 次等 2 秒
raise RuntimeError(f"failed after {retries} retries: {url}")重试会放大请求量——100 个目标每个重试 2 次,最坏变成 300 次请求。重试次数和并发数要一起算,别把要检查的服务压垮了。401、403、400 这类确定性错误不该重试,重试也没用。
六、asyncio 简介
前面用的线程池,是操作系统帮你切换线程。asyncio 是另一条路——单线程里用 async/await 语法手动标记"这里可以切换去干别的",适合大量网络等待的场景。
先理解两个关键词:
async def定义一个协程函数,调用它不会立刻执行,而是返回一个协程对象await表示"在这里等结果,等的时候可以去跑别的协程"
python
import asyncio
import aiohttp
async def check(session, url):
async with session.get(url) as response:
return url, response.status
async def main():
urls = ["https://example.com/health", "http://127.0.0.1:3000/api/health"]
async with aiohttp.ClientSession() as session:
# asyncio.gather 同时发起多个协程,等它们全部完成
results = await asyncio.gather(*[check(session, url) for url in urls])
print(results)
# asyncio.run 启动事件循环,跑 main 协程
asyncio.run(main())asyncio.gather() 同时启动多个协程并发等待。asyncio.run() 是入口,启动事件循环(负责在协程之间切换的那个调度器)。
异步代码里不能用普通的阻塞函数(比如 requests.get、time.sleep)——它们会卡住整个事件循环,其他协程也没机会跑。必须用支持 async 的库(aiohttp 替代 requests、asyncio.sleep 替代 time.sleep)。
运维脚本里线程池够用就别上 asyncio。asyncio 写法跟同步代码差异大,排查也麻烦,目标数量到几百上千、且对延迟敏感时再考虑。这一节先混个眼熟,遇到再深挖。
七、多进程
CPU 密集型工作(压缩大量文件、统计大日志)用多进程,绕开 Python 的 GIL:
python
from concurrent.futures import ProcessPoolExecutor
def count_lines(path):
count = 0
with open(path, encoding="utf-8", errors="ignore") as f:
for _ in f:
count += 1
return path, count
paths = ["/var/log/messages", "/var/log/secure"]
with ProcessPoolExecutor(max_workers=2) as executor:
for path, count in executor.map(count_lines, paths):
print(path, count)多进程入口要放在 if __name__ == "__main__": 下面,不然 Windows 上子进程导入模块时会再次创建进程,无限递归。运维脚本里多进程用得少——大部分运维任务是等待型(IO 密集),线程池更合适。
八、批量 HTTP 健康检查
把参数、并发、错误处理、结果汇总合到一个完整脚本:
新增文件:scripts/batch_health_check.py
python
#!/usr/bin/env python3
"""批量检查 HTTP 健康接口,支持并发。"""
import argparse
import json
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import requests
def parse_args():
parser = argparse.ArgumentParser(description="batch http health check")
parser.add_argument("--targets", required=True, help="targets json file")
parser.add_argument("--workers", type=int, default=5, help="concurrent workers")
parser.add_argument("--timeout", type=int, default=5, help="request timeout")
return parser.parse_args()
def load_targets(path):
return json.loads(Path(path).read_text(encoding="utf-8"))
def check_one(target, timeout):
started = time.time()
try:
resp = requests.get(target["url"], timeout=timeout)
ok = resp.status_code == 200
message = f"status_code={resp.status_code}"
except requests.RequestException as exc:
ok = False
message = str(exc)
return {
"name": target["name"],
"url": target["url"],
"ok": ok,
"message": message,
"cost_ms": int((time.time() - started) * 1000),
}
def main():
args = parse_args()
targets = load_targets(args.targets)
results = []
with ThreadPoolExecutor(max_workers=args.workers) as executor:
future_map = {
executor.submit(check_one, t, args.timeout): t for t in targets
}
for future in as_completed(future_map):
target = future_map[future]
try:
results.append(future.result())
except Exception as exc:
results.append({
"name": target["name"], "ok": False, "message": str(exc),
})
print(json.dumps(results, ensure_ascii=False, indent=2))
# 任意目标失败,整体返回非零,cron/CI/监控能识别
return 1 if any(not r["ok"] for r in results) else 0
if __name__ == "__main__":
sys.exit(main())目标文件 targets.json:
json
[
{"name": "api", "url": "https://example.com/health"},
{"name": "grafana", "url": "http://127.0.0.1:3000/api/health"}
]bash
uv run python scripts/batch_health_check.py --targets targets.json --workers 5这个脚本结构是批量运维脚本的通用骨架:读目标清单 → 线程池并发检查 → 统一结果结构 → JSON 输出 → 退出码反映整体成败。换成 SSH 检查、数据库巡检,骨架不变,只换 check_one 函数里的逻辑。