【高级应用】Day21:AI系统监控与可观测性–让AI系统透明可控
章节导语
模型训练好了,部署上线了,然后呢?
很多团队花大量时间训练模型,却忽视了上线后的监控运维。结果模型效果悄悄下降、响应越来越慢、内存越用越多……等问题被发现时,往往已经影响到了用户体验。
AI系统的可观测性和传统软件不同——不仅要看CPU、内存、网络这些基础设施指标,还要看模型预测的分布、特征漂移、输入异常等AI特有的指标。
本文系统讲解AI系统监控与可观测性的完整方案,涵盖基础设施监控、模型性能监控、数据漂移检测、日志与追踪、告警管理等内容。
一、前置说明
1.1 学习路径
| 阶段 | 内容 |
|---|---|
| 前置 | 私有化部署(Day15) |
| 本篇 | AI系统监控与可观测性 |
1.2 读者需要的基础
- Linux基础:会用命令行、懂基本的服务管理
- Python基础:能处理数据和调用API
- 机器学习基础:了解模型训练和部署流程
1.3 学习目标
学完本文,你将能够:
- 理解AI系统监控的特殊性
- 设计完整的监控指标体系
- 实现数据漂移检测
- 构建日志和追踪系统
- 配置合理的告警规则
二、AI系统监控的特殊性
2.1 传统监控 vs AI监控
传统软件监控关注的是”系统是否正常运转”——CPU、内存、网络、磁盘等。这些指标有明确的阈值,异常就是故障。
AI系统监控不仅要关注”系统是否正常”,更要关注”模型预测是否可信”。模型预测是概率性的,同一个输入可能产生不同的输出,很难用简单的阈值判断对错。
AI监控的特殊挑战:
第一,预测质量难以直接测量。传统软件有明确的正确/错误,但AI预测往往是灰区的——看起来合理,但可能不是最优解。
第二,数据分布会随时间变化。真实世界的数据分布不是恒定的,用户行为、市场环境、季节因素都在变。模型在训练时学到的分布,和上线后的分布可能不同。
第三,输入异常难以定义。模型可能遇到从未见过的输入,输出可能是荒谬的但表面上看起来合理。
2.2 监控金字塔
AI系统的监控可以分为四个层次,从下到上:
第一层:基础设施监控
这是最基础的一层,和传统软件监控一样。包括CPU使用率、内存占用、磁盘IO、网络流量、GPU利用率等。基础设施是模型运行的载体,如果这层出问题,上层再好也没用。
第二层:服务性能监控
关注模型服务的性能指标。包括响应延迟(p50、p95、p99)、吞吐量(QPS/TPS)、错误率等。这些指标反映用户体验,需要设置合理的SLO。
第三层:模型行为监控
这是AI特有的监控层次。关注模型预测本身的特性,包括预测分布、置信度分布、输入特征分布、输出异常检测等。
第四层:业务效果监控
最顶层是业务指标。AI系统的最终目的是提升业务效果,所以最终要监控转化率、留存率、用户满意度等业务指标。

三、基础设施监控
3.1 核心指标
基础设施监控的核心指标:
计算资源:CPU使用率、GPU利用率、GPU显存使用量。这些直接影响模型推理速度。
内存资源:系统内存、GPU显存、缓存命中率。内存不足会导致OOM或频繁GC。
网络资源:带宽使用、连接数、延迟。影响模型服务的吞吐量。
存储资源:磁盘使用率、IOPS、读写延迟。影响模型加载和日志写入。
3.2 监控工具
常用的基础设施监控工具:
Prometheus + Grafana:最流行的开源监控组合。Prometheus负责采集和存储,Grafana负责可视化。
DataDog:SaaS监控平台,功能强大但价格较高。
云厂商监控:AWS CloudWatch、阿里云ARMS等。
3.3 基础设施监控代码
import psutil
import GPUtil
import time
from dataclasses import dataclass
from typing import Dict, Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class SystemMetrics:
"""系统指标快照"""
timestamp: float
cpu_percent: float
memory_percent: float
memory_used_gb: float
memory_total_gb: float
gpu_utilization: Optional[float] = None
gpu_memory_percent: Optional[float] = None
gpu_memory_used_gb: Optional[float] = None
gpu_temperature: Optional[float] = None
class InfrastructureMonitor:
"""基础设施监控器"""
def __init__(self, check_gpu: bool = True):
self.check_gpu = check_gpu
self.metrics_history = []
def collect(self) -> SystemMetrics:
"""采集系统指标"""
# CPU和内存
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
metrics = SystemMetrics(
timestamp=time.time(),
cpu_percent=cpu_percent,
memory_percent=memory.percent,
memory_used_gb=memory.used / (1024**3),
memory_total_gb=memory.total / (1024**3)
)
# GPU指标
if self.check_gpu:
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu = gpus[0] # 取第一个GPU
metrics.gpu_utilization = gpu.load * 100
metrics.gpu_memory_percent = gpu.memoryUtil * 100
metrics.gpu_memory_used_gb = gpu.memoryUsed / 1024
metrics.gpu_temperature = gpu.temperature
except Exception as e:
logger.warning(f"GPU监控失败: {e}")
self.metrics_history.append(metrics)
return metrics
def get_status(self) -> Dict[str, str]:
"""获取系统状态
Returns:
各资源的状态:normal/warning/critical
"""
latest = self.collect()
status = {}
# CPU状态
if latest.cpu_percent > 90:
status['cpu'] = 'critical'
elif latest.cpu_percent > 70:
status['cpu'] = 'warning'
else:
status['cpu'] = 'normal'
# 内存状态
if latest.memory_percent > 90:
status['memory'] = 'critical'
elif latest.memory_percent > 70:
status['memory'] = 'warning'
else:
status['memory'] = 'normal'
# GPU状态
if self.check_gpu and latest.gpu_utilization is not None:
if latest.gpu_utilization > 95:
status['gpu'] = 'critical'
elif latest.gpu_utilization > 80:
status['gpu'] = 'warning'
else:
status['gpu'] = 'normal'
return status
def check_anomalies(self, window_size: int = 100) -> Dict[str, bool]:
"""检测异常
如果当前值超出历史窗口的3个标准差,标记为异常
"""
if len(self.metrics_history) < window_size:
return {}
anomalies = {}
recent = self.metrics_history[-window_size:]
for metric in ['cpu_percent', 'memory_percent', 'gpu_utilization']:
values = [getattr(m, metric) for m in recent if getattr(m, metric) is not None]
if not values:
continue
mean = sum(values) / len(values)
std = (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5
latest = getattr(self.metrics_history[-1], metric)
if latest and std > 0 and abs(latest - mean) > 3 * std:
anomalies[metric] = True
else:
anomalies[metric] = False
return anomalies
# 使用
if __name__ == "__main__":
monitor = InfrastructureMonitor(check_gpu=True)
# 采集一次
metrics = monitor.collect()
print(f"CPU: {metrics.cpu_percent:.1f}%")
print(f"内存: {metrics.memory_percent:.1f}%")
print(f"GPU利用率: {metrics.gpu_utilization:.1f}%" if metrics.gpu_utilization else "GPU: N/A")
# 检查状态
status = monitor.get_status()
print(f"\n状态: {status}")
四、模型性能监控
4.1 核心指标
模型性能监控的核心指标:
延迟指标:
- 平均响应时间(mean latency)
- P50、P90、P95、P99延迟
- 首次响应时间(time to first token)
- 最大延迟(max latency)
吞吐指标:
- 每秒请求数(QPS)
- 每秒处理的token数
- 并发用户数
错误指标:
- 错误率(5xx、模型超时)
- 重试率
- 降级率(触发fallback的比例)
4.2 预测分布监控
模型预测的分布变化往往是问题的早期信号:
置信度分布:如果大量预测的置信度突然下降,可能说明模型遇到了分布外的数据。
输出长度分布:如果平均输出长度突然变化,可能说明模型行为发生了漂移。
输出词分布:高频词的分布变化,可能反映输入分布的变化。
4.3 模型性能监控代码
import numpy as np
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time
import statistics
@dataclass
class PredictionRecord:
"""预测记录"""
timestamp: float
latency_ms: float
confidence: float
input_length: int
output_length: int
error: Optional[str] = None
class ModelPerformanceMonitor:
"""模型性能监控器"""
def __init__(self, window_size: int = 1000):
self.window_size = window_size
self.predictions = deque(maxlen=window_size)
self.latencies = deque(maxlen=window_size)
self.confidences = deque(maxlen=window_size)
self.errors = deque(maxlen=window_size)
def record(self, latency_ms: float, confidence: float = None,
input_length: int = 0, output_length: int = 0,
error: str = None):
"""记录一次预测"""
record = PredictionRecord(
timestamp=time.time(),
latency_ms=latency_ms,
confidence=confidence or 0.0,
input_length=input_length,
output_length=output_length,
error=error
)
self.predictions.append(record)
self.latencies.append(latency_ms)
if confidence is not None:
self.confidences.append(confidence)
if error:
self.errors.append(error)
def get_latency_stats(self) -> Dict[str, float]:
"""获取延迟统计"""
if not self.latencies:
return {}
sorted_latencies = sorted(self.latencies)
n = len(sorted_latencies)
return {
'mean': statistics.mean(self.latencies),
'median': sorted_latencies[n // 2],
'p90': sorted_latencies[int(n * 0.9)],
'p95': sorted_latencies[int(n * 0.95)],
'p99': sorted_latencies[int(n * 0.99)],
'max': max(self.latencies),
'min': min(self.latencies),
}
def get_error_rate(self) -> float:
"""计算错误率"""
if not self.predictions:
return 0.0
error_count = sum(1 for p in self.predictions if p.error)
return error_count / len(self.predictions)
def get_confidence_stats(self) -> Dict[str, float]:
"""获取置信度统计"""
if not self.confidences:
return {}
confs = list(self.confidences)
return {
'mean': statistics.mean(confs),
'median': statistics.median(confs),
'min': min(confs),
'max': max(confs),
'std': statistics.stdev(confs) if len(confs) > 1 else 0
}
def detect_drift(self, window1_size: int = 500, window2_size: int = 500) -> Dict[str, bool]:
"""检测漂移
比较最近window2个样本和之前window1个样本的分布差异
"""
if len(self.predictions) < window1_size + window2_size:
return {'drifted': False, 'reason': '数据不足'}
# 分割窗口
older = list(self.predictions)[-window1_size-window2_size:-window2_size]
newer = list(self.predictions)[-window2_size:]
# 比较置信度分布
older_conf = [p.confidence for p in older if p.confidence]
newer_conf = [p.confidence for p in newer if p.confidence]
drift_detected = False
reason = []
if older_conf and newer_conf:
older_mean = statistics.mean(older_conf)
newer_mean = statistics.mean(newer_conf)
# 如果均值变化超过20%,认为有漂移
if older_mean > 0 and abs(newer_mean - older_mean) / older_mean > 0.2:
drift_detected = True
reason.append(f'置信度均值变化: {older_mean:.3f} -> {newer_mean:.3f}')
# 比较延迟分布
older_lat = [p.latency_ms for p in older]
newer_lat = [p.latency_ms for p in newer]
if older_lat and newer_lat:
older_p99 = sorted(older_lat)[int(len(older_lat) * 0.99)]
newer_p99 = sorted(newer_lat)[int(len(newer_lat) * 0.99)]
if newer_p99 > older_p99 * 1.5:
drift_detected = True
reason.append(f'P99延迟上升: {older_p99:.1f}ms -> {newer_p99:.1f}ms')
return {
'drifted': drift_detected,
'reason': '; '.join(reason) if reason else '无异常'
}
def get_summary(self) -> Dict:
"""获取监控摘要"""
return {
'total_predictions': len(self.predictions),
'latency': self.get_latency_stats(),
'error_rate': self.get_error_rate(),
'confidence': self.get_confidence_stats(),
'drift': self.detect_drift()
}
# 使用
if __name__ == "__main__":
monitor = ModelPerformanceMonitor()
# 模拟一些预测记录
import random
for _ in range(1000):
monitor.record(
latency_ms=random.gauss(100, 20), # 均值100ms,标准差20
confidence=random.betavariate(2, 5), # Beta分布,偏小
input_length=random.randint(50, 200),
output_length=random.randint(100, 500),
error=None if random.random() > 0.05 else "timeout"
)
# 获取统计
summary = monitor.get_summary()
print(f"总预测数: {summary['total_predictions']}")
print(f"错误率: {summary['error_rate']:.2%}")
print(f"\n延迟统计:")
for k, v in summary['latency'].items():
print(f" {k}: {v:.1f}ms")
print(f"\n漂移检测: {summary['drift']}")
五、数据漂移检测
5.1 漂移类型
数据漂移是指数据的分布特性发生了显著变化,主要类型:
协变量漂移(Covariate Drift):输入特征的分布发生变化,但输入和输出的关系不变。比如用户的搜索词分布变了,但搜索词和点击率的关系没变。
先验漂移(Prior Drift):标签的先验分布发生变化。比如某个类别的比例突然变了。
概念漂移(Concept Drift):输入和输出的关系发生变化。比如以前”苹果”主要是水果,现在可能主要是手机品牌。
5.2 漂移检测方法
统计检验方法:
- KS检验(Kolmogorov-Smirnov Test):比较两个分布是否来自同一分布
- 卡方检验:适用于分类特征
- Population Stability Index(PSI):常用的漂移指标
机器学习方法:
- 用模型预测样本是否属于”正常”分布
- Isolation Forest等异常检测算法
5.3 漂移检测代码
import numpy as np
from scipy import stats
from typing import Dict, List, Tuple
import logging
logger = logging.getLogger(__name__)
class DataDriftDetector:
"""数据漂移检测器
支持多种漂移检测方法
"""
@staticmethod
def psi_score(expected: np.ndarray, actual: np.ndarray, buckets: int = 10) -> float:
"""计算Population Stability Index (PSI)
PSI < 0.1: 无显著变化
PSI 0.1-0.2: 轻微变化
PSI > 0.2: 显著变化(需要关注)
"""
# 分桶
breakpoints = np.linspace(expected.min(), expected.max(), buckets + 1)
expected_percents = np.histogram(expected, breakpoints=breakpoints)[0] / len(expected)
actual_percents = np.histogram(actual, breakpoints=breakpoints)[0] / len(actual)
# 避免除零
expected_percents = np.where(expected_percents == 0, 0.0001, expected_percents)
actual_percents = np.where(actual_percents == 0, 0.0001, actual_percents)
# 计算PSI
psi = np.sum((actual_percents - expected_percents) * np.log(actual_percents / expected_percents))
return psi
@staticmethod
def ks_test(reference: np.ndarray, current: np.ndarray, alpha: float = 0.05) -> Dict:
"""Kolmogorov-Smirnov检验
判断两个分布是否显著不同
"""
statistic, pvalue = stats.ks_2samp(reference, current)
return {
'statistic': statistic,
'pvalue': pvalue,
'drifted': pvalue < alpha, # pvalue小说明分布不同
'interpretation': '分布不同' if pvalue < alpha else '分布相似'
}
@staticmethod
def population_stability_index(feature_name: str, reference: np.ndarray,
current: np.ndarray) -> Dict:
"""综合漂移检测"""
# PSI
psi = DataDriftDetector.psi_score(reference, current)
# KS检验
ks_result = DataDriftDetector.ks_test(reference, current)
# 均值和标准差变化
ref_mean, ref_std = reference.mean(), reference.std()
cur_mean, cur_std = current.mean(), current.std()
mean_change = abs(cur_mean - ref_mean) / ref_std if ref_std > 0 else 0
std_change = abs(cur_std - ref_std) / ref_std if ref_std > 0 else 0
# 综合判断
drift_score = 0
if psi > 0.2:
drift_score += 2
elif psi > 0.1:
drift_score += 1
if ks_result['drifted']:
drift_score += 2
if mean_change > 0.3:
drift_score += 1
if std_change > 0.3:
drift_score += 1
return {
'feature': feature_name,
'psi': psi,
'psi_interpretation': '显著漂移' if psi > 0.2 else ('轻微漂移' if psi > 0.1 else '无漂移'),
'ks_test': ks_result,
'mean_change': mean_change,
'std_change': std_change,
'drift_score': drift_score,
'drift_level': 'high' if drift_score >= 4 else ('medium' if drift_score >= 2 else 'low')
}
# 使用
if __name__ == "__main__":
detector = DataDriftDetector()
# 模拟参考数据和当前数据
np.random.seed(42)
reference = np.random.normal(100, 15, 10000)
# 模拟轻微漂移
current = np.random.normal(105, 18, 1000)
# 检测漂移
result = detector.population_stability_index("feature_1", reference, current)
print(f"特征: {result['feature']}")
print(f"PSI: {result['psi']:.4f} ({result['psi_interpretation']})")
print(f"KS检验: {result['ks_test']['interpretation']}")
print(f"均值变化: {result['mean_change']:.2f}个标准差")
print(f"标准差变化: {result['std_change']:.2f}个标准差")
print(f"漂移级别: {result['drift_level']}")

六、日志与追踪
6.1 日志策略
AI系统的日志需要特殊设计:
请求日志:记录每次请求的输入、输出、延迟、用户ID等。要注意脱敏,不要记录敏感信息。
模型日志:记录模型的预测结果、置信度、版本信息等。
系统日志:记录服务启动、停止、配置变更等系统事件。
错误日志:记录异常、错误堆栈、错误上下文。
6.2 分布式追踪
当系统包含多个服务时,需要分布式追踪来追踪一个请求的完整调用链。
常用工具:
- Jaeger:开源分布式追踪系统
- Zipkin:Twitter开源的追踪系统
- AWS X-Ray:AWS的追踪服务
七、告警管理
7.1 告警分级
| 级别 | 说明 | 响应时间 | 示例 |
|---|---|---|---|
| P0 | 服务不可用 | 5分钟内 | 服务崩溃、全部请求失败 |
| P1 | 严重降级 | 15分钟内 | 错误率>5%、延迟>P99阈值 |
| P2 | 轻度异常 | 1小时内 | CPU>80%、置信度下降 |
| P3 | 预警 | 24小时内 | 数据漂移检测 |
7.2 告警规则示例
# Prometheus告警规则示例
groups:
- name: ai-system-alerts
rules:
# 服务不可用
- alert: AIServiceDown
expr: up{job="ai-model"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "AI服务不可用"
description: "AI模型服务已经停止运行超过1分钟"
# 高错误率
- alert: HighErrorRate
expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "错误率超过5%"
description: "服务错误率持续超过5%,当前值: {{ $value }}"
# 高延迟
- alert: HighLatency
expr: histogram_quantile(0.99, rate(request_latency_seconds_bucket[5m])) > 5
for: 10m
labels:
severity: warning
annotations:
summary: "P99延迟过高"
description: "P99延迟超过5秒,当前值: {{ $value }}s"
# GPU资源紧张
- alert: GPUMemoryHigh
expr: (gpu_memory_used_bytes / gpu_memory_total_bytes) > 0.9
for: 5m
labels:
severity: warning
annotations:
summary: "GPU显存使用率超过90%"
description: "GPU显存即将耗尽,当前使用率: {{ $value | humanizePercentage }}"
# 模型置信度下降
- alert: ModelConfidenceDrop
expr: avg(model_confidence_sum / model_requests_total) < 0.5
for: 30m
labels:
severity: warning
annotations:
summary: "模型置信度持续下降"
description: "平均置信度低于0.5,可能存在数据漂移"
# 数据漂移
- alert: DataDriftDetected
expr: drift_score > 4
for: 15m
labels:
severity: info
annotations:
summary: "检测到数据漂移"
description: "特征分布发生显著变化,请检查模型是否需要重训"

八、监控架构设计
8.1 推荐架构
一个完整的AI监控架构通常包含:
数据采集层:Prometheus/Vector/Fluentd负责采集指标和日志。
存储层:时序数据库(Prometheus/InfluxDB)存储指标,日志数据库(Elasticsearch/Loki)存储日志。
可视化层:Grafana/Kibana负责数据可视化。
告警层:AlertManager/PagerDuty负责告警管理和通知。
分析层:Jupyter/分析师工作台用于深度分析。
8.2 SLO设置
SLO(Service Level Objective)是服务级别目标,定义服务应该达到的可靠性水平:
- 可用性:99.9%(每月约8.5分钟宕机时间)
- P99延迟:<500ms
- 错误率:<0.1%
九、总结
监控是AI系统的生命线。模型上线不是终点,持续监控才能发现问题、验证效果、指导迭代。
分层监控很重要。从基础设施到业务效果,每层都要关注。
漂移检测是AI特有的监控需求。数据分布变化会悄悄影响模型效果,要提前检测。
告警要精准。太多告警会导致告警疲劳,太少告警会错过问题。
延伸阅读
- Prometheus官方文档
- Grafana官方文档
- AI Engineering最佳实践
课后练习
基础题:搭建一个Prometheus + Grafana监控环境,监控一个简单的模型服务。
进阶题:实现一个数据漂移检测系统,对多个特征同时监控。
挑战题:设计一个完整的AI系统可观测性方案,包括指标、日志、追踪、告警。