Skip to content

日志解析与证据提取

inputs/sample.log 现在还是普通文本。Agent 直接吃整份日志不合适,一是日志稍微多一点就占上下文,二是模型看到一堆原始行以后很难稳定抓住关键字段。更稳的做法是让 Python 先读日志,把异常行和相邻上下文整理成证据 JSON。

当前新增的文件是 scripts/read_logs.py。它只做一件事:读取 inputs/sample.log,输出 outputs/evidence.json。这一步不接模型,也不接 Runbook,先确认日志里的异常能被代码抓出来。

一、文件职责

当前项目的输入和输出关系很短:

sample.log 里有 API 请求日志,也有 worker 任务日志。解析器要把每一行拆成统一对象,不然报告阶段只能拿字符串做判断。比如这一行:

text
2026-06-18 09:20:02 WARN service=api request_id=req-1002 path=/orders status=502 cost_ms=2100

解析以后至少要得到时间、级别、服务、请求路径、状态码和耗时。看到 status=502cost_ms=2100,代码就能判断这行需要进入证据。

二、日志对象

scripts/read_logs.py 先放导入和两个数据对象。LogRecord 表示单条日志,Evidence 表示最后写进 JSON 的一组证据。

新增文件:scripts/read_logs.py

python
#!/usr/bin/env python3
"""读取样例日志并提取异常证据。"""

from __future__ import annotations

import json
import re
import shlex
from dataclasses import asdict, dataclass

from config import INPUT_LOG, OUTPUT_DIR


@dataclass
class LogRecord:
    ts: str
    level: str
    service: str
    message: str
    raw: str
    request_id: str | None = None
    path: str | None = None
    status: int | None = None
    cost_ms: int | None = None
    task: str | None = None
    order_id: str | None = None


@dataclass
class Evidence:
    title: str
    reasons: list[str]
    request_id: str | None
    service: str
    path: str | None
    status: int | None
    cost_ms: int | None
    lines: list[dict]

raw 要保留原始日志行。字段拆错时,报告里还能回看原文,不会只剩下一堆已经加工过的字段。request_idpathstatuscost_ms 主要来自 API 日志;taskorder_id 主要来自后台任务日志。

Evidence 里多了 reasons,用来说明这组证据为什么被挑出来。比如 status=502cost_ms=2100level=ERROR。这比只给模型一组日志行更清楚,后面写报告时也能直接引用。

三、单行解析

日志每行开头都是时间和级别,后面是一串 key=value 字段。LINE_RE 负责切出这三块。字段部分用 shlex.split() 处理,因为 msg="upstream timeout" 这种值里带空格,直接 body.split() 会把它拆坏。

继续修改:scripts/read_logs.py

python
LINE_RE = re.compile(
    r"^(?P<ts>\S+\s+\S+)\s+"
    r"(?P<level>\S+)\s+"
    r"(?P<body>.*)$"
)


def to_int(value: str | None) -> int | None:
    if value and value.isdigit():
        return int(value)
    return None


def parse_fields(body: str) -> dict[str, str]:
    try:
        chunks = shlex.split(body)
    except ValueError:
        return {}

    fields: dict[str, str] = {}
    for chunk in chunks:
        if "=" not in chunk:
            continue
        key, value = chunk.split("=", 1)
        fields[key] = value
    return fields

这几个函数还没有生成 LogRecord,只是把一行日志的后半段变成字典。拿 worker 那行试一下,msg="upstream timeout" 会得到:

json
{
  "service": "worker",
  "task": "sync_orders",
  "order_id": "8802",
  "msg": "upstream timeout"
}

有了字段字典,parse_line() 再把它装进 LogRecord

python
def parse_line(line: str) -> LogRecord | None:
    raw = line.rstrip("\n")
    match = LINE_RE.match(raw)
    if not match:
        return None

    fields = parse_fields(match.group("body"))
    return LogRecord(
        ts=match.group("ts"),
        level=match.group("level"),
        service=fields.get("service", ""),
        message=fields.get("msg", match.group("body")),
        raw=raw,
        request_id=fields.get("request_id"),
        path=fields.get("path"),
        status=to_int(fields.get("status")),
        cost_ms=to_int(fields.get("cost_ms")),
        task=fields.get("task"),
        order_id=fields.get("order_id"),
    )

解析失败时返回 None。当前版本不保存解析失败的行,真实项目里可以单独写到 outputs/parse-errors.json,不然日志格式变了以后会静默漏数据。

四、文件读取

单行解析能跑以后,再加文件读取函数。它读取 config.py 里配置的 INPUT_LOG,逐行调用 parse_line()

继续修改:scripts/read_logs.py

python
def read_records() -> list[LogRecord]:
    records: list[LogRecord] = []

    for line in INPUT_LOG.read_text(encoding="utf-8").splitlines():
        record = parse_line(line)
        if record:
            records.append(record)

    return records

这一步只把文本变成对象,不判断异常。读完 sample.log 后应该得到 4 条 LogRecord。如果这里数量不是 4,先看日志文件路径和解析正则,不用往 Agent 那边查。

五、异常规则

异常判断先写得很直接:WARNERROR、5xx 状态码、耗时超过 1000ms 都算证据。原因单独存到 reasons,这样报告阶段能说清楚这条日志为什么被选中。

继续修改:scripts/read_logs.py

python
def error_reasons(record: LogRecord) -> list[str]:
    reasons: list[str] = []

    if record.level in {"WARN", "ERROR"}:
        reasons.append(f"level={record.level}")
    if record.status and record.status >= 500:
        reasons.append(f"status={record.status}")
    if record.cost_ms and record.cost_ms >= 1000:
        reasons.append(f"cost_ms={record.cost_ms}")
    if "timeout" in record.message.lower():
        reasons.append("message contains timeout")

    return reasons


def is_error(record: LogRecord) -> bool:
    return bool(error_reasons(record))

样例日志里会命中两类证据。API 那行同时命中 level=WARNstatus=502cost_ms=2100;worker 那行命中 level=ERRORmessage contains timeout。这就把“哪里不对”提前写进 JSON 里了。

六、上下文窗口

单条异常行有时不够。比如 /orders 502 前一行是 /login 正常,后一行是 worker 同步订单超时,这几行放在一起看,才像一次小故障片段。iter_error_windows() 给每条异常带上前后各一行。

继续修改:scripts/read_logs.py

python
def iter_error_windows(
    records: list[LogRecord],
    size: int = 1,
) -> list[tuple[LogRecord, list[LogRecord]]]:
    windows: list[tuple[LogRecord, list[LogRecord]]] = []

    for index, record in enumerate(records):
        if not is_error(record):
            continue

        left = max(0, index - size)
        right = min(len(records), index + size + 1)
        windows.append((record, records[left:right]))

    return windows

这里返回的是 (异常行, 上下文窗口),异常行单独保存。不能只返回窗口再去里面找第一条异常,因为相邻两行都异常时,第二个窗口里的第一条异常可能还是上一行,证据标题就会串掉。

七、证据生成

build_evidence() 把异常行和上下文窗口整理成 Evidence。标题只放最有用的几段信息:服务、级别、路径、状态码。详细日志保留在 lines 里。

继续修改:scripts/read_logs.py

python
def build_title(record: LogRecord) -> str:
    parts = [record.service or "unknown", record.level]
    if record.path:
        parts.append(record.path)
    if record.status:
        parts.append(f"status={record.status}")
    if record.task:
        parts.append(f"task={record.task}")
    return " ".join(parts)


def build_evidence(records: list[LogRecord]) -> list[Evidence]:
    evidence: list[Evidence] = []

    for center, window in iter_error_windows(records):
        evidence.append(
            Evidence(
                title=build_title(center),
                reasons=error_reasons(center),
                request_id=center.request_id,
                service=center.service,
                path=center.path,
                status=center.status,
                cost_ms=center.cost_ms,
                lines=[asdict(item) for item in window],
            )
        )

    return evidence

lines 里保存的是窗口内每条日志的结构化字段和原文。模型后面如果写出“订单接口 502 后 worker 超时”,报告里能回到这几行看证据,不用只相信模型的文字。

八、命令入口

最后加 main(),把读取、提取、写文件串起来。输出文件固定写到 outputs/evidence.json

继续修改:scripts/read_logs.py

python
def main() -> int:
    records = read_records()
    evidence = build_evidence(records)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    output_path = OUTPUT_DIR / "evidence.json"
    output_path.write_text(
        json.dumps([asdict(item) for item in evidence], ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    print(f"records: {len(records)}")
    print(f"evidence: {len(evidence)}")
    print(f"output: {output_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

ops-log-agent/ 根目录运行:

bash
uv run python -m scripts.read_logs

输出应该是:

text
records: 4
evidence: 2
output: .../ops-log-agent/outputs/evidence.json

outputs/evidence.json 里第一组证据会接近这样:

json
{
  "title": "api WARN /orders status=502",
  "reasons": [
    "level=WARN",
    "status=502",
    "cost_ms=2100"
  ],
  "request_id": "req-1002",
  "service": "api",
  "path": "/orders",
  "status": 502,
  "cost_ms": 2100
}

第二组证据标题会是 worker ERROR task=sync_ordersreasons 里能看到 level=ERRORmessage contains timeout。到这一步,模型还没有参与,日志里的异常已经被 Python 提前压成了两组可检查的证据。