【进阶实战】Day8:批量自动化生产——用AI同时处理100+任务的秘诀
你还在一个个手动处理重复性工作吗?
写100封邮件?生成100张图片?处理100条数据?
如果你还在这样做,强烈建议你停下来——今天学的批量自动化生产,能让你一个人顶十个人。
本文将教你如何用AI构建批量自动化工作流,实现”一键搞定100件事”的效率飞跃。
🔹 一、为什么需要批量自动化
批量自动化:从18小时到20分钟的效率飞跃
▸ 1.1 手动处理的痛苦
某公司运营小王每天要:
- 回复50个客户的咨询邮件
- 生成20张产品宣传图
- 更新30个社交媒体文案
- 整理100条销售数据
如果纯手动完成:
- 邮件回复:50封 × 5分钟 = 4小时17分钟
- 产品图片:20张 × 15分钟 = 5小时
- 社交文案:30条 × 10分钟 = 5小时
- 数据整理:100条 × 2分钟 = 3小时20分钟
总计:约18小时——明显超过一天的工作时间!
▸ 1.2 批量自动化的威力
同样的工作,用AI批量处理:
- 邮件回复:AI自动生成,5分钟完成
- 产品图片:批量生成,10分钟完成
- 社交文案:模板批量填充,5分钟完成
- 数据整理:自动化脚本,1分钟完成
总计:约20分钟
效率提升:54倍
这就是批量自动化的价值——把重复性工作交给AI,你只需要做最后的审核和调整。
🔹 二、批量自动化的核心原理
实战一:批量生成个性化邮件
▸ 2.1 什么是批量自动化
定义:批量自动化是指使用AI和脚本技术,对大量相似任务进行自动化处理的工作方式。
本质:将”重复的事”交给机器,”创造性的事”留给人
▸ 2.2 批量自动化的三种模式
| 模式 | 说明 | 适用场景 | 示例 |
|---|---|---|---|
| 模板填充型 | 预先定义模板,批量替换变量 | 结构化内容生产 | 100封邮件、100份报告 |
| 批量生成型 | 一次输入,批量输出 | 多内容同时生产 | 100张图片、100段文案 |
| 流水线型 | 多个步骤自动串联 | 复杂多步骤任务 | 采集→处理→发布 |
▸ 2.3 批量自动化的流程
🔹 三、实战一:批量生成100封个性化邮件
实战二:批量生成产品主图
▸ 3.1 项目需求
某电商公司要向1000个老客户发送促销活动邮件,每封邮件需要:
- 称呼(客户名字)
- 提及上次购买的产品
- 推荐相关新品
- 包含个性化优惠码
▸ 3.2 数据准备
首先准备客户数据(Excel格式):
| 客户ID | 姓名 | 上次购买 | 购买金额 | 偏好品类 |
|---|---|---|---|---|
| C001 | 张三 | 运动鞋 | 599 | 运动 |
| C002 | 李四 | 面霜 | 299 | 护肤 |
| C003 | 王五 | 蓝牙耳机 | 899 | 数码 |
| … | … | … | … | … |
▸ 3.3 完整代码实现
import openpyxl
import time
from openai import OpenAI
import json
client = OpenAI(api_key="your-api-key")
def read_customer_data(filepath):
"""读取客户数据"""
wb = openpyxl.load_workbook(filepath)
ws = wb.active
customers = []
headers = [cell.value for cell in ws[1]]
for row in ws.iter_rows(min_row=2, values_only=True):
if row[0]: # 跳过空行
customer = dict(zip(headers, row))
customers.append(customer)
return customers
def generate_email_template():
"""定义邮件模板"""
return """你是专业的电商营销文案专家。请根据以下客户信息,生成一封个性化的促销邮件。
客户信息:
- 姓名:{name}
- 上次购买:{last_purchase}
- 购买金额:{amount}元
- 偏好品类:{category}
要求:
1. 邮件长度150-200字
2. 语气亲切友好,像朋友推荐
3. 提及客户上次购买,增加亲切感
4. 推荐与偏好相关的1-2个新品
5. 包含专属优惠码:PREMIUM{code}
6. 不要使用"尊敬的"等正式称呼,用{name}直接称呼
请直接输出邮件正文,不要其他说明。
"""
def generate_personalized_email(customer):
"""为单个客户生成邮件"""
prompt = generate_email_template().format(
name=customer['姓名'],
last_purchase=customer['上次购买'],
amount=customer['购买金额'],
category=customer['偏好品类'],
code=str(customer['客户ID'])[1:] # 去掉C前缀
)
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.7 # 稍微有创意但不跑偏
)
return response.choices[0].message.content
def batch_generate_emails(customer_file, output_file, batch_size=10, delay=1):
"""
批量生成邮件
Args:
customer_file: 客户数据文件
output_file: 输出文件
batch_size: 每批处理数量(避免API限流)
delay: 请求间隔(秒)
"""
customers = read_customer_data(customer_file)
total = len(customers)
print(f"开始批量生成邮件,共{total}封...\n")
results = []
for i, customer in enumerate(customers, 1):
try:
email = generate_personalized_email(customer)
results.append({
'id': customer['客户ID'],
'name': customer['姓名'],
'email_content': email
})
print(f"[{i}/{total}] ✅ {customer['姓名']} - 邮件生成成功")
except Exception as e:
print(f"[{i}/{total}] ❌ {customer['姓名']} - 生成失败: {e}")
results.append({
'id': customer['客户ID'],
'name': customer['姓名'],
'email_content': None,
'error': str(e)
})
# 分批处理,避免API限流
if i % batch_size == 0:
print(f"\n--- 已完成{i}封,休息{delay}秒 ---\n")
time.sleep(delay)
# 保存结果
with open(output_file, 'w', encoding='utf-8') as f:
for r in results:
f.write(f"========== {r['name']} ==========\n")
if r['email_content']:
f.write(r['email_content'] + '\n')
else:
f.write(f"[生成失败] {r.get('error', 'Unknown')}\n")
f.write('\n')
success = sum(1 for r in results if r['email_content'])
print(f"\n========== 完成! ==========")
print(f"成功:{success}/{total}")
print(f"结果已保存到:{output_file}")
return results
if __name__ == "__main__":
results = batch_generate_emails(
customer_file='customers.xlsx',
output_file='generated_emails.txt',
batch_size=10,
delay=2
)
▸ 3.4 运行效果
开始批量生成邮件,共100封...
[1/100] ✅ 张三 - 邮件生成成功
[2/100] ✅ 李四 - 邮件生成成功
[3/100] ✅ 王五 - 邮件生成成功
...
[10/100] ✅ 赵六 - 邮件生成成功
--- 已完成10封,休息2秒 ---
[11/100] ✅ 周七 - 邮件生成成功
...
========== 完成! ==========
成功:98/100
结果已保存到:generated_emails.txt
▸ 3.5 生成邮件示例
========== 张三 ==========
张三,
上次你买的那双运动鞋穿着怎么样?我记得你花了599元买的那双,跑起步来应该很舒服吧!
这次我们新到了一款专业跑步T恤,采用速干面料,非常适合夏天运动。还有一款智能手环,能实时监测心率和跑步距离,跟你的运动鞋是绝配!
给你准备了专属优惠码 PREMIUM001,满200减30,可以和其他优惠叠加使用。
点击这里查看新品:[链接]
期待下次为你服务!
小美
🔹 四、实战二:批量生成100张产品图
▸ 4.1 项目需求
某电商店铺有100个产品需要制作主图,每个产品有:
- 产品名
- 核心卖点(1-2个关键词)
- 背景颜色偏好
▸ 4.2 完整代码实现
import requests
import json
import time
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchImageGenerator:
"""批量图片生成器"""
def __init__(self, haiYi_cookie_file, output_dir):
self.output_dir = output_dir
self.cookies = self._load_cookies(haiYi_cookie_file)
self.create_url = 'https://www.haiyi.art/api/v1/task/create'
self.query_url = 'https://www.haiyi.art/api/v1/task/detail'
os.makedirs(output_dir, exist_ok=True)
def _load_cookies(self, filepath):
with open(filepath) as f:
cookie_data = json.load(f)
return {c['name']: c['value'] for c in cookie_data['cookies']}
def generate_image(self, product_name, keyword, color_theme, task_id):
"""生成单张产品图"""
prompt = f"""Product main image for e-commerce,
Product: {product_name},
Key feature: {keyword},
Background: {color_theme} gradient,
Style: Clean, professional, white background dominant,
High quality, 4K, realistic product photography"""
payload = {
'action': 1,
'art_model_no': '26058e019e3a0c026e1ad2bfa69e2333',
'model_no': '26058e019e3a0c026e1ad2bfa69e2333',
'model_ver_no': 'f8bd92160ac1555b92de3488a0c4082f30fee233',
'meta': {
'width': 1024,
'height': 1024,
'prompt': prompt,
'n_iter': 1
}
}
headers = {
'User-Agent': 'Mozilla/5.0',
'Content-Type': 'application/json',
'token': '0DA5A8FED414FDA9',
'X-App-Id': 'web_global_seaart',
'X-Device-Id': self.cookies.get('deviceId', '')
}
# 创建任务
r = requests.post(self.create_url, headers=headers,
cookies=self.cookies, json=payload, timeout=60)
tid = r.json().get('data', {}).get('id')
# 等待生成
time.sleep(20)
# 查询结果
r = requests.post(self.query_url, headers=headers,
cookies=self.cookies, json={'id': tid}, timeout=30)
result = r.json()
img_uris = result.get('data', {}).get('img_uris')
if img_uris:
return {
'success': True,
'task_id': task_id,
'product': product_name,
'image_url': img_uris[0]['url']
}
else:
return {
'success': False,
'task_id': task_id,
'product': product_name,
'error': '生成失败'
}
def download_image(self, url, filepath):
"""下载图片到本地"""
r = requests.get(url, timeout=60)
if r.status_code == 200:
with open(filepath, 'wb') as f:
f.write(r.content)
return True
return False
def batch_generate(self, products, max_workers=3):
"""
批量生成图片
Args:
products: 产品列表 [{'name': '产品1', 'keyword': '卖点1', 'color': '蓝色'}, ...]
max_workers: 最大并发数
"""
total = len(products)
print(f"开始批量生成图片,共{total}张...\n")
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_product = {
executor.submit(
self.generate_image,
p['name'],
p['keyword'],
p.get('color', 'blue'),
i
): p for i, p in enumerate(products)
}
for future in as_completed(future_to_product):
result = future.result()
results.append(result)
if result['success']:
print(f"[{result['task_id']+1}/{total}] ✅ {result['product']}")
# 下载图片
filepath = os.path.join(
self.output_dir,
f"{result['task_id']+1}_{result['product']}.webp"
)
if self.download_image(result['image_url'], filepath):
print(f" 📥 已保存: {filepath}")
else:
print(f"[{result['task_id']+1}/{total}] ❌ {result['product']} - {result['error']}")
# 统计结果
success = sum(1 for r in results if r['success'])
print(f"\n========== 完成! ==========")
print(f"成功:{success}/{total}")
return results
if __name__ == "__main__":
# 准备产品数据
products = [
{'name': '运动蓝牙耳机', 'keyword': '防水防汗', 'color': 'black'},
{'name': '智能手表', 'keyword': '心率监测', 'color': 'silver'},
{'name': '便携充电宝', 'keyword': '20000mAh', 'color': 'white'},
{'name': '无线蓝牙音箱', 'keyword': '360°环绕立体声', 'color': 'red'},
# ... 继续添加更多产品
]
generator = BatchImageGenerator(
haiYi_cookie_file='config/haiyi-cookie.json',
output_dir='output/product_images'
)
results = generator.batch_generate(products, max_workers=3)
▸ 4.3 批量处理技巧
1. 并发控制
max_workers = 3 # 同时最多3个任务
from threading import Semaphore
semaphore = Semaphore(3)
2. 错误重试
def generate_with_retry(product, max_retries=3):
for attempt in range(max_retries):
try:
result = generate_image(product)
if result['success']:
return result
except Exception as e:
if attempt < max_retries - 1:
time.sleep(5) # 等待后重试
else:
return {'success': False, 'error': str(e)}
3. 进度保存
def save_checkpoint(results, checkpoint_file):
"""定期保存进度,防止意外中断丢失数据"""
with open(checkpoint_file, 'w') as f:
json.dump(results, f)
🔹 五、实战三:批量处理Excel数据
▸ 5.1 项目需求
某公司有1000条潜在客户数据,需要:
- 补充客户公司信息(通过公司名查官网)
- 判断客户行业类别
- 评估客户价值等级
- 生成跟进建议
▸ 5.2 完整代码实现
import openpyxl
import time
from openpyxl.styles import Font, PatternFill, Alignment
from openai import OpenAI
client = OpenAI(api_key="your-api-key")
class CustomerDataProcessor:
"""客户数据批量处理器"""
def __init__(self):
self.prompt_template = """分析以下客户信息,完成数据补充:
公司名称:{company_name}
联系人:{contact}
职位:{position}
联系方式:{contact_info}
请以JSON格式返回以下信息:
{
"industry": "行业分类",
"company_size": "公司规模(大型/中型/小型)",
"value_level": "客户价值等级(A/B/C)",
"followup_suggestion": "跟进建议(50字内)"
}
只返回JSON,不要其他内容。
"""
def process_single_customer(self, customer_data):
"""处理单个客户数据"""
prompt = self.prompt_template.format(**customer_data)
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.3 # 低温度,结果更稳定
)
result_text = response.choices[0].message.content
# 解析JSON结果
try:
# 尝试提取JSON
if '{' in result_text:
json_str = result_text[result_text.find('{'):result_text.rfind('}')+1]
result = json.loads(json_str)
return {
'success': True,
'industry': result.get('industry', ''),
'company_size': result.get('company_size', ''),
'value_level': result.get('value_level', ''),
'followup_suggestion': result.get('followup_suggestion', '')
}
except:
return {
'success': False,
'industry': '解析失败',
'company_size': '',
'value_level': '',
'followup_suggestion': ''
}
def process_batch(self, input_file, output_file, start_row=2, batch_size=20, delay=1):
"""
批量处理客户数据
Args:
input_file: 输入Excel文件
output_file: 输出Excel文件
start_row: 从第几行开始处理(1是表头)
batch_size: 每批处理数量
delay: 请求间隔(秒)
"""
# 读取输入文件
wb_in = openpyxl.load_workbook(input_file)
ws_in = wb_in.active
# 创建输出文件
wb_out = openpyxl.Workbook()
ws_out = wb_out.active
# 复制表头
headers = [cell.value for cell in ws_in[1]]
new_headers = headers + ['行业分类', '公司规模', '客户价值', '跟进建议']
ws_out.append(new_headers)
# 处理数据
total_rows = ws_in.max_row - start_row + 1
current_row = start_row
processed = 0
print(f"开始批量处理,共{total_rows}条数据...\n")
for i, row in enumerate(ws_in.iter_rows(min_row=start_row, values_only=True), 1):
customer_data = dict(zip(headers, row))
try:
result = self.process_single_customer(customer_data)
# 写入结果
new_row = list(row) + [
result['industry'],
result['company_size'],
result['value_level'],
result['followup_suggestion']
]
ws_out.append(new_row)
print(f"[{i}/{total_rows}] ✅ {customer_data.get('公司名称', 'Unknown')}")
if result['success']:
print(f" 行业: {result['industry']} | 价值: {result['value_level']}")
processed += 1
except Exception as e:
print(f"[{i}/{total_rows}] ❌ {customer_data.get('公司名称', 'Unknown')}")
print(f" 错误: {e}")
# 写入空结果
ws_out.append(list(row) + ['', '', '', ''])
# 分批保存,避免内存占用
if i % batch_size == 0:
wb_out.save(output_file)
print(f"\n--- 已处理{i}条,保存进度 ---\n")
time.sleep(delay)
# 最终保存
wb_out.save(output_file)
print(f"\n========== 处理完成! ==========")
print(f"成功处理:{processed}/{total_rows}")
print(f"结果已保存到:{output_file}")
return processed
def format_output(self, output_file):
"""格式化输出文件"""
wb = openpyxl.load_workbook(output_file)
ws = wb.active
# 设置列宽
ws.column_dimensions['A'].width = 15 # 公司名称
ws.column_dimensions['B'].width = 10 # 联系人
ws.column_dimensions['E'].width = 15 # 行业分类
ws.column_dimensions['F'].width = 12 # 公司规模
ws.column_dimensions['G'].width = 12 # 客户价值
ws.column_dimensions['H'].width = 40 # 跟进建议
# 高价值客户标绿色
green_fill = PatternFill(start_color="90EE90", end_color="90EE90", fill_type="solid")
for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
if row[6].value == 'A': # G列是客户价值
for cell in row:
cell.fill = green_fill
wb.save(output_file)
if __name__ == "__main__":
processor = CustomerDataProcessor()
processor.process_batch(
input_file='potential_customers.xlsx',
output_file='processed_customers.xlsx',
batch_size=20,
delay=1
)
# 格式化输出
processor.format_output('processed_customers.xlsx')
▸ 5.3 运行效果
开始批量处理,共1000条数据...
[1/1000] ✅ 深圳市腾讯科技有限公司
行业: 互联网/软件 | 价值: A
[2/1000] ✅ 杭州阿里巴巴网络技术有限公司
行业: 电子商务 | 价值: A
[3/1000] ✅ 北京小米科技有限公司
行业: 智能硬件 | 价值: A
...
[20/1000] ✅ 上海字节跳动科技有限公司
行业: 互联网/内容 | 价值: A
--- 已处理20条,保存进度 ---
...
========== 处理完成! ==========
成功处理:985/1000
结果已保存到:processed_customers.xlsx
🔹 六、实战四:批量内容发布到多平台
▸ 6.1 项目需求
一篇公众号文章,要同步发布到:
- 知乎专栏
- 微信公众号
- 今日头条
- 百家号
▸ 6.2 完整代码实现
import requests
import time
import hashlib
from datetime import datetime
class MultiPlatformPublisher:
"""多平台内容发布器"""
def __init__(self):
self.platforms = {
'zhihu': {
'api': 'https://api.zhihu.com/articles',
'headers': {'Authorization': 'Bearer YOUR_ZHIHU_TOKEN'}
},
'toutiao': {
'api': 'https://mp.toutiao.com/open_api/article/publish',
'headers': {'Authorization': 'YOUR_TOUTIAO_TOKEN'}
},
'baijiahao': {
'api': 'https://baijiahao.baidu.com/builder/article/publish',
'headers': {'Authorization': 'YOUR_BAIJIA_TOKEN'}
}
}
def adapt_content_for_platform(self, original_content, platform):
"""根据平台调整内容格式"""
if platform == 'zhihu':
# 知乎支持Markdown,添加目录
return f"""## 目录
- [前言](#前言)
- [主要内容](#主要内容)
- [总结](#总结)
{original_content}
*本文首发于公众号,禁止未经授权转载。
更多精彩内容,欢迎关注我的专栏。*
"""
elif platform == 'toutiao':
# 今日头条需要单独的摘要
return {
'title': self.extract_title(original_content),
'content': original_content,
'abstract': self.generate_abstract(original_content, 100),
'tags': ['AI', '科技', '教程']
}
elif platform == 'baijiahao':
# 百家号格式要求
return {
'title': self.extract_title(original_content),
'content': original_content,
'original_source': '公众号名称'
}
return original_content
def extract_title(self, content):
"""提取标题"""
lines = content.split('\n')
for line in lines:
line = line.strip()
if line.startswith('# '):
return line[2:].strip()
return "未分类文章"
def generate_abstract(self, content, max_length=100):
"""生成摘要"""
# 去掉标题和标记
text = content.replace('#', '').replace('*', '').replace('\n', ' ')
# 取前100字
abstract = text[:max_length] + '...' if len(text) > max_length else text
return abstract
def publish_to_platform(self, platform, content):
"""发布到单个平台"""
if platform not in self.platforms:
return {'success': False, 'error': '未知平台'}
config = self.platforms[platform]
adapted = self.adapt_content_for_platform(content, platform)
try:
if isinstance(adapted, dict):
r = requests.post(
config['api'],
headers=config['headers'],
json=adapted,
timeout=30
)
else:
r = requests.post(
config['api'],
headers=config['headers'],
data=adapted,
timeout=30
)
if r.status_code == 200:
result = r.json()
return {
'success': True,
'platform': platform,
'article_id': result.get('data', {}).get('article_id'),
'article_url': result.get('data', {}).get('article_url')
}
else:
return {
'success': False,
'platform': platform,
'error': f'HTTP {r.status_code}'
}
except Exception as e:
return {
'success': False,
'platform': platform,
'error': str(e)
}
def publish_all(self, content, platforms=None):
"""
批量发布到所有平台
Args:
content: 文章内容
platforms: 指定平台列表,None表示发布到所有
"""
if platforms is None:
platforms = list(self.platforms.keys())
print(f"开始批量发布,共{len(platforms)}个平台...\n")
results = []
for platform in platforms:
print(f"📤 发布到 {platform}...")
result = self.publish_to_platform(platform, content)
results.append(result)
if result['success']:
print(f" ✅ 成功! URL: {result.get('article_url', 'N/A')}")
else:
print(f" ❌ 失败: {result.get('error', 'Unknown')}")
# 间隔发布,避免被风控
time.sleep(3)
# 统计结果
success = sum(1 for r in results if r['success'])
print(f"\n========== 发布完成! ==========")
print(f"成功:{success}/{len(platforms)}")
for r in results:
if r['success']:
print(f" ✅ {r['platform']}: {r.get('article_url', '')}")
return results
if __name__ == "__main__":
# 读取文章内容
with open('article.md', 'r', encoding='utf-8') as f:
article_content = f.read()
publisher = MultiPlatformPublisher()
# 发布到所有平台
results = publisher.publish_all(
content=article_content,
platforms=['zhihu', 'toutiao', 'baijiahao']
)
🔹 七、批量自动化的最佳实践
▸ 7.1 稳定性保障
1. 任务检查点
def process_with_checkpoint(tasks, process_func, checkpoint_file):
"""带检查点的批量处理"""
# 读取已完成的进度
completed = set()
if os.path.exists(checkpoint_file):
with open(checkpoint_file) as f:
completed = set(json.load(f))
results = []
for i, task in enumerate(tasks):
if str(task['id']) in completed:
continue # 跳过已完成的
result = process_func(task)
results.append(result)
# 更新检查点
completed.add(str(task['id']))
with open(checkpoint_file, 'w') as f:
json.dump(list(completed), f)
return results
2. 错误隔离
def safe_process(task):
"""安全处理单个任务,错误不影响其他任务"""
try:
return process_single(task)
except Exception as e:
return {
'success': False,
'task_id': task['id'],
'error': str(e),
'error_type': type(e).__name__
}
▸ 7.2 效率优化
1. 批量API调用
def batch_api_call(items, batch_size=50):
"""批量API调用,减少网络开销"""
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
batch_results = api.bulk_process(batch)
results.extend(batch_results)
time.sleep(1) # 避免限流
return results
2. 异步处理
import asyncio
async def async_batch_process(tasks, max_concurrent=5):
"""异步批量处理"""
semaphore = asyncio.Semaphore(max_concurrent)
async def safe_task(task):
async with semaphore:
return await process_async(task)
return await asyncio.gather(*[safe_task(t) for t in tasks])
▸ 7.3 质量控制
1. 抽样审核
def quality_check(results, sample_size=10):
"""抽样检查生成质量"""
import random
samples = random.sample(results, min(sample_size, len(results)))
print("========== 质量抽检 ==========")
for sample in samples:
quality = input(f"\n样本 {sample['id']} 质量如何?(1-5): ")
sample['quality_score'] = int(quality)
avg_quality = sum(s['quality_score'] for s in samples) / len(samples)
print(f"\n平均质量分数:{avg_quality:.2f}")
if avg_quality < 3:
print("⚠️ 质量偏低,建议调整Prompt!")
🔹 八、常见问题与解决方案
▸ 问题一:API限流怎么办
原因:请求频率太高,被API服务商限制
解决:
- 降低请求频率(增加delay)
- 使用批量API(如果有)
- 申请更高的API配额
def retry_with_backoff(func, max_retries=5):
for i in range(max_retries):
try:
return func()
except RateLimitError:
wait_time = 2 ** i
time.sleep(wait_time)
raise Exception("Max retries exceeded")
▸ 问题二:部分任务失败怎么办
解决:
- 实现错误重试机制
- 保存失败任务列表,人工处理
- 设置失败率阈值,超过则停止
failure_rate = len(failed_tasks) / total_tasks
if failure_rate > 0.1: # 超过10%失败率
print("⚠️ 失败率过高,停止任务,检查问题!")
break
▸ 问题三:内存占用过高
解决:
- 分批处理,不要一次加载所有数据
- 及时保存结果,释放内存
- 使用生成器替代列表
def generate_tasks():
for row in large_file:
yield row # 逐行返回,不一次性加载
for task in generate_tasks():
process(task)
time.sleep(0.1)
🔹 九、总结与下期预告
▸ 本章知识点回顾
- 为什么需要批量自动化:效率提升54倍
- 三种批量模式:模板填充、批量生成、流水线
- 实战技能:
– 批量生成100封个性化邮件
– 批量生成100张产品图
– 批量处理1000条客户数据
– 一键发布到多平台
- 最佳实践:检查点机制、错误隔离、抽样审核
▸ 批量自动化的能力边界
| 擅长 | 不擅长 |
|---|---|
| 重复性、结构化任务 | 需要创意的工作 |
| 大批量数据处理 | 需要实时判断的任务 |
| 多平台同步发布 | 需要深度思考的内容 |
▸ 下期预告
Day9我们将学习:Python集成——如何用Python调用AI API,构建完整的AI应用。
参考资料:
- OpenAI API文档
- 海艺AI API文档
- Python批量处理实战
关于作者:本文为AI进阶实战30天系列第8篇。
互动话题:你在工作中有什么重复性任务想用AI自动化?欢迎评论区分享!
关注我,每天学习一个AI硬技能!
扫码关注公众号
扫码添加QQ
【Prompt炼金术】Day8|模板库:拿来即用的实战模板集合
【Prompt炼金术】Day8|模板库:拿来即用的实战模板集合
【Prompt炼金术】Day7|思维链:让AI从”胡言乱语”到”有理有据”
【Prompt炼金术】Day6|高级参数:让AI输出稳定可控的秘诀