📚 学习教程

【高级应用】Day21:AI系统监控与可观测性–让AI系统透明可控

· 2026-04-12 · 6 阅读

【高级应用】Day21:AI系统监控与可观测性–让AI系统透明可控

👤 龙主编 📅 2026-04-12 👁️ 6 阅读 💬 0 评论

章节导语

模型训练好了,部署上线了,然后呢?

很多团队花大量时间训练模型,却忽视了上线后的监控运维。结果模型效果悄悄下降、响应越来越慢、内存越用越多……等问题被发现时,往往已经影响到了用户体验。

AI系统的可观测性和传统软件不同——不仅要看CPU、内存、网络这些基础设施指标,还要看模型预测的分布、特征漂移、输入异常等AI特有的指标。

本文系统讲解AI系统监控与可观测性的完整方案,涵盖基础设施监控、模型性能监控、数据漂移检测、日志与追踪、告警管理等内容。

一、前置说明

1.1 学习路径

阶段 内容
前置 私有化部署(Day15)
本篇 AI系统监控与可观测性

1.2 读者需要的基础

  • Linux基础:会用命令行、懂基本的服务管理
  • Python基础:能处理数据和调用API
  • 机器学习基础:了解模型训练和部署流程

1.3 学习目标

学完本文,你将能够:

  • 理解AI系统监控的特殊性
  • 设计完整的监控指标体系
  • 实现数据漂移检测
  • 构建日志和追踪系统
  • 配置合理的告警规则

二、AI系统监控的特殊性

2.1 传统监控 vs AI监控

传统软件监控关注的是”系统是否正常运转”——CPU、内存、网络、磁盘等。这些指标有明确的阈值,异常就是故障。

AI系统监控不仅要关注”系统是否正常”,更要关注”模型预测是否可信”。模型预测是概率性的,同一个输入可能产生不同的输出,很难用简单的阈值判断对错。

AI监控的特殊挑战

第一,预测质量难以直接测量。传统软件有明确的正确/错误,但AI预测往往是灰区的——看起来合理,但可能不是最优解。

第二,数据分布会随时间变化。真实世界的数据分布不是恒定的,用户行为、市场环境、季节因素都在变。模型在训练时学到的分布,和上线后的分布可能不同。

第三,输入异常难以定义。模型可能遇到从未见过的输入,输出可能是荒谬的但表面上看起来合理。

2.2 监控金字塔

AI系统的监控可以分为四个层次,从下到上:

第一层:基础设施监控

这是最基础的一层,和传统软件监控一样。包括CPU使用率、内存占用、磁盘IO、网络流量、GPU利用率等。基础设施是模型运行的载体,如果这层出问题,上层再好也没用。

第二层:服务性能监控

关注模型服务的性能指标。包括响应延迟(p50、p95、p99)、吞吐量(QPS/TPS)、错误率等。这些指标反映用户体验,需要设置合理的SLO。

第三层:模型行为监控

这是AI特有的监控层次。关注模型预测本身的特性,包括预测分布、置信度分布、输入特征分布、输出异常检测等。

第四层:业务效果监控

最顶层是业务指标。AI系统的最终目的是提升业务效果,所以最终要监控转化率、留存率、用户满意度等业务指标。

基础设施监控
图1:AI系统监控仪表盘

三、基础设施监控

3.1 核心指标

基础设施监控的核心指标:

计算资源:CPU使用率、GPU利用率、GPU显存使用量。这些直接影响模型推理速度。

内存资源:系统内存、GPU显存、缓存命中率。内存不足会导致OOM或频繁GC。

网络资源:带宽使用、连接数、延迟。影响模型服务的吞吐量。

存储资源:磁盘使用率、IOPS、读写延迟。影响模型加载和日志写入。

3.2 监控工具

常用的基础设施监控工具:

Prometheus + Grafana:最流行的开源监控组合。Prometheus负责采集和存储,Grafana负责可视化。

DataDog:SaaS监控平台,功能强大但价格较高。

云厂商监控:AWS CloudWatch、阿里云ARMS等。

3.3 基础设施监控代码

import psutil
import GPUtil
import time
from dataclasses import dataclass
from typing import Dict, Optional
import logging

logger = logging.getLogger(__name__)

@dataclass
class SystemMetrics:
    """系统指标快照"""
    timestamp: float
    cpu_percent: float
    memory_percent: float
    memory_used_gb: float
    memory_total_gb: float
    gpu_utilization: Optional[float] = None
    gpu_memory_percent: Optional[float] = None
    gpu_memory_used_gb: Optional[float] = None
    gpu_temperature: Optional[float] = None

class InfrastructureMonitor:
    """基础设施监控器"""
    
    def __init__(self, check_gpu: bool = True):
        self.check_gpu = check_gpu
        self.metrics_history = []
    
    def collect(self) -> SystemMetrics:
        """采集系统指标"""
        # CPU和内存
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        
        metrics = SystemMetrics(
            timestamp=time.time(),
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            memory_used_gb=memory.used / (1024**3),
            memory_total_gb=memory.total / (1024**3)
        )
        
        # GPU指标
        if self.check_gpu:
            try:
                gpus = GPUtil.getGPUs()
                if gpus:
                    gpu = gpus[0]  # 取第一个GPU
                    metrics.gpu_utilization = gpu.load * 100
                    metrics.gpu_memory_percent = gpu.memoryUtil * 100
                    metrics.gpu_memory_used_gb = gpu.memoryUsed / 1024
                    metrics.gpu_temperature = gpu.temperature
            except Exception as e:
                logger.warning(f"GPU监控失败: {e}")
        
        self.metrics_history.append(metrics)
        return metrics
    
    def get_status(self) -> Dict[str, str]:
        """获取系统状态
        
        Returns:
            各资源的状态:normal/warning/critical
        """
        latest = self.collect()
        
        status = {}
        
        # CPU状态
        if latest.cpu_percent > 90:
            status['cpu'] = 'critical'
        elif latest.cpu_percent > 70:
            status['cpu'] = 'warning'
        else:
            status['cpu'] = 'normal'
        
        # 内存状态
        if latest.memory_percent > 90:
            status['memory'] = 'critical'
        elif latest.memory_percent > 70:
            status['memory'] = 'warning'
        else:
            status['memory'] = 'normal'
        
        # GPU状态
        if self.check_gpu and latest.gpu_utilization is not None:
            if latest.gpu_utilization > 95:
                status['gpu'] = 'critical'
            elif latest.gpu_utilization > 80:
                status['gpu'] = 'warning'
            else:
                status['gpu'] = 'normal'
        
        return status
    
    def check_anomalies(self, window_size: int = 100) -> Dict[str, bool]:
        """检测异常
        
        如果当前值超出历史窗口的3个标准差,标记为异常
        """
        if len(self.metrics_history) < window_size:
            return {}
        
        anomalies = {}
        recent = self.metrics_history[-window_size:]
        
        for metric in ['cpu_percent', 'memory_percent', 'gpu_utilization']:
            values = [getattr(m, metric) for m in recent if getattr(m, metric) is not None]
            if not values:
                continue
            
            mean = sum(values) / len(values)
            std = (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5
            
            latest = getattr(self.metrics_history[-1], metric)
            if latest and std > 0 and abs(latest - mean) > 3 * std:
                anomalies[metric] = True
            else:
                anomalies[metric] = False
        
        return anomalies

# 使用
if __name__ == "__main__":
    monitor = InfrastructureMonitor(check_gpu=True)
    
    # 采集一次
    metrics = monitor.collect()
    print(f"CPU: {metrics.cpu_percent:.1f}%")
    print(f"内存: {metrics.memory_percent:.1f}%")
    print(f"GPU利用率: {metrics.gpu_utilization:.1f}%" if metrics.gpu_utilization else "GPU: N/A")
    
    # 检查状态
    status = monitor.get_status()
    print(f"\n状态: {status}")

四、模型性能监控

4.1 核心指标

模型性能监控的核心指标:

延迟指标

  • 平均响应时间(mean latency)
  • P50、P90、P95、P99延迟
  • 首次响应时间(time to first token)
  • 最大延迟(max latency)

吞吐指标

  • 每秒请求数(QPS)
  • 每秒处理的token数
  • 并发用户数

错误指标

  • 错误率(5xx、模型超时)
  • 重试率
  • 降级率(触发fallback的比例)

4.2 预测分布监控

模型预测的分布变化往往是问题的早期信号:

置信度分布:如果大量预测的置信度突然下降,可能说明模型遇到了分布外的数据。

输出长度分布:如果平均输出长度突然变化,可能说明模型行为发生了漂移。

输出词分布:高频词的分布变化,可能反映输入分布的变化。

4.3 模型性能监控代码

import numpy as np
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time
import statistics

@dataclass
class PredictionRecord:
    """预测记录"""
    timestamp: float
    latency_ms: float
    confidence: float
    input_length: int
    output_length: int
    error: Optional[str] = None

class ModelPerformanceMonitor:
    """模型性能监控器"""
    
    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.predictions = deque(maxlen=window_size)
        self.latencies = deque(maxlen=window_size)
        self.confidences = deque(maxlen=window_size)
        self.errors = deque(maxlen=window_size)
    
    def record(self, latency_ms: float, confidence: float = None,
               input_length: int = 0, output_length: int = 0,
               error: str = None):
        """记录一次预测"""
        record = PredictionRecord(
            timestamp=time.time(),
            latency_ms=latency_ms,
            confidence=confidence or 0.0,
            input_length=input_length,
            output_length=output_length,
            error=error
        )
        self.predictions.append(record)
        self.latencies.append(latency_ms)
        if confidence is not None:
            self.confidences.append(confidence)
        if error:
            self.errors.append(error)
    
    def get_latency_stats(self) -> Dict[str, float]:
        """获取延迟统计"""
        if not self.latencies:
            return {}
        
        sorted_latencies = sorted(self.latencies)
        n = len(sorted_latencies)
        
        return {
            'mean': statistics.mean(self.latencies),
            'median': sorted_latencies[n // 2],
            'p90': sorted_latencies[int(n * 0.9)],
            'p95': sorted_latencies[int(n * 0.95)],
            'p99': sorted_latencies[int(n * 0.99)],
            'max': max(self.latencies),
            'min': min(self.latencies),
        }
    
    def get_error_rate(self) -> float:
        """计算错误率"""
        if not self.predictions:
            return 0.0
        error_count = sum(1 for p in self.predictions if p.error)
        return error_count / len(self.predictions)
    
    def get_confidence_stats(self) -> Dict[str, float]:
        """获取置信度统计"""
        if not self.confidences:
            return {}
        
        confs = list(self.confidences)
        return {
            'mean': statistics.mean(confs),
            'median': statistics.median(confs),
            'min': min(confs),
            'max': max(confs),
            'std': statistics.stdev(confs) if len(confs) > 1 else 0
        }
    
    def detect_drift(self, window1_size: int = 500, window2_size: int = 500) -> Dict[str, bool]:
        """检测漂移
        
        比较最近window2个样本和之前window1个样本的分布差异
        """
        if len(self.predictions) < window1_size + window2_size:
            return {'drifted': False, 'reason': '数据不足'}
        
        # 分割窗口
        older = list(self.predictions)[-window1_size-window2_size:-window2_size]
        newer = list(self.predictions)[-window2_size:]
        
        # 比较置信度分布
        older_conf = [p.confidence for p in older if p.confidence]
        newer_conf = [p.confidence for p in newer if p.confidence]
        
        drift_detected = False
        reason = []
        
        if older_conf and newer_conf:
            older_mean = statistics.mean(older_conf)
            newer_mean = statistics.mean(newer_conf)
            
            # 如果均值变化超过20%,认为有漂移
            if older_mean > 0 and abs(newer_mean - older_mean) / older_mean > 0.2:
                drift_detected = True
                reason.append(f'置信度均值变化: {older_mean:.3f} -> {newer_mean:.3f}')
        
        # 比较延迟分布
        older_lat = [p.latency_ms for p in older]
        newer_lat = [p.latency_ms for p in newer]
        
        if older_lat and newer_lat:
            older_p99 = sorted(older_lat)[int(len(older_lat) * 0.99)]
            newer_p99 = sorted(newer_lat)[int(len(newer_lat) * 0.99)]
            
            if newer_p99 > older_p99 * 1.5:
                drift_detected = True
                reason.append(f'P99延迟上升: {older_p99:.1f}ms -> {newer_p99:.1f}ms')
        
        return {
            'drifted': drift_detected,
            'reason': '; '.join(reason) if reason else '无异常'
        }
    
    def get_summary(self) -> Dict:
        """获取监控摘要"""
        return {
            'total_predictions': len(self.predictions),
            'latency': self.get_latency_stats(),
            'error_rate': self.get_error_rate(),
            'confidence': self.get_confidence_stats(),
            'drift': self.detect_drift()
        }

# 使用
if __name__ == "__main__":
    monitor = ModelPerformanceMonitor()
    
    # 模拟一些预测记录
    import random
    for _ in range(1000):
        monitor.record(
            latency_ms=random.gauss(100, 20),  # 均值100ms,标准差20
            confidence=random.betavariate(2, 5),  # Beta分布,偏小
            input_length=random.randint(50, 200),
            output_length=random.randint(100, 500),
            error=None if random.random() > 0.05 else "timeout"
        )
    
    # 获取统计
    summary = monitor.get_summary()
    print(f"总预测数: {summary['total_predictions']}")
    print(f"错误率: {summary['error_rate']:.2%}")
    print(f"\n延迟统计:")
    for k, v in summary['latency'].items():
        print(f"  {k}: {v:.1f}ms")
    print(f"\n漂移检测: {summary['drift']}")

五、数据漂移检测

5.1 漂移类型

数据漂移是指数据的分布特性发生了显著变化,主要类型:

协变量漂移(Covariate Drift):输入特征的分布发生变化,但输入和输出的关系不变。比如用户的搜索词分布变了,但搜索词和点击率的关系没变。

先验漂移(Prior Drift):标签的先验分布发生变化。比如某个类别的比例突然变了。

概念漂移(Concept Drift):输入和输出的关系发生变化。比如以前”苹果”主要是水果,现在可能主要是手机品牌。

5.2 漂移检测方法

统计检验方法

  • KS检验(Kolmogorov-Smirnov Test):比较两个分布是否来自同一分布
  • 卡方检验:适用于分类特征
  • Population Stability Index(PSI):常用的漂移指标

机器学习方法

  • 用模型预测样本是否属于”正常”分布
  • Isolation Forest等异常检测算法

5.3 漂移检测代码

import numpy as np
from scipy import stats
from typing import Dict, List, Tuple
import logging

logger = logging.getLogger(__name__)

class DataDriftDetector:
    """数据漂移检测器
    
    支持多种漂移检测方法
    """
    
    @staticmethod
    def psi_score(expected: np.ndarray, actual: np.ndarray, buckets: int = 10) -> float:
        """计算Population Stability Index (PSI)
        
        PSI < 0.1: 无显著变化
        PSI 0.1-0.2: 轻微变化
        PSI > 0.2: 显著变化(需要关注)
        """
        # 分桶
        breakpoints = np.linspace(expected.min(), expected.max(), buckets + 1)
        
        expected_percents = np.histogram(expected, breakpoints=breakpoints)[0] / len(expected)
        actual_percents = np.histogram(actual, breakpoints=breakpoints)[0] / len(actual)
        
        # 避免除零
        expected_percents = np.where(expected_percents == 0, 0.0001, expected_percents)
        actual_percents = np.where(actual_percents == 0, 0.0001, actual_percents)
        
        # 计算PSI
        psi = np.sum((actual_percents - expected_percents) * np.log(actual_percents / expected_percents))
        
        return psi
    
    @staticmethod
    def ks_test(reference: np.ndarray, current: np.ndarray, alpha: float = 0.05) -> Dict:
        """Kolmogorov-Smirnov检验
        
        判断两个分布是否显著不同
        """
        statistic, pvalue = stats.ks_2samp(reference, current)
        
        return {
            'statistic': statistic,
            'pvalue': pvalue,
            'drifted': pvalue < alpha,  # pvalue小说明分布不同
            'interpretation': '分布不同' if pvalue < alpha else '分布相似'
        }
    
    @staticmethod
    def population_stability_index(feature_name: str, reference: np.ndarray, 
                                  current: np.ndarray) -> Dict:
        """综合漂移检测"""
        # PSI
        psi = DataDriftDetector.psi_score(reference, current)
        
        # KS检验
        ks_result = DataDriftDetector.ks_test(reference, current)
        
        # 均值和标准差变化
        ref_mean, ref_std = reference.mean(), reference.std()
        cur_mean, cur_std = current.mean(), current.std()
        
        mean_change = abs(cur_mean - ref_mean) / ref_std if ref_std > 0 else 0
        std_change = abs(cur_std - ref_std) / ref_std if ref_std > 0 else 0
        
        # 综合判断
        drift_score = 0
        if psi > 0.2:
            drift_score += 2
        elif psi > 0.1:
            drift_score += 1
        
        if ks_result['drifted']:
            drift_score += 2
        
        if mean_change > 0.3:
            drift_score += 1
        
        if std_change > 0.3:
            drift_score += 1
        
        return {
            'feature': feature_name,
            'psi': psi,
            'psi_interpretation': '显著漂移' if psi > 0.2 else ('轻微漂移' if psi > 0.1 else '无漂移'),
            'ks_test': ks_result,
            'mean_change': mean_change,
            'std_change': std_change,
            'drift_score': drift_score,
            'drift_level': 'high' if drift_score >= 4 else ('medium' if drift_score >= 2 else 'low')
        }

# 使用
if __name__ == "__main__":
    detector = DataDriftDetector()
    
    # 模拟参考数据和当前数据
    np.random.seed(42)
    reference = np.random.normal(100, 15, 10000)
    
    # 模拟轻微漂移
    current = np.random.normal(105, 18, 1000)
    
    # 检测漂移
    result = detector.population_stability_index("feature_1", reference, current)
    
    print(f"特征: {result['feature']}")
    print(f"PSI: {result['psi']:.4f} ({result['psi_interpretation']})")
    print(f"KS检验: {result['ks_test']['interpretation']}")
    print(f"均值变化: {result['mean_change']:.2f}个标准差")
    print(f"标准差变化: {result['std_change']:.2f}个标准差")
    print(f"漂移级别: {result['drift_level']}")
日志追踪
图2:分布式追踪架构

六、日志与追踪

6.1 日志策略

AI系统的日志需要特殊设计:

请求日志:记录每次请求的输入、输出、延迟、用户ID等。要注意脱敏,不要记录敏感信息。

模型日志:记录模型的预测结果、置信度、版本信息等。

系统日志:记录服务启动、停止、配置变更等系统事件。

错误日志:记录异常、错误堆栈、错误上下文。

6.2 分布式追踪

当系统包含多个服务时,需要分布式追踪来追踪一个请求的完整调用链。

常用工具:

  • Jaeger:开源分布式追踪系统
  • Zipkin:Twitter开源的追踪系统
  • AWS X-Ray:AWS的追踪服务

七、告警管理

7.1 告警分级

级别 说明 响应时间 示例
P0 服务不可用 5分钟内 服务崩溃、全部请求失败
P1 严重降级 15分钟内 错误率>5%、延迟>P99阈值
P2 轻度异常 1小时内 CPU>80%、置信度下降
P3 预警 24小时内 数据漂移检测

7.2 告警规则示例

# Prometheus告警规则示例
groups:
- name: ai-system-alerts
  rules:
  
  # 服务不可用
  - alert: AIServiceDown
    expr: up{job="ai-model"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "AI服务不可用"
      description: "AI模型服务已经停止运行超过1分钟"
  
  # 高错误率
  - alert: HighErrorRate
    expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "错误率超过5%"
      description: "服务错误率持续超过5%,当前值: {{ $value }}"
  
  # 高延迟
  - alert: HighLatency
    expr: histogram_quantile(0.99, rate(request_latency_seconds_bucket[5m])) > 5
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "P99延迟过高"
      description: "P99延迟超过5秒,当前值: {{ $value }}s"
  
  # GPU资源紧张
  - alert: GPUMemoryHigh
    expr: (gpu_memory_used_bytes / gpu_memory_total_bytes) > 0.9
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "GPU显存使用率超过90%"
      description: "GPU显存即将耗尽,当前使用率: {{ $value | humanizePercentage }}"
  
  # 模型置信度下降
  - alert: ModelConfidenceDrop
    expr: avg(model_confidence_sum / model_requests_total) < 0.5
    for: 30m
    labels:
      severity: warning
    annotations:
      summary: "模型置信度持续下降"
      description: "平均置信度低于0.5,可能存在数据漂移"
  
  # 数据漂移
  - alert: DataDriftDetected
    expr: drift_score > 4
    for: 15m
    labels:
      severity: info
    annotations:
      summary: "检测到数据漂移"
      description: "特征分布发生显著变化,请检查模型是否需要重训"
告警管理
图3:告警管理系统

八、监控架构设计

8.1 推荐架构

一个完整的AI监控架构通常包含:

数据采集层:Prometheus/Vector/Fluentd负责采集指标和日志。

存储层:时序数据库(Prometheus/InfluxDB)存储指标,日志数据库(Elasticsearch/Loki)存储日志。

可视化层:Grafana/Kibana负责数据可视化。

告警层:AlertManager/PagerDuty负责告警管理和通知。

分析层:Jupyter/分析师工作台用于深度分析。

8.2 SLO设置

SLO(Service Level Objective)是服务级别目标,定义服务应该达到的可靠性水平:

  • 可用性:99.9%(每月约8.5分钟宕机时间)
  • P99延迟:<500ms
  • 错误率:<0.1%

九、总结

监控是AI系统的生命线。模型上线不是终点,持续监控才能发现问题、验证效果、指导迭代。

分层监控很重要。从基础设施到业务效果,每层都要关注。

漂移检测是AI特有的监控需求。数据分布变化会悄悄影响模型效果,要提前检测。

告警要精准。太多告警会导致告警疲劳,太少告警会错过问题。

延伸阅读

  • Prometheus官方文档
  • Grafana官方文档
  • AI Engineering最佳实践

课后练习

基础题:搭建一个Prometheus + Grafana监控环境,监控一个简单的模型服务。

进阶题:实现一个数据漂移检测系统,对多个特征同时监控。

挑战题:设计一个完整的AI系统可观测性方案,包括指标、日志、追踪、告警。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

微信公众号二维码

扫码关注公众号

QQ
QQ二维码

扫码添加QQ