📚 学习教程

【高级应用】Day10:知识库构建与数据处理–打造高质量知识库的完整流程

· 2026-04-12 · 8 阅读

【高级应用】Day10:知识库构建与数据处理–打造高质量知识库的完整流程

👤 龙主编 📅 2026-04-12 👁️ 8 阅读 💬 0 评论

章节导语

RAG效果好不好,60%取决于知识库质量。Garbage in, Garbage out——低质量的文档,再好的检索算法也没用。

知识库构建不只是把文档扔进去,而是:文档清洗、分块策略、元数据提取、质量控制的一套完整流程。

本文系统讲解企业知识库的完整构建流程,每步都有实战代码。

一、前置说明

1.1 学习路径

阶段 内容
基础 RAG架构(Day8)
进阶 知识库构建

1.2 读者需要的基础

  • Python基础:文件操作、正则表达式
  • RAG概念:知道文档分块和向量化
  • LLM调用:会调用Embedding模型

1.3 学习目标

学完本文,你将能够:

  • 构建完整的企业知识库
  • 实现文档清洗和预处理
  • 设计合理的分块策略
  • 提取和管理元数据

二、知识库构建流程

2.1 完整流程

知识库构建
图1:知识库构建完整流程
  1. 文档采集:从各种来源获取文档
  2. 文档清洗:去除噪声、格式化
  3. 内容提取:提取文本、表格、图片
  4. 分块处理:切分成适合检索的块
  5. 向量化:生成向量存入数据库
  6. 质量验证:确保检索效果

2.2 数据源类型

类型 格式 难度
文本文件 TXT、Markdown
办公文档 Word、Excel、PPT ⭐⭐
PDF 报告、合同、手册 ⭐⭐⭐
网页 公司内网、Wiki ⭐⭐⭐
数据库 SQL、NoSQL ⭐⭐⭐⭐

三、环境配置

3.1 安装依赖

# 文档处理
pip install pypdf2 python-docx openpyxl pptx

# 网页处理
pip install requests beautifulsoup4 selenium

# 文本处理
pip install nltk spacy

# 向量化
pip install sentence-transformers chromadb

# 数据处理
pip install pandas numpy

四、文档采集

4.1 本地文件扫描

import os
from pathlib import Path
from typing import List, Dict

class DocumentScanner:
    """文档扫描器:遍历目录,收集所有文档
    
    支持的格式:
    - .txt, .md (文本)
    - .pdf (PDF)
    - .doc, .docx (Word)
    - .xls, .xlsx (Excel)
    - .ppt, .pptx (PPT)
    """
    
    def __init__(self, root_dir: str):
        self.root_dir = Path(root_dir)
        self.supported_formats = {
            '.txt': 'text',
            '.md': 'markdown',
            '.pdf': 'pdf',
            '.doc': 'word',
            '.docx': 'word',
            '.xls': 'excel',
            '.xlsx': 'excel',
            '.ppt': 'ppt',
            '.pptx': 'ppt',
        }
    
    def scan(self, recursive: bool = True) -> List[Dict]:
        """扫描目录,返回所有文档路径
        
        参数:
            recursive: 是否递归扫描子目录
        返回:
            文档信息列表
        """
        documents = []
        
        if recursive:
            pattern = "**/*"
        else:
            pattern = "*"
        
        for file_path in self.root_dir.glob(pattern):
            if file_path.is_file():
                ext = file_path.suffix.lower()
                if ext in self.supported_formats:
                    doc_info = {
                        'path': str(file_path),
                        'name': file_path.name,
                        'format': self.supported_formats[ext],
                        'size': file_path.stat().st_size,
                        'modified': file_path.stat().st_mtime,
                    }
                    documents.append(doc_info)
        
        print(f"📂 扫描完成:找到 {len(documents)} 个文档")
        return documents
    
    def filter_by_format(self, documents: List[Dict], formats: List[str]) -> List[Dict]:
        """按格式筛选文档"""
        filtered = [doc for doc in documents if doc['format'] in formats]
        print(f"🔍 筛选{filters}后:{len(filtered)} 个文档")
        return filtered
    
    def filter_by_size(self, documents: List[Dict], min_size: int = 0, max_size: int = None) -> List[Dict]:
        """按大小筛选(字节)"""
        filtered = [doc for doc in documents if doc['size'] >= min_size]
        if max_size:
            filtered = [doc for doc in filtered if doc['size'] <= max_size]
        print(f"📏 按大小筛选后:{len(filtered)} 个文档")
        return filtered

# 使用
if __name__ == "__main__":
    scanner = DocumentScanner("./documents")
    
    # 扫描
    docs = scanner.scan(recursive=True)
    
    # 筛选
    docs = scanner.filter_by_format(docs, ['pdf', 'word'])
    docs = scanner.filter_by_size(docs, min_size=1024)  # 大于1KB
    
    print(f"\n最终:{len(docs)} 个文档待处理")

4.2 网页内容采集

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from typing import List, Dict
import time

class WebScraper:
    """网页内容采集器
    
    从网站批量采集内容
    """
    
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; KnowledgeBot/1.0)'
        })
    
    def fetch_page(self, url: str, timeout: int = 10) -> str:
        """获取单个页面内容"""
        try:
            response = self.session.get(url, timeout=timeout)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"⚠️ 获取失败 {url}: {e}")
            return ""
    
    def extract_text(self, html: str) -> str:
        """从HTML提取正文"""
        soup = BeautifulSoup(html, 'html.parser')
        
        # 移除脚本和样式
        for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
            tag.decompose()
        
        # 提取正文(尝试常见容器)
        main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
        
        if main_content:
            text = main_content.get_text(separator='\n', strip=True)
        else:
            text = soup.get_text(separator='\n', strip=True)
        
        # 清理空行
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        return '\n'.join(lines)
    
    def crawl(self, urls: List[str], delay: float = 1.0) -> List[Dict]:
        """批量采集页面
        
        参数:
            urls: 页面URL列表
            delay: 请求间隔(秒)
        """
        results = []
        
        for i, url in enumerate(urls, 1):
            print(f"[{i}/{len(urls)}] 采集: {url}")
            
            html = self.fetch_page(url)
            if html:
                text = self.extract_text(html)
                results.append({
                    'url': url,
                    'content': text,
                    'length': len(text)
                })
            
            # 礼貌爬取:添加延迟
            if i < len(urls):
                time.sleep(delay)
        
        print(f"✅ 采集完成:{len(results)} 个页面")
        return results
    
    def extract_links(self, html: str, base_url: str) -> List[str]:
        """从页面提取链接"""
        soup = BeautifulSoup(html, 'html.parser')
        links = set()
        
        for a in soup.find_all('a', href=True):
            href = a['href']
            full_url = urljoin(base_url, href)
            
            # 只保留同域名的链接
            if urlparse(full_url).netloc == urlparse(base_url).netloc:
                links.add(full_url)
        
        return list(links)

# 使用
if __name__ == "__main__":
    scraper = WebScraper("https://docs.example.com")
    
    urls = [
        "https://docs.example.com/guide/intro",
        "https://docs.example.com/guide/quickstart",
        "https://docs.example.com/api/reference",
    ]
    
    pages = scraper.crawl(urls)
    for page in pages:
        print(f"\n📄 {page['url']} ({page['length']} 字)")
        print(page['content'][:200] + "...")

五、文档清洗

5.1 文本清洗

文档清洗
图2:文档清洗流程
import re
from typing import List

class TextCleaner:
    """文本清洗器
    
    清理各种噪声,还原干净文本
    """
    
    def __init__(self):
        # 匹配URL
        self.url_pattern = re.compile(r'https?://\S+')
        
        # 匹配邮箱
        self.email_pattern = re.compile(r'\S+@\S+\.\S+')
        
        # 匹配多余空白
        self.whitespace_pattern = re.compile(r'\s+')
        
        # 匹配特殊字符(保留中文、英文、数字、常用标点)
        self.special_chars_pattern = re.compile(r'[^\u4e00-\u9fa5a-zA-Z0-9\s,。!?;:、]")
    
    def remove_urls(self, text: str) -> str:
        """移除URL"""
        return self.url_pattern.sub('[链接]', text)
    
    def remove_emails(self, text: str) -> str:
        """移除邮箱"""
        return self.email_pattern.sub('[邮箱]', text)
    
    def normalize_whitespace(self, text: str) -> str:
        """标准化空白字符"""
        # 将多个空格/换行合并为一个
        text = self.whitespace_pattern.sub(' ', text)
        # 去除首尾空白
        return text.strip()
    
    def remove_special_chars(self, text: str) -> str:
        """移除特殊字符(可选)"""
        return self.special_chars_pattern.sub('', text)
    
    def clean(self, text: str, remove_urls: bool = True, remove_emails: bool = True) -> str:
        """完整清洗流程"""
        if remove_urls:
            text = self.remove_urls(text)
        
        if remove_emails:
            text = self.remove_emails(text)
        
        text = self.normalize_whitespace(text)
        
        return text
    
    def clean_batch(self, texts: List[str]) -> List[str]:
        """批量清洗"""
        return [self.clean(text) for text in texts]

# 使用
if __name__ == "__main__":
    dirty_text = """
    欢迎访问我们的网站!https://example.com
    
    如有疑问请联系:support@example.com
    
    这是一个测试   文本,包含   多个空格。
    
    还有很多特殊字符 @#$%^
    """
    
    cleaner = TextCleaner()
    clean_text = cleaner.clean(dirty_text)
    
    print("清洗前:")
    print(dirty_text)
    print("\n清洗后:")
    print(clean_text)

5.2 PDF内容提取

import PyPDF2
from typing import List, Dict, Optional

class PDFExtractor:
    """PDF内容提取器
    
    支持:
    - 文本提取
    - 逐页读取
    - 元数据提取
    """
    
    def __init__(self, file_path: str):
        self.file_path = file_path
        self.reader = None
        self._open()
    
    def _open(self):
        """打开PDF文件"""
        try:
            self.reader = PyPDF2.PdfReader(self.file_path)
            print(f"✅ PDF打开成功:{len(self.reader.pages)} 页")
        except Exception as e:
            print(f"❌ PDF打开失败: {e}")
            self.reader = None
    
    def get_metadata(self) -> Optional[Dict]:
        """提取元数据"""
        if not self.reader:
            return None
        
        metadata = self.reader.metadata
        
        return {
            'title': metadata.get('/Title', ''),
            'author': metadata.get('/Author', ''),
            'subject': metadata.get('/Subject', ''),
            'creator': metadata.get('/Creator', ''),
            'pages': len(self.reader.pages)
        }
    
    def extract_text(self, page_numbers: List[int] = None) -> str:
        """提取文本
        
        参数:
            page_numbers: 指定页码列表,None表示全部
        """
        if not self.reader:
            return ""
        
        if page_numbers is None:
            page_numbers = range(len(self.reader.pages))
        
        texts = []
        
        for i in page_numbers:
            if i < len(self.reader.pages):
                page = self.reader.pages[i]
                text = page.extract_text()
                texts.append(f"[第{i+1}页]\n{text}")
        
        return "\n\n".join(texts)
    
    def extract_by_chapter(self, toc: List[Dict] = None) -> List[Dict]:
        """按章节提取(需要提供目录)
        
        toc格式: [{'title': '章节名', 'start_page': 1, 'end_page': 5}]
        """
        if not toc:
            # 没有目录,按页分割
            return [{'title': f'第{i+1}部分', 'content': self.extract_text([i])}
                    for i in range(len(self.reader.pages))]
        
        chapters = []
        
        for chapter in toc:
            content = self.extract_text(range(chapter['start_page'], chapter['end_page']+1))
            chapters.append({
                'title': chapter['title'],
                'content': content,
                'pages': f"{chapter['start_page']}-{chapter['end_page']}"
            })
        
        return chapters

# 使用
if __name__ == "__main__":
    extractor = PDFExtractor("sample.pdf")
    
    # 获取元数据
    metadata = extractor.get_metadata()
    print(f"\n📋 元数据:{metadata}")
    
    # 提取全部文本
    full_text = extractor.extract_text()
    print(f"\n📄 全文长度:{len(full_text)} 字符")
    
    # 提取前3页
    first_pages = extractor.extract_text([0, 1, 2])
    print(f"\n📄 前3页预览:\n{first_pages[:500]}...")

六、分块策略

6.1 分块的重要性

分块是RAG效果的关键。太长会引入噪声,太短会丢失上下文。

影响分块效果的因素

  • 块大小:影响检索精度和上下文完整性
  • 重叠度:保持块之间上下文连贯
  • 边界处理:在句子/段落边界切分

6.2 固定大小分块

from typing import List, Dict

class FixedSizeChunker:
    """固定大小分块器
    
    最简单的分块策略
    """
    
    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """
        参数:
            chunk_size: 块大小(字符数)
            overlap: 重叠大小(字符数)
        """
        self.chunk_size = chunk_size
        self.overlap = overlap
    
    def chunk(self, text: str, metadata: Dict = None) -> List[Dict]:
        """分块
        
        参数:
            text: 输入文本
            metadata: 附加元数据
        返回:
            块列表
        """
        chunks = []
        start = 0
        chunk_num = 0
        
        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end]
            
            chunks.append({
                'content': chunk_text,
                'chunk_id': f"chunk_{chunk_num}",
                'start': start,
                'end': end,
                'metadata': metadata or {}
            })
            
            # 滑动窗口
            start += self.chunk_size - self.overlap
            chunk_num += 1
        
        print(f"📦 分块完成:{len(chunks)} 个块")
        return chunks

# 使用
if __name__ == "__main__":
    chunker = FixedSizeChunker(chunk_size=200, overlap=30)
    
    text = """
    人工智能是当今最热门的技术领域之一。它包括机器学习、深度学习等多个分支。
    机器学习是AI的核心技术,通过数据训练模型。深度学习使用神经网络,在图像识别、自然语言处理等领域取得突破性进展。
    Python是AI领域最常用的编程语言,拥有丰富的库和框架。
    """
    
    chunks = chunker.chunk(text, metadata={'source': 'demo.txt'})
    
    for chunk in chunks:
        print(f"\n【{chunk['chunk_id']}】{chunk['start']}-{chunk['end']}")
        print(chunk['content'])

6.3 语义分块

import re
from typing import List, Dict

class SemanticChunker:
    """语义分块器
    
    按段落/句子边界分块,更符合语义
    """
    
    def __init__(self, min_chunk_size: int = 100, max_chunk_size: int = 1000):
        """
        参数:
            min_chunk_size: 最小块大小(太小的段落会合并)
            max_chunk_size: 最大块大小(太大的段落会分割)
        """
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
    
    def split_paragraphs(self, text: str) -> List[str]:
        """按段落分割"""
        # 按换行分割
        paragraphs = re.split(r'\n\s*\n', text)
        # 清理空白
        paragraphs = [p.strip() for p in paragraphs if p.strip()]
        return paragraphs
    
    def merge_small_paragraphs(self, paragraphs: List[str]) -> List[str]:
        """合并太小的段落"""
        merged = []
        buffer = ""
        
        for para in paragraphs:
            if len(buffer) + len(para) < self.max_chunk_size:
                buffer += "\n\n" + para
            else:
                if buffer:
                    merged.append(buffer.strip())
                # 检查当前段落是否太大
                if len(para) > self.max_chunk_size:
                    # 进一步分割
                    sub_chunks = self.split_large_paragraph(para)
                    merged.extend(sub_chunks[:-1])
                    buffer = sub_chunks[-1]
                else:
                    buffer = para
        
        if buffer:
            merged.append(buffer.strip())
        
        return merged
    
    def split_large_paragraph(self, text: str) -> List[str]:
        """分割过长的段落"""
        # 按句子分割
        sentences = re.split(r'[。!?.!?]', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        chunks = []
        current_chunk = ""
        
        for sentence in sentences:
            if len(current_chunk) + len(sentence) < self.max_chunk_size:
                current_chunk += sentence + "。"
            else:
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = sentence + "。"
        
        if current_chunk:
            chunks.append(current_chunk)
        
        return chunks
    
    def chunk(self, text: str, metadata: Dict = None) -> List[Dict]:
        """语义分块"""
        # 1. 分割段落
        paragraphs = self.split_paragraphs(text)
        print(f"📄 分割出 {len(paragraphs)} 个段落")
        
        # 2. 合并小段落
        merged = self.merge_small_paragraphs(paragraphs)
        print(f"📄 合并后 {len(merged)} 个块")
        
        # 3. 构建块对象
        chunks = []
        for i, content in enumerate(merged):
            if len(content) >= self.min_chunk_size:  # 过滤太短的
                chunks.append({
                    'content': content,
                    'chunk_id': f"chunk_{i}",
                    'char_count': len(content),
                    'metadata': metadata or {}
                })
        
        print(f"📦 最终 {len(chunks)} 个块")
        return chunks

# 使用
if __name__ == "__main__":
    chunker = SemanticChunker(min_chunk_size=50, max_chunk_size=300)
    
    text = """
    人工智能是当今最热门的技术领域。

    机器学习是AI的核心技术。通过数据训练模型,让计算机学会预测和决策。深度学习是机器学习的分支,使用神经网络。

    Python是AI领域最常用的编程语言。它有丰富的库,如TensorFlow、PyTorch。JavaScript也在AI可视化中有应用。

    数据显示,AI市场规模逐年增长。
    """
    
    chunks = chunker.chunk(text, metadata={'source': 'ai_intro.txt'})
    
    for chunk in chunks:
        print(f"\n【{chunk['chunk_id']}】({chunk['char_count']}字)")
        print(chunk['content'][:100] + "...")

七、元数据管理

7.1 元数据的重要性

元数据
图3:元数据结构

元数据让检索更精准,支持过滤和排序。

常用元数据

  • 来源:文件名、URL、作者
  • 时间:创建时间、更新时间
  • 类型:文档类型、格式
  • 标签:分类、关键词
  • 权限:公开、内部、机密

7.2 元数据提取器

from datetime import datetime
from pathlib import Path
import hashlib

class MetadataExtractor:
    """元数据提取器
    
    从文件和内容中提取元数据
    """
    
    def extract_from_file(self, file_path: str) -> dict:
        """从文件提取元数据"""
        path = Path(file_path)
        
        metadata = {
            'file_name': path.name,
            'file_type': path.suffix.lower(),
            'file_size': path.stat().st_size,
            'created_at': datetime.fromtimestamp(path.stat().st_ctime).isoformat(),
            'modified_at': datetime.fromtimestamp(path.stat().st_mtime).isoformat(),
        }
        
        return metadata
    
    def extract_from_content(self, content: str, file_path: str = None) -> dict:
        """从内容提取元数据"""
        metadata = {
            'char_count': len(content),
            'word_count': len(content.split()),
            'line_count': content.count('\n') + 1,
            'content_hash': hashlib.md5(content.encode()).hexdigest(),
        }
        
        # 估算阅读时间(每分钟400字)
        reading_time = len(content) / 400 / 60
        metadata['reading_time_minutes'] = round(reading_time, 1)
        
        return metadata
    
    def extract_keywords(self, content: str, top_n: int = 5) -> List[str]:
        """提取关键词(简单实现)"""
        # 停用词
        stopwords = {'的', '了', '是', '在', '和', '与', '为', '对', '等', '于', '之', '以', '及'}
        
        # 提取中文词(简单按字符)
        words = re.findall(r'[\u4e00-\u9fa5]{2,}', content)
        
        # 统计词频
        word_freq = {}
        for word in words:
            if word not in stopwords and len(word) >= 2:
                word_freq[word] = word_freq.get(word, 0) + 1
        
        # 排序取top
        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
        return [word for word, freq in sorted_words[:top_n]]
    
    def extract(self, content: str, file_path: str = None) -> dict:
        """完整提取"""
        metadata = {}
        
        if file_path:
            metadata.update(self.extract_from_file(file_path))
        
        metadata.update(self.extract_from_content(content, file_path))
        metadata['keywords'] = self.extract_keywords(content)
        
        return metadata

# 使用
if __name__ == "__main__":
    extractor = MetadataExtractor()
    
    content = """
    人工智能是当今最热门的技术领域之一。机器学习是AI的核心技术。
    深度学习使用神经网络,在图像识别和自然语言处理中应用广泛。
    Python是最常用的编程语言,拥有丰富的AI库。
    """
    
    metadata = extractor.extract(content, "ai_intro.txt")
    
    print("📋 元数据:")
    for key, value in metadata.items():
        print(f"  {key}: {value}")

八、完整实战:企业知识库构建

8.1 系统架构

"""
企业知识库构建系统

完整流程:
1. 扫描文档
2. 提取内容
3. 清洗文本
4. 分块处理
5. 提取元数据
6. 向量化存储
7. 质量验证
"""

import os
from typing import List, Dict

class EnterpriseKnowledgeBase:
    """企业知识库构建器"""
    
    def __init__(self, source_dir: str):
        self.source_dir = source_dir
        self.documents = []
        self.chunks = []
        
        # 各组件
        self.scanner = DocumentScanner(source_dir)
        self.cleaner = TextCleaner()
        self.chunker = SemanticChunker(min_chunk_size=100, max_chunk_size=500)
        self.metadata_extractor = MetadataExtractor()
    
    def ingest(self) -> int:
        """摄入所有文档
        
        返回:
            生成的块数量
        """
        print("=" * 60)
        print("🚀 开始构建知识库")
        print("=" * 60)
        
        # 1. 扫描文档
        print("\n📂 步骤1:扫描文档")
        docs = self.scanner.scan(recursive=True)
        print(f"   找到 {len(docs)} 个文档")
        
        # 2-5. 处理每个文档
        print("\n📝 步骤2-5:处理文档")
        all_chunks = []
        
        for i, doc_info in enumerate(docs, 1):
            try:
                # 读取内容(根据格式选择提取器)
                content = self._extract_content(doc_info)
                
                if not content:
                    continue
                
                # 清洗
                content = self.cleaner.clean(content)
                
                # 分块
                metadata = self.metadata_extractor.extract(content, doc_info['path'])
                chunks = self.chunker.chunk(content, metadata)
                
                all_chunks.extend(chunks)
                
                print(f"   [{i}/{len(docs)}] {doc_info['name']} → {len(chunks)} 块")
                
            except Exception as e:
                print(f"   ⚠️ 处理失败 {doc_info['name']}: {e}")
        
        self.chunks = all_chunks
        print(f"\n✅ 知识库构建完成:{len(self.chunks)} 个块")
        
        return len(self.chunks)
    
    def _extract_content(self, doc_info: Dict) -> str:
        """根据格式提取内容"""
        path = doc_info['path']
        format_type = doc_info['format']
        
        if format_type == 'text':
            with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        
        elif format_type == 'pdf':
            extractor = PDFExtractor(path)
            return extractor.extract_text()
        
        elif format_type == 'markdown':
            with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        
        # 其他格式暂不支持
        return ""
    
    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """检索(简单实现)"""
        # 这里应该用向量检索
        # 简化实现:关键词匹配
        results = []
        
        for chunk in self.chunks:
            content = chunk['content'].lower()
            if any(word in content for word in query.lower().split()):
                results.append(chunk)
                if len(results) >= top_k:
                    break
        
        return results
    
    def get_stats(self) -> Dict:
        """获取统计信息"""
        return {
            'total_chunks': len(self.chunks),
            'total_chars': sum(len(c['content']) for c in self.chunks),
            'avg_chunk_size': sum(len(c['content']) for c in self.chunks) / len(self.chunks) if self.chunks else 0,
        }

# 使用
if __name__ == "__main__":
    # 创建测试目录和文档
    os.makedirs("test_kb", exist_ok=True)
    
    with open("test_kb/AI简介.txt", "w") as f:
        f.write("人工智能是热门技术。机器学习是AI核心。深度学习使用神经网络。")
    
    with open("test_kb/Python教程.txt", "w") as f:
        f.write("Python是编程语言。Python易学易用。Python广泛用于AI领域。")
    
    # 构建知识库
    kb = EnterpriseKnowledgeBase("test_kb")
    kb.ingest()
    
    # 统计
    stats = kb.get_stats()
    print(f"\n📊 统计:{stats}")
    
    # 检索
    results = kb.search("Python")
    print(f"\n🔍 检索'Python'结果:")
    for r in results:
        print(f"  - {r['content'][:50]}...")

九、质量控制

9.1 质量检查清单

class QualityChecker:
    """质量检查器
    
    检查知识库的各类质量问题
    """
    
    def __init__(self):
        self.issues = []
    
    def check_chunk_size(self, chunks: List[Dict], min_size: int = 50, max_size: int = 1000):
        """检查块大小"""
        too_small = [c for c in chunks if len(c['content']) < min_size]
        too_large = [c for c in chunks if len(c['content']) > max_size]
        
        if too_small:
            print(f"⚠️ {len(too_small)} 个块太小(<{min_size}字)")
        
        if too_large:
            print(f"⚠️ {len(too_large)} 个块太大(>{max_size}字)")
        
        return {
            'too_small': len(too_small),
            'too_large': len(too_large)
        }
    
    def check_duplicates(self, chunks: List[Dict]):
        """检查重复内容"""
        hashes = {}
        duplicates = []
        
        for chunk in chunks:
            content_hash = hashlib.md5(chunk['content'].encode()).hexdigest()
            
            if content_hash in hashes:
                duplicates.append({
                    'chunk_id': chunk['chunk_id'],
                    'duplicate_of': hashes[content_hash]
                })
            else:
                hashes[content_hash] = chunk['chunk_id']
        
        if duplicates:
            print(f"⚠️ 发现 {len(duplicates)} 对重复块")
        
        return duplicates
    
    def check_coverage(self, chunks: List[Dict], sample_queries: List[str]) -> Dict:
        """检查覆盖率
        
        用测试查询检查检索效果
        """
        coverage = {}
        
        for query in sample_queries:
            query_words = set(query.lower().split())
            
            matched = 0
            for chunk in chunks:
                content_words = set(chunk['content'].lower().split())
                if query_words & content_words:  # 有交集
                    matched += 1
            
            coverage[query] = matched / len(chunks) if chunks else 0
        
        return coverage
    
    def run_all_checks(self, chunks: List[Dict]):
        """运行所有检查"""
        print("\n🔍 质量检查:")
        
        size_issues = self.check_chunk_size(chunks)
        dupes = self.check_duplicates(chunks)
        
        # 覆盖率检查(示例查询)
        sample_queries = ["人工智能", "机器学习", "Python"]
        coverage = self.check_coverage(chunks, sample_queries)
        
        print("\n📊 覆盖率:")
        for query, rate in coverage.items():
            print(f"  '{query}': {rate*100:.1f}%")
        
        return {
            'size_issues': size_issues,
            'duplicates': len(dupes),
            'coverage': coverage
        }

十、总结与练习

10.1 要点回顾

  • 文档采集:多格式支持,批量处理
  • 文档清洗:去噪、规范化
  • 分块策略:固定大小 vs 语义分块
  • 元数据:来源、时间、关键词
  • 质量控制:大小、重复、覆盖率

10.2 分块策略选择

场景 推荐策略 块大小
FAQ 按问题分割 整个问题
长文档 语义分块 300-500字
代码文档 固定+函数边界 200-400字

10.3 延伸阅读

  • LangChain Document Loaders官方文档
  • Unstructured库:支持100+种文档格式

10.4 课后练习

基础题:实现一个PDF文档的知识库构建流程。

进阶题:设计一个支持增量更新的知识库系统。

挑战题:实现自动分块策略选择——根据文档类型自动选择最优分块方式。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

微信公众号二维码

扫码关注公众号

QQ
QQ二维码

扫码添加QQ