【高级应用】Day10:知识库构建与数据处理–打造高质量知识库的完整流程
章节导语
RAG效果好不好,60%取决于知识库质量。Garbage in, Garbage out——低质量的文档,再好的检索算法也没用。
知识库构建不只是把文档扔进去,而是:文档清洗、分块策略、元数据提取、质量控制的一套完整流程。
本文系统讲解企业知识库的完整构建流程,每步都有实战代码。
一、前置说明
1.1 学习路径
| 阶段 | 内容 |
|---|---|
| 基础 | RAG架构(Day8) |
| 进阶 | 知识库构建 |
1.2 读者需要的基础
- Python基础:文件操作、正则表达式
- RAG概念:知道文档分块和向量化
- LLM调用:会调用Embedding模型
1.3 学习目标
学完本文,你将能够:
- 构建完整的企业知识库
- 实现文档清洗和预处理
- 设计合理的分块策略
- 提取和管理元数据
二、知识库构建流程
2.1 完整流程
图1:知识库构建完整流程- 文档采集:从各种来源获取文档
- 文档清洗:去除噪声、格式化
- 内容提取:提取文本、表格、图片
- 分块处理:切分成适合检索的块
- 向量化:生成向量存入数据库
- 质量验证:确保检索效果
2.2 数据源类型
| 类型 | 格式 | 难度 |
|---|---|---|
| 文本文件 | TXT、Markdown | ⭐ |
| 办公文档 | Word、Excel、PPT | ⭐⭐ |
| 报告、合同、手册 | ⭐⭐⭐ | |
| 网页 | 公司内网、Wiki | ⭐⭐⭐ |
| 数据库 | SQL、NoSQL | ⭐⭐⭐⭐ |
三、环境配置
3.1 安装依赖
# 文档处理
pip install pypdf2 python-docx openpyxl pptx
# 网页处理
pip install requests beautifulsoup4 selenium
# 文本处理
pip install nltk spacy
# 向量化
pip install sentence-transformers chromadb
# 数据处理
pip install pandas numpy
四、文档采集
4.1 本地文件扫描
import os
from pathlib import Path
from typing import List, Dict
class DocumentScanner:
"""文档扫描器:遍历目录,收集所有文档
支持的格式:
- .txt, .md (文本)
- .pdf (PDF)
- .doc, .docx (Word)
- .xls, .xlsx (Excel)
- .ppt, .pptx (PPT)
"""
def __init__(self, root_dir: str):
self.root_dir = Path(root_dir)
self.supported_formats = {
'.txt': 'text',
'.md': 'markdown',
'.pdf': 'pdf',
'.doc': 'word',
'.docx': 'word',
'.xls': 'excel',
'.xlsx': 'excel',
'.ppt': 'ppt',
'.pptx': 'ppt',
}
def scan(self, recursive: bool = True) -> List[Dict]:
"""扫描目录,返回所有文档路径
参数:
recursive: 是否递归扫描子目录
返回:
文档信息列表
"""
documents = []
if recursive:
pattern = "**/*"
else:
pattern = "*"
for file_path in self.root_dir.glob(pattern):
if file_path.is_file():
ext = file_path.suffix.lower()
if ext in self.supported_formats:
doc_info = {
'path': str(file_path),
'name': file_path.name,
'format': self.supported_formats[ext],
'size': file_path.stat().st_size,
'modified': file_path.stat().st_mtime,
}
documents.append(doc_info)
print(f"📂 扫描完成:找到 {len(documents)} 个文档")
return documents
def filter_by_format(self, documents: List[Dict], formats: List[str]) -> List[Dict]:
"""按格式筛选文档"""
filtered = [doc for doc in documents if doc['format'] in formats]
print(f"🔍 筛选{filters}后:{len(filtered)} 个文档")
return filtered
def filter_by_size(self, documents: List[Dict], min_size: int = 0, max_size: int = None) -> List[Dict]:
"""按大小筛选(字节)"""
filtered = [doc for doc in documents if doc['size'] >= min_size]
if max_size:
filtered = [doc for doc in filtered if doc['size'] <= max_size]
print(f"📏 按大小筛选后:{len(filtered)} 个文档")
return filtered
# 使用
if __name__ == "__main__":
scanner = DocumentScanner("./documents")
# 扫描
docs = scanner.scan(recursive=True)
# 筛选
docs = scanner.filter_by_format(docs, ['pdf', 'word'])
docs = scanner.filter_by_size(docs, min_size=1024) # 大于1KB
print(f"\n最终:{len(docs)} 个文档待处理")
4.2 网页内容采集
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from typing import List, Dict
import time
class WebScraper:
"""网页内容采集器
从网站批量采集内容
"""
def __init__(self, base_url: str):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (compatible; KnowledgeBot/1.0)'
})
def fetch_page(self, url: str, timeout: int = 10) -> str:
"""获取单个页面内容"""
try:
response = self.session.get(url, timeout=timeout)
response.raise_for_status()
return response.text
except Exception as e:
print(f"⚠️ 获取失败 {url}: {e}")
return ""
def extract_text(self, html: str) -> str:
"""从HTML提取正文"""
soup = BeautifulSoup(html, 'html.parser')
# 移除脚本和样式
for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
tag.decompose()
# 提取正文(尝试常见容器)
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
if main_content:
text = main_content.get_text(separator='\n', strip=True)
else:
text = soup.get_text(separator='\n', strip=True)
# 清理空行
lines = [line.strip() for line in text.split('\n') if line.strip()]
return '\n'.join(lines)
def crawl(self, urls: List[str], delay: float = 1.0) -> List[Dict]:
"""批量采集页面
参数:
urls: 页面URL列表
delay: 请求间隔(秒)
"""
results = []
for i, url in enumerate(urls, 1):
print(f"[{i}/{len(urls)}] 采集: {url}")
html = self.fetch_page(url)
if html:
text = self.extract_text(html)
results.append({
'url': url,
'content': text,
'length': len(text)
})
# 礼貌爬取:添加延迟
if i < len(urls):
time.sleep(delay)
print(f"✅ 采集完成:{len(results)} 个页面")
return results
def extract_links(self, html: str, base_url: str) -> List[str]:
"""从页面提取链接"""
soup = BeautifulSoup(html, 'html.parser')
links = set()
for a in soup.find_all('a', href=True):
href = a['href']
full_url = urljoin(base_url, href)
# 只保留同域名的链接
if urlparse(full_url).netloc == urlparse(base_url).netloc:
links.add(full_url)
return list(links)
# 使用
if __name__ == "__main__":
scraper = WebScraper("https://docs.example.com")
urls = [
"https://docs.example.com/guide/intro",
"https://docs.example.com/guide/quickstart",
"https://docs.example.com/api/reference",
]
pages = scraper.crawl(urls)
for page in pages:
print(f"\n📄 {page['url']} ({page['length']} 字)")
print(page['content'][:200] + "...")
五、文档清洗
5.1 文本清洗
图2:文档清洗流程import re
from typing import List
class TextCleaner:
"""文本清洗器
清理各种噪声,还原干净文本
"""
def __init__(self):
# 匹配URL
self.url_pattern = re.compile(r'https?://\S+')
# 匹配邮箱
self.email_pattern = re.compile(r'\S+@\S+\.\S+')
# 匹配多余空白
self.whitespace_pattern = re.compile(r'\s+')
# 匹配特殊字符(保留中文、英文、数字、常用标点)
self.special_chars_pattern = re.compile(r'[^\u4e00-\u9fa5a-zA-Z0-9\s,。!?;:、]")
def remove_urls(self, text: str) -> str:
"""移除URL"""
return self.url_pattern.sub('[链接]', text)
def remove_emails(self, text: str) -> str:
"""移除邮箱"""
return self.email_pattern.sub('[邮箱]', text)
def normalize_whitespace(self, text: str) -> str:
"""标准化空白字符"""
# 将多个空格/换行合并为一个
text = self.whitespace_pattern.sub(' ', text)
# 去除首尾空白
return text.strip()
def remove_special_chars(self, text: str) -> str:
"""移除特殊字符(可选)"""
return self.special_chars_pattern.sub('', text)
def clean(self, text: str, remove_urls: bool = True, remove_emails: bool = True) -> str:
"""完整清洗流程"""
if remove_urls:
text = self.remove_urls(text)
if remove_emails:
text = self.remove_emails(text)
text = self.normalize_whitespace(text)
return text
def clean_batch(self, texts: List[str]) -> List[str]:
"""批量清洗"""
return [self.clean(text) for text in texts]
# 使用
if __name__ == "__main__":
dirty_text = """
欢迎访问我们的网站!https://example.com
如有疑问请联系:support@example.com
这是一个测试 文本,包含 多个空格。
还有很多特殊字符 @#$%^
"""
cleaner = TextCleaner()
clean_text = cleaner.clean(dirty_text)
print("清洗前:")
print(dirty_text)
print("\n清洗后:")
print(clean_text)
5.2 PDF内容提取
import PyPDF2
from typing import List, Dict, Optional
class PDFExtractor:
"""PDF内容提取器
支持:
- 文本提取
- 逐页读取
- 元数据提取
"""
def __init__(self, file_path: str):
self.file_path = file_path
self.reader = None
self._open()
def _open(self):
"""打开PDF文件"""
try:
self.reader = PyPDF2.PdfReader(self.file_path)
print(f"✅ PDF打开成功:{len(self.reader.pages)} 页")
except Exception as e:
print(f"❌ PDF打开失败: {e}")
self.reader = None
def get_metadata(self) -> Optional[Dict]:
"""提取元数据"""
if not self.reader:
return None
metadata = self.reader.metadata
return {
'title': metadata.get('/Title', ''),
'author': metadata.get('/Author', ''),
'subject': metadata.get('/Subject', ''),
'creator': metadata.get('/Creator', ''),
'pages': len(self.reader.pages)
}
def extract_text(self, page_numbers: List[int] = None) -> str:
"""提取文本
参数:
page_numbers: 指定页码列表,None表示全部
"""
if not self.reader:
return ""
if page_numbers is None:
page_numbers = range(len(self.reader.pages))
texts = []
for i in page_numbers:
if i < len(self.reader.pages):
page = self.reader.pages[i]
text = page.extract_text()
texts.append(f"[第{i+1}页]\n{text}")
return "\n\n".join(texts)
def extract_by_chapter(self, toc: List[Dict] = None) -> List[Dict]:
"""按章节提取(需要提供目录)
toc格式: [{'title': '章节名', 'start_page': 1, 'end_page': 5}]
"""
if not toc:
# 没有目录,按页分割
return [{'title': f'第{i+1}部分', 'content': self.extract_text([i])}
for i in range(len(self.reader.pages))]
chapters = []
for chapter in toc:
content = self.extract_text(range(chapter['start_page'], chapter['end_page']+1))
chapters.append({
'title': chapter['title'],
'content': content,
'pages': f"{chapter['start_page']}-{chapter['end_page']}"
})
return chapters
# 使用
if __name__ == "__main__":
extractor = PDFExtractor("sample.pdf")
# 获取元数据
metadata = extractor.get_metadata()
print(f"\n📋 元数据:{metadata}")
# 提取全部文本
full_text = extractor.extract_text()
print(f"\n📄 全文长度:{len(full_text)} 字符")
# 提取前3页
first_pages = extractor.extract_text([0, 1, 2])
print(f"\n📄 前3页预览:\n{first_pages[:500]}...")
六、分块策略
6.1 分块的重要性
分块是RAG效果的关键。太长会引入噪声,太短会丢失上下文。
影响分块效果的因素:
- 块大小:影响检索精度和上下文完整性
- 重叠度:保持块之间上下文连贯
- 边界处理:在句子/段落边界切分
6.2 固定大小分块
from typing import List, Dict
class FixedSizeChunker:
"""固定大小分块器
最简单的分块策略
"""
def __init__(self, chunk_size: int = 500, overlap: int = 50):
"""
参数:
chunk_size: 块大小(字符数)
overlap: 重叠大小(字符数)
"""
self.chunk_size = chunk_size
self.overlap = overlap
def chunk(self, text: str, metadata: Dict = None) -> List[Dict]:
"""分块
参数:
text: 输入文本
metadata: 附加元数据
返回:
块列表
"""
chunks = []
start = 0
chunk_num = 0
while start < len(text):
end = start + self.chunk_size
chunk_text = text[start:end]
chunks.append({
'content': chunk_text,
'chunk_id': f"chunk_{chunk_num}",
'start': start,
'end': end,
'metadata': metadata or {}
})
# 滑动窗口
start += self.chunk_size - self.overlap
chunk_num += 1
print(f"📦 分块完成:{len(chunks)} 个块")
return chunks
# 使用
if __name__ == "__main__":
chunker = FixedSizeChunker(chunk_size=200, overlap=30)
text = """
人工智能是当今最热门的技术领域之一。它包括机器学习、深度学习等多个分支。
机器学习是AI的核心技术,通过数据训练模型。深度学习使用神经网络,在图像识别、自然语言处理等领域取得突破性进展。
Python是AI领域最常用的编程语言,拥有丰富的库和框架。
"""
chunks = chunker.chunk(text, metadata={'source': 'demo.txt'})
for chunk in chunks:
print(f"\n【{chunk['chunk_id']}】{chunk['start']}-{chunk['end']}")
print(chunk['content'])
6.3 语义分块
import re
from typing import List, Dict
class SemanticChunker:
"""语义分块器
按段落/句子边界分块,更符合语义
"""
def __init__(self, min_chunk_size: int = 100, max_chunk_size: int = 1000):
"""
参数:
min_chunk_size: 最小块大小(太小的段落会合并)
max_chunk_size: 最大块大小(太大的段落会分割)
"""
self.min_chunk_size = min_chunk_size
self.max_chunk_size = max_chunk_size
def split_paragraphs(self, text: str) -> List[str]:
"""按段落分割"""
# 按换行分割
paragraphs = re.split(r'\n\s*\n', text)
# 清理空白
paragraphs = [p.strip() for p in paragraphs if p.strip()]
return paragraphs
def merge_small_paragraphs(self, paragraphs: List[str]) -> List[str]:
"""合并太小的段落"""
merged = []
buffer = ""
for para in paragraphs:
if len(buffer) + len(para) < self.max_chunk_size:
buffer += "\n\n" + para
else:
if buffer:
merged.append(buffer.strip())
# 检查当前段落是否太大
if len(para) > self.max_chunk_size:
# 进一步分割
sub_chunks = self.split_large_paragraph(para)
merged.extend(sub_chunks[:-1])
buffer = sub_chunks[-1]
else:
buffer = para
if buffer:
merged.append(buffer.strip())
return merged
def split_large_paragraph(self, text: str) -> List[str]:
"""分割过长的段落"""
# 按句子分割
sentences = re.split(r'[。!?.!?]', text)
sentences = [s.strip() for s in sentences if s.strip()]
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < self.max_chunk_size:
current_chunk += sentence + "。"
else:
if current_chunk:
chunks.append(current_chunk)
current_chunk = sentence + "。"
if current_chunk:
chunks.append(current_chunk)
return chunks
def chunk(self, text: str, metadata: Dict = None) -> List[Dict]:
"""语义分块"""
# 1. 分割段落
paragraphs = self.split_paragraphs(text)
print(f"📄 分割出 {len(paragraphs)} 个段落")
# 2. 合并小段落
merged = self.merge_small_paragraphs(paragraphs)
print(f"📄 合并后 {len(merged)} 个块")
# 3. 构建块对象
chunks = []
for i, content in enumerate(merged):
if len(content) >= self.min_chunk_size: # 过滤太短的
chunks.append({
'content': content,
'chunk_id': f"chunk_{i}",
'char_count': len(content),
'metadata': metadata or {}
})
print(f"📦 最终 {len(chunks)} 个块")
return chunks
# 使用
if __name__ == "__main__":
chunker = SemanticChunker(min_chunk_size=50, max_chunk_size=300)
text = """
人工智能是当今最热门的技术领域。
机器学习是AI的核心技术。通过数据训练模型,让计算机学会预测和决策。深度学习是机器学习的分支,使用神经网络。
Python是AI领域最常用的编程语言。它有丰富的库,如TensorFlow、PyTorch。JavaScript也在AI可视化中有应用。
数据显示,AI市场规模逐年增长。
"""
chunks = chunker.chunk(text, metadata={'source': 'ai_intro.txt'})
for chunk in chunks:
print(f"\n【{chunk['chunk_id']}】({chunk['char_count']}字)")
print(chunk['content'][:100] + "...")
七、元数据管理
7.1 元数据的重要性
图3:元数据结构元数据让检索更精准,支持过滤和排序。
常用元数据:
- 来源:文件名、URL、作者
- 时间:创建时间、更新时间
- 类型:文档类型、格式
- 标签:分类、关键词
- 权限:公开、内部、机密
7.2 元数据提取器
from datetime import datetime
from pathlib import Path
import hashlib
class MetadataExtractor:
"""元数据提取器
从文件和内容中提取元数据
"""
def extract_from_file(self, file_path: str) -> dict:
"""从文件提取元数据"""
path = Path(file_path)
metadata = {
'file_name': path.name,
'file_type': path.suffix.lower(),
'file_size': path.stat().st_size,
'created_at': datetime.fromtimestamp(path.stat().st_ctime).isoformat(),
'modified_at': datetime.fromtimestamp(path.stat().st_mtime).isoformat(),
}
return metadata
def extract_from_content(self, content: str, file_path: str = None) -> dict:
"""从内容提取元数据"""
metadata = {
'char_count': len(content),
'word_count': len(content.split()),
'line_count': content.count('\n') + 1,
'content_hash': hashlib.md5(content.encode()).hexdigest(),
}
# 估算阅读时间(每分钟400字)
reading_time = len(content) / 400 / 60
metadata['reading_time_minutes'] = round(reading_time, 1)
return metadata
def extract_keywords(self, content: str, top_n: int = 5) -> List[str]:
"""提取关键词(简单实现)"""
# 停用词
stopwords = {'的', '了', '是', '在', '和', '与', '为', '对', '等', '于', '之', '以', '及'}
# 提取中文词(简单按字符)
words = re.findall(r'[\u4e00-\u9fa5]{2,}', content)
# 统计词频
word_freq = {}
for word in words:
if word not in stopwords and len(word) >= 2:
word_freq[word] = word_freq.get(word, 0) + 1
# 排序取top
sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
return [word for word, freq in sorted_words[:top_n]]
def extract(self, content: str, file_path: str = None) -> dict:
"""完整提取"""
metadata = {}
if file_path:
metadata.update(self.extract_from_file(file_path))
metadata.update(self.extract_from_content(content, file_path))
metadata['keywords'] = self.extract_keywords(content)
return metadata
# 使用
if __name__ == "__main__":
extractor = MetadataExtractor()
content = """
人工智能是当今最热门的技术领域之一。机器学习是AI的核心技术。
深度学习使用神经网络,在图像识别和自然语言处理中应用广泛。
Python是最常用的编程语言,拥有丰富的AI库。
"""
metadata = extractor.extract(content, "ai_intro.txt")
print("📋 元数据:")
for key, value in metadata.items():
print(f" {key}: {value}")
八、完整实战:企业知识库构建
8.1 系统架构
"""
企业知识库构建系统
完整流程:
1. 扫描文档
2. 提取内容
3. 清洗文本
4. 分块处理
5. 提取元数据
6. 向量化存储
7. 质量验证
"""
import os
from typing import List, Dict
class EnterpriseKnowledgeBase:
"""企业知识库构建器"""
def __init__(self, source_dir: str):
self.source_dir = source_dir
self.documents = []
self.chunks = []
# 各组件
self.scanner = DocumentScanner(source_dir)
self.cleaner = TextCleaner()
self.chunker = SemanticChunker(min_chunk_size=100, max_chunk_size=500)
self.metadata_extractor = MetadataExtractor()
def ingest(self) -> int:
"""摄入所有文档
返回:
生成的块数量
"""
print("=" * 60)
print("🚀 开始构建知识库")
print("=" * 60)
# 1. 扫描文档
print("\n📂 步骤1:扫描文档")
docs = self.scanner.scan(recursive=True)
print(f" 找到 {len(docs)} 个文档")
# 2-5. 处理每个文档
print("\n📝 步骤2-5:处理文档")
all_chunks = []
for i, doc_info in enumerate(docs, 1):
try:
# 读取内容(根据格式选择提取器)
content = self._extract_content(doc_info)
if not content:
continue
# 清洗
content = self.cleaner.clean(content)
# 分块
metadata = self.metadata_extractor.extract(content, doc_info['path'])
chunks = self.chunker.chunk(content, metadata)
all_chunks.extend(chunks)
print(f" [{i}/{len(docs)}] {doc_info['name']} → {len(chunks)} 块")
except Exception as e:
print(f" ⚠️ 处理失败 {doc_info['name']}: {e}")
self.chunks = all_chunks
print(f"\n✅ 知识库构建完成:{len(self.chunks)} 个块")
return len(self.chunks)
def _extract_content(self, doc_info: Dict) -> str:
"""根据格式提取内容"""
path = doc_info['path']
format_type = doc_info['format']
if format_type == 'text':
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
return f.read()
elif format_type == 'pdf':
extractor = PDFExtractor(path)
return extractor.extract_text()
elif format_type == 'markdown':
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
return f.read()
# 其他格式暂不支持
return ""
def search(self, query: str, top_k: int = 5) -> List[Dict]:
"""检索(简单实现)"""
# 这里应该用向量检索
# 简化实现:关键词匹配
results = []
for chunk in self.chunks:
content = chunk['content'].lower()
if any(word in content for word in query.lower().split()):
results.append(chunk)
if len(results) >= top_k:
break
return results
def get_stats(self) -> Dict:
"""获取统计信息"""
return {
'total_chunks': len(self.chunks),
'total_chars': sum(len(c['content']) for c in self.chunks),
'avg_chunk_size': sum(len(c['content']) for c in self.chunks) / len(self.chunks) if self.chunks else 0,
}
# 使用
if __name__ == "__main__":
# 创建测试目录和文档
os.makedirs("test_kb", exist_ok=True)
with open("test_kb/AI简介.txt", "w") as f:
f.write("人工智能是热门技术。机器学习是AI核心。深度学习使用神经网络。")
with open("test_kb/Python教程.txt", "w") as f:
f.write("Python是编程语言。Python易学易用。Python广泛用于AI领域。")
# 构建知识库
kb = EnterpriseKnowledgeBase("test_kb")
kb.ingest()
# 统计
stats = kb.get_stats()
print(f"\n📊 统计:{stats}")
# 检索
results = kb.search("Python")
print(f"\n🔍 检索'Python'结果:")
for r in results:
print(f" - {r['content'][:50]}...")
九、质量控制
9.1 质量检查清单
class QualityChecker:
"""质量检查器
检查知识库的各类质量问题
"""
def __init__(self):
self.issues = []
def check_chunk_size(self, chunks: List[Dict], min_size: int = 50, max_size: int = 1000):
"""检查块大小"""
too_small = [c for c in chunks if len(c['content']) < min_size]
too_large = [c for c in chunks if len(c['content']) > max_size]
if too_small:
print(f"⚠️ {len(too_small)} 个块太小(<{min_size}字)")
if too_large:
print(f"⚠️ {len(too_large)} 个块太大(>{max_size}字)")
return {
'too_small': len(too_small),
'too_large': len(too_large)
}
def check_duplicates(self, chunks: List[Dict]):
"""检查重复内容"""
hashes = {}
duplicates = []
for chunk in chunks:
content_hash = hashlib.md5(chunk['content'].encode()).hexdigest()
if content_hash in hashes:
duplicates.append({
'chunk_id': chunk['chunk_id'],
'duplicate_of': hashes[content_hash]
})
else:
hashes[content_hash] = chunk['chunk_id']
if duplicates:
print(f"⚠️ 发现 {len(duplicates)} 对重复块")
return duplicates
def check_coverage(self, chunks: List[Dict], sample_queries: List[str]) -> Dict:
"""检查覆盖率
用测试查询检查检索效果
"""
coverage = {}
for query in sample_queries:
query_words = set(query.lower().split())
matched = 0
for chunk in chunks:
content_words = set(chunk['content'].lower().split())
if query_words & content_words: # 有交集
matched += 1
coverage[query] = matched / len(chunks) if chunks else 0
return coverage
def run_all_checks(self, chunks: List[Dict]):
"""运行所有检查"""
print("\n🔍 质量检查:")
size_issues = self.check_chunk_size(chunks)
dupes = self.check_duplicates(chunks)
# 覆盖率检查(示例查询)
sample_queries = ["人工智能", "机器学习", "Python"]
coverage = self.check_coverage(chunks, sample_queries)
print("\n📊 覆盖率:")
for query, rate in coverage.items():
print(f" '{query}': {rate*100:.1f}%")
return {
'size_issues': size_issues,
'duplicates': len(dupes),
'coverage': coverage
}
十、总结与练习
10.1 要点回顾
- 文档采集:多格式支持,批量处理
- 文档清洗:去噪、规范化
- 分块策略:固定大小 vs 语义分块
- 元数据:来源、时间、关键词
- 质量控制:大小、重复、覆盖率
10.2 分块策略选择
| 场景 | 推荐策略 | 块大小 |
|---|---|---|
| FAQ | 按问题分割 | 整个问题 |
| 长文档 | 语义分块 | 300-500字 |
| 代码文档 | 固定+函数边界 | 200-400字 |
10.3 延伸阅读
- LangChain Document Loaders:官方文档
- Unstructured库:支持100+种文档格式
10.4 课后练习
基础题:实现一个PDF文档的知识库构建流程。
进阶题:设计一个支持增量更新的知识库系统。
挑战题:实现自动分块策略选择——根据文档类型自动选择最优分块方式。
扫码关注公众号
扫码添加QQ
【Prompt炼金术】Day8|模板库:拿来即用的实战模板集合
【Prompt炼金术】Day8|模板库:拿来即用的实战模板集合
【Prompt炼金术】Day7|思维链:让AI从”胡言乱语”到”有理有据”
【Prompt炼金术】Day6|高级参数:让AI输出稳定可控的秘诀