Appearance
日志解析与证据提取
inputs/sample.log 现在还是普通文本。Agent 直接吃整份日志不合适,一是日志稍微多一点就占上下文,二是模型看到一堆原始行以后很难稳定抓住关键字段。更稳的做法是让 Python 先读日志,把异常行和相邻上下文整理成证据 JSON。
当前新增的文件是 scripts/read_logs.py。它只做一件事:读取 inputs/sample.log,输出 outputs/evidence.json。这一步不接模型,也不接 Runbook,先确认日志里的异常能被代码抓出来。
一、文件职责
当前项目的输入和输出关系很短:
sample.log 里有 API 请求日志,也有 worker 任务日志。解析器要把每一行拆成统一对象,不然报告阶段只能拿字符串做判断。比如这一行:
text
2026-06-18 09:20:02 WARN service=api request_id=req-1002 path=/orders status=502 cost_ms=2100解析以后至少要得到时间、级别、服务、请求路径、状态码和耗时。看到 status=502、cost_ms=2100,代码就能判断这行需要进入证据。
二、日志对象
scripts/read_logs.py 先放导入和两个数据对象。LogRecord 表示单条日志,Evidence 表示最后写进 JSON 的一组证据。
新增文件:scripts/read_logs.py
python
#!/usr/bin/env python3
"""读取样例日志并提取异常证据。"""
from __future__ import annotations
import json
import re
import shlex
from dataclasses import asdict, dataclass
from config import INPUT_LOG, OUTPUT_DIR
@dataclass
class LogRecord:
ts: str
level: str
service: str
message: str
raw: str
request_id: str | None = None
path: str | None = None
status: int | None = None
cost_ms: int | None = None
task: str | None = None
order_id: str | None = None
@dataclass
class Evidence:
title: str
reasons: list[str]
request_id: str | None
service: str
path: str | None
status: int | None
cost_ms: int | None
lines: list[dict]raw 要保留原始日志行。字段拆错时,报告里还能回看原文,不会只剩下一堆已经加工过的字段。request_id、path、status、cost_ms 主要来自 API 日志;task 和 order_id 主要来自后台任务日志。
Evidence 里多了 reasons,用来说明这组证据为什么被挑出来。比如 status=502、cost_ms=2100、level=ERROR。这比只给模型一组日志行更清楚,后面写报告时也能直接引用。
三、单行解析
日志每行开头都是时间和级别,后面是一串 key=value 字段。LINE_RE 负责切出这三块。字段部分用 shlex.split() 处理,因为 msg="upstream timeout" 这种值里带空格,直接 body.split() 会把它拆坏。
继续修改:scripts/read_logs.py
python
LINE_RE = re.compile(
r"^(?P<ts>\S+\s+\S+)\s+"
r"(?P<level>\S+)\s+"
r"(?P<body>.*)$"
)
def to_int(value: str | None) -> int | None:
if value and value.isdigit():
return int(value)
return None
def parse_fields(body: str) -> dict[str, str]:
try:
chunks = shlex.split(body)
except ValueError:
return {}
fields: dict[str, str] = {}
for chunk in chunks:
if "=" not in chunk:
continue
key, value = chunk.split("=", 1)
fields[key] = value
return fields这几个函数还没有生成 LogRecord,只是把一行日志的后半段变成字典。拿 worker 那行试一下,msg="upstream timeout" 会得到:
json
{
"service": "worker",
"task": "sync_orders",
"order_id": "8802",
"msg": "upstream timeout"
}有了字段字典,parse_line() 再把它装进 LogRecord:
python
def parse_line(line: str) -> LogRecord | None:
raw = line.rstrip("\n")
match = LINE_RE.match(raw)
if not match:
return None
fields = parse_fields(match.group("body"))
return LogRecord(
ts=match.group("ts"),
level=match.group("level"),
service=fields.get("service", ""),
message=fields.get("msg", match.group("body")),
raw=raw,
request_id=fields.get("request_id"),
path=fields.get("path"),
status=to_int(fields.get("status")),
cost_ms=to_int(fields.get("cost_ms")),
task=fields.get("task"),
order_id=fields.get("order_id"),
)解析失败时返回 None。当前版本不保存解析失败的行,真实项目里可以单独写到 outputs/parse-errors.json,不然日志格式变了以后会静默漏数据。
四、文件读取
单行解析能跑以后,再加文件读取函数。它读取 config.py 里配置的 INPUT_LOG,逐行调用 parse_line()。
继续修改:scripts/read_logs.py
python
def read_records() -> list[LogRecord]:
records: list[LogRecord] = []
for line in INPUT_LOG.read_text(encoding="utf-8").splitlines():
record = parse_line(line)
if record:
records.append(record)
return records这一步只把文本变成对象,不判断异常。读完 sample.log 后应该得到 4 条 LogRecord。如果这里数量不是 4,先看日志文件路径和解析正则,不用往 Agent 那边查。
五、异常规则
异常判断先写得很直接:WARN、ERROR、5xx 状态码、耗时超过 1000ms 都算证据。原因单独存到 reasons,这样报告阶段能说清楚这条日志为什么被选中。
继续修改:scripts/read_logs.py
python
def error_reasons(record: LogRecord) -> list[str]:
reasons: list[str] = []
if record.level in {"WARN", "ERROR"}:
reasons.append(f"level={record.level}")
if record.status and record.status >= 500:
reasons.append(f"status={record.status}")
if record.cost_ms and record.cost_ms >= 1000:
reasons.append(f"cost_ms={record.cost_ms}")
if "timeout" in record.message.lower():
reasons.append("message contains timeout")
return reasons
def is_error(record: LogRecord) -> bool:
return bool(error_reasons(record))样例日志里会命中两类证据。API 那行同时命中 level=WARN、status=502、cost_ms=2100;worker 那行命中 level=ERROR 和 message contains timeout。这就把“哪里不对”提前写进 JSON 里了。
六、上下文窗口
单条异常行有时不够。比如 /orders 502 前一行是 /login 正常,后一行是 worker 同步订单超时,这几行放在一起看,才像一次小故障片段。iter_error_windows() 给每条异常带上前后各一行。
继续修改:scripts/read_logs.py
python
def iter_error_windows(
records: list[LogRecord],
size: int = 1,
) -> list[tuple[LogRecord, list[LogRecord]]]:
windows: list[tuple[LogRecord, list[LogRecord]]] = []
for index, record in enumerate(records):
if not is_error(record):
continue
left = max(0, index - size)
right = min(len(records), index + size + 1)
windows.append((record, records[left:right]))
return windows这里返回的是 (异常行, 上下文窗口),异常行单独保存。不能只返回窗口再去里面找第一条异常,因为相邻两行都异常时,第二个窗口里的第一条异常可能还是上一行,证据标题就会串掉。
七、证据生成
build_evidence() 把异常行和上下文窗口整理成 Evidence。标题只放最有用的几段信息:服务、级别、路径、状态码。详细日志保留在 lines 里。
继续修改:scripts/read_logs.py
python
def build_title(record: LogRecord) -> str:
parts = [record.service or "unknown", record.level]
if record.path:
parts.append(record.path)
if record.status:
parts.append(f"status={record.status}")
if record.task:
parts.append(f"task={record.task}")
return " ".join(parts)
def build_evidence(records: list[LogRecord]) -> list[Evidence]:
evidence: list[Evidence] = []
for center, window in iter_error_windows(records):
evidence.append(
Evidence(
title=build_title(center),
reasons=error_reasons(center),
request_id=center.request_id,
service=center.service,
path=center.path,
status=center.status,
cost_ms=center.cost_ms,
lines=[asdict(item) for item in window],
)
)
return evidencelines 里保存的是窗口内每条日志的结构化字段和原文。模型后面如果写出“订单接口 502 后 worker 超时”,报告里能回到这几行看证据,不用只相信模型的文字。
八、命令入口
最后加 main(),把读取、提取、写文件串起来。输出文件固定写到 outputs/evidence.json。
继续修改:scripts/read_logs.py
python
def main() -> int:
records = read_records()
evidence = build_evidence(records)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
output_path = OUTPUT_DIR / "evidence.json"
output_path.write_text(
json.dumps([asdict(item) for item in evidence], ensure_ascii=False, indent=2),
encoding="utf-8",
)
print(f"records: {len(records)}")
print(f"evidence: {len(evidence)}")
print(f"output: {output_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())从 ops-log-agent/ 根目录运行:
bash
uv run python -m scripts.read_logs输出应该是:
text
records: 4
evidence: 2
output: .../ops-log-agent/outputs/evidence.jsonoutputs/evidence.json 里第一组证据会接近这样:
json
{
"title": "api WARN /orders status=502",
"reasons": [
"level=WARN",
"status=502",
"cost_ms=2100"
],
"request_id": "req-1002",
"service": "api",
"path": "/orders",
"status": 502,
"cost_ms": 2100
}第二组证据标题会是 worker ERROR task=sync_orders,reasons 里能看到 level=ERROR 和 message contains timeout。到这一步,模型还没有参与,日志里的异常已经被 Python 提前压成了两组可检查的证据。