
📦 淘宝订单信息采集器(Playwright 版 Python 脚本)
大纲
这份文档将包括:
- 项目背景与需求
- 核心思路与技术要点
- 完整的、带详细注释的代码
- 使用说明
适用于:已登录淘宝卖家后台,需批量采集“买家已付款”状态订单的真实收货人信息。
1. 🎯 项目背景与需求
📌 核心需求
从淘宝卖家后台的“已卖出的宝贝”页面,自动化采集指定状态订单的 真实买家收货信息(收货人、手机号、收货地址)。 🎯 为什么需要自动化?
- 手动点击每个订单查看收货信息效率极低。
- 淘宝页面结构复杂,数据动态加载,需处理
iframe和异步 API。 - 需要结合订单基础信息(订单号、商品、金额等)与收货信息,导出为结构化数据(如 CSV)。
✅ 最终目标
- ✅ 自动登录(依赖外部 Cookie)
- ✅ 爬取订单列表页的基础信息
- ✅ 精准过滤:仅处理“买家已付款”的订单
- ✅ 进入物流页,通过 iframe 获取
orderid - ✅ 调用内部 API 获取真实收货信息(避免被页面显示的“加密地址”欺骗)
- ✅ 失败备用方案:DOM 解析
- ✅ 导出完整数据到 CSV
2. 🧠 核心思路与技术要点
| 技术点 | 说明 |
|---|---|
| Playwright | 无头浏览器,模拟真实用户操作,处理 JS 渲染和登录态。 |
| Cookie 登录 | 避免手动扫码,通过加载已保存的 Cookie 保持登录。 |
| iframe 处理 | 物流页是主页面嵌套 iframe,必须通过 content_frame() 进入上下文。 |
orderid 提取 | 真实物流 ID 隐藏在 input[scene="consign"][orderid] 元素中,需从 iframe 内提取。 |
| API 调用 | 使用 page.request 直接调用 getRealReceiver 接口,获取未加密的真实地址。 |
| DOM 备用解析 | 当 API 失败时,从 iframe 的文本中正则提取手机号、姓名。 |
| 状态过滤 | 只处理“买家已付款”的订单,跳过“已发货”、“交易关闭”等状态。 |
3. 📄 完整代码(带详细注释)
import asyncio
import re
import csv
from datetime import datetime
from pathlib import Path
import logging
from typing import Dict, List
import random
import time
from playwright.async_api import async_playwright, Page, Frame, Browser, Response
# ============================
# 🛠 配置区
# ============================
# 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('scraper.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 脚本配置
CONFIG = {
"base_url": "https://seller.taobao.com", # 淘宝卖家后台
"cookie_file": "taobao_cookies.json", # 保存的 Cookie 文件
"output_dir": "output", # 输出目录
"max_pages": 1, # 最大采集页数(用于测试)
"test_mode": True, # 测试模式:只采1页
"fetch_shipping": True, # 是否采集收货信息
"status_filter": ["买家已付款"] # 只采集这些状态的订单
}
# ============================
# 🧩 工具函数
# ============================
async def random_delay(min_sec: float, max_sec: float, action: str = ""):
"""随机延迟,模拟人类操作"""
delay = random.uniform(min_sec, max_sec)
if action:
logger.info(f"⏳ [{action}] 随机延迟 {delay:.2f} 秒...")
else:
logger.debug(f"⏳ 随机延迟 {delay:.2f} 秒...")
await asyncio.sleep(delay)
def setup_output_dir() -> Path:
"""创建输出目录"""
output_path = Path(CONFIG["output_dir"])
output_path.mkdir(exist_ok=True)
return output_path
def load_cookies(page: Page) -> bool:
"""加载本地 Cookie 以保持登录状态"""
cookie_file = Path(CONFIG["cookie_file"])
if not cookie_file.exists():
logger.error(f"❌ Cookie 文件不存在: {cookie_file}")
return False
try:
with open(cookie_file, 'r', encoding='utf-8') as f:
cookies = f.read()
import json
page.context.add_cookies(json.loads(cookies))
logger.info("✅ Cookie 加载成功,尝试保持登录状态...")
return True
except Exception as e:
logger.error(f"❌ 加载 Cookie 失败: {e}")
return False
# ============================
# 🕷 核心采集逻辑
# ============================
class TaobaoOrderScraper:
def __init__(self, page: Page):
self.page = page
self.orders: List[Dict] = []
async def scrape_orders(self):
"""主采集流程"""
logger.info("🚀 开始淘宝订单采集...")
# 1. 前往订单页并加载 Cookie
await self.page.goto(f"{CONFIG['base_url']}/order/sold.htm", wait_until='networkidle')
if not await load_cookies(self.page):
logger.critical("请先登录并保存 Cookie!")
return
await self.page.reload(wait_until='networkidle')
await random_delay(3, 5, "页面加载后")
# 2. 开始翻页采集
for page_num in range(1, CONFIG["max_pages"] + 1):
logger.info(f"📊 正在采集第 {page_num} 页...")
# 提取当前页所有订单
await self._extract_orders_from_page()
# 测试模式只采1页
if CONFIG["test_mode"]:
logger.info("🧪 达到最大测试页数 1,停止采集。")
break
# 尝试翻页
if not await self._go_to_next_page():
logger.info("🔚 已到最后一页。")
break
await random_delay(3, 5, "翻页后")
logger.info(f"📊 基础信息采集完成,共 {len(self.orders)} 条订单。")
async def _extract_orders_from_page(self):
"""从当前页面提取所有订单基础信息"""
# 等待订单列表加载
await self.page.wait_for_selector('label[itemprop="order"]', timeout=30000)
# 获取所有订单块
order_blocks = await self.page.locator('label[itemprop="order"]').all()
logger.info(f"🔍 在当前页找到 {len(order_blocks)} 条订单。")
for idx, block in enumerate(order_blocks, 1):
try:
order_data = await self._extract_single_order(block)
if order_data:
self.orders.append(order_data)
logger.debug(f"📌 添加有效订单 #{len(self.orders)}: {order_data['订单号']}")
except Exception as e:
logger.error(f"❌ 解析第 {idx} 个订单失败: {e}")
async def _extract_single_order(self, block: Page) -> Dict:
"""提取单个订单的基础信息"""
try:
# 提取订单号和创建时间
id_time_text = await block.locator('label[itemprop="order"] .item-mod__checkbox-label___cRGUj').inner_text()
order_id_match = re.search(r'订单号:\s*(\d+)', id_time_text)
create_time_match = re.search(r'创建时间:\s*([\d\-:\s]+)', id_time_text)
if not order_id_match:
return None
order_id = order_id_match.group(1)
create_time = create_time_match.group(1) if create_time_match else "未知"
# 提取商品名
product_name = await block.locator('.production-mod__production___3ZePJ a[target="_blank"] span[style*="line-height"]').first.inner_text()
product_name = product_name.strip()
# 提取数量
quantity = await block.locator('.suborder-mod__production___1eyM1 + td div p').first.inner_text()
quantity = quantity.strip()
# 提取交易状态
status = await block.locator('.sol-mod__no-br___toLPG + td p span').first.inner_text()
status = status.strip()
# 提取实收款
amount = await block.locator('.price-mod__price___3Un7c strong').first.inner_text()
amount = f"¥{amount}"
# 构建订单数据
order_data = {
"订单号": order_id,
"创建时间": create_time,
"宝贝": product_name,
"数量": quantity,
"交易状态": status,
"实收款": amount
}
return order_data
except Exception as e:
logger.error(f"❌ 提取订单信息失败: {e}")
return None
async def _go_to_next_page(self) -> bool:
"""翻页"""
next_btn = self.page.locator('a[title="下一页"]')
if await next_btn.is_enabled():
await next_btn.click()
await self.page.wait_for_load_state('networkidle')
return True
return False
# ============================
# 📦 收货信息采集模块
# ============================
async def extract_shipping_info(page: Page, trade_id: str) -> Dict:
"""
为单个订单采集收货信息
:param page: Playwright 页面对象
:param trade_id: 订单号(tradeId)
:return: 包含收货人、手机号、收货地址的字典
"""
logger.debug(f"🚚 开始为订单 {trade_id} 提取收货信息...")
shipping_data = {"收货人": "未知", "手机号": "未知", "收货地址": "未知"}
tracking_url = f"https://wuliu2.taobao.com/user/n/consign2.htm?tradeId={trade_id}"
try:
await page.goto(tracking_url, wait_until='networkidle', timeout=30000)
await random_delay(2, 4, "物流页加载")
# 检查是否登录
if "login.taobao.com" in page.url:
logger.error("🚨 检测到跳转到登录页,请检查 Cookie 是否有效")
shipping_data.update({"收货人": "登录失效", "手机号": "登录失效", "收货地址": "登录失效"})
return shipping_data
# 【调试】打印所有 iframe
iframes = page.locator('iframe')
count = await iframes.count()
logger.debug(f"🔍 检测到 {count} 个 iframe:")
for i in range(count):
src = await iframes.nth(i).get_attribute('src')
logger.debug(f" - iframe[{i}] src: {src}")
# 等待并进入 iframe
logger.info("📦 等待发货 iframe 加载...")
iframe_element = await page.wait_for_selector('iframe[src*="consign2.htm"]', timeout=20000)
iframe = await iframe_element.content_frame()
if not iframe:
logger.error("❌ 无法获取 iframe 的 content_frame()")
return shipping_data
logger.info("✅ 成功进入 iframe 上下文")
# 从 iframe 中提取 orderid (真实物流ID)
input_locator = iframe.locator('input[scene="consign"][orderid]')
await input_locator.wait_for(state='visible', timeout=20000)
logistics_id = await input_locator.get_attribute('orderid')
if not logistics_id:
logger.error("❌ 从 input 元素中提取 orderid 失败")
return shipping_data
logger.info(f"✅ 成功从 iframe 中提取到 logisticsId: {logistics_id}")
# === 调用 API 获取真实信息 ===
api_url = f"https://wuliu2.taobao.com/user/getRealReceiver?orderId={logistics_id}"
last_error = None
for attempt in range(3):
try:
logger.debug(f"📡 正在请求 API (第 {attempt+1} 次): {api_url}")
response: Response = await page.request.get(api_url, timeout=10000)
if response.status == 200:
data = await response.json()
logger.debug(f"📥 API 响应: {data}")
if data.get("success") and data.get("data"):
raw_addr = data["data"].get("address", "")
if raw_addr:
# 解析 "地址, 姓名, 手机号"
parts = [p.strip() for p in raw_addr.split(',') if p.strip()]
if len(parts) >= 3:
phone = parts[-1]
name = parts[-2]
address = ','.join(parts[:-2])
shipping_data.update({
"收货人": name,
"手机号": phone,
"收货地址": address
})
logger.info(f"✅ 【API】成功获取: {name}, {phone}")
return shipping_data
else:
logger.warning(f"⚠️ API 地址格式异常: '{raw_addr}'")
else:
logger.error("❌ API 返回 success=true 但 address 为空")
else:
msg = data.get("msg", "未知错误")
logger.error(f"❌ API 返回失败: {msg}")
last_error = msg
else:
logger.error(f"❌ API HTTP {response.status}")
last_error = f"HTTP {response.status}"
break # 成功响应后跳出
except Exception as e:
last_error = str(e)
logger.debug(f"❌ 第 {attempt+1} 次 API 请求失败: {e}")
if attempt < 2:
await random_delay(5, 8, "API 请求失败后重试前")
else:
logger.error(f"❌ API 请求失败,已重试 3 次,最终错误: {last_error}")
# === 备用方案:从 iframe DOM 解析 ===
try:
logger.info("🔍 启动 DOM 备用解析...")
# 查找包含手机号的文本
phone_locator = iframe.locator('text=/\\d{11}/')
await phone_locator.first.wait_for_element_state('visible', timeout=15000)
full_text = re.sub(r'\s+', ' ', await phone_locator.first.inner_text()).strip()
logger.debug(f"📄 DOM 文本内容: {full_text}")
# 提取手机号
phone_match = re.search(r'(\d{11})', full_text)
if not phone_match:
raise Exception("未找到手机号")
phone = phone_match.group(1)
# 提取姓名(手机号前最后一个非数字部分)
before_phone = full_text[:phone_match.start()].strip()
for kw in ['收货人', '姓名', '电话', '手机', '联系人']:
before_phone = re.sub(rf'{kw}\s*[::]?\s*', '', before_phone)
name_parts = re.split(r'[,,\s]+', before_phone.strip())
name = name_parts[-1] if name_parts else "未知"
# 地址是其余部分
address_end = full_text.rfind(name)
address = full_text[:address_end].strip()
shipping_data.update({
"收货人": name,
"手机号": phone,
"收货地址": address
})
logger.info(f"✅ 【DOM】解析成功: {name}, {phone}")
except Exception as e:
logger.error(f"💥 DOM 备用解析失败: {e}")
return shipping_data
except Exception as e:
logger.error(f"❌ 提取收货信息失败: {e}")
logger.exception(e)
return shipping_data
# ============================
# 💾 数据导出
# ============================
def save_to_csv(orders: List[Dict], output_dir: Path):
"""将订单数据保存为 CSV"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = output_dir / f"orders_sold_{timestamp}.csv"
# 确保字段顺序
fieldnames = ["订单号", "创建时间", "宝贝", "数量", "交易状态", "实收款", "收货人", "手机号", "收货地址"]
with open(filename, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(orders)
logger.info(f"💾 ✅ 数据保存成功! 文件: {filename}")
# ============================
# 🏁 主函数
# ============================
async def main():
async with async_playwright() as p:
browser: Browser = await p.chromium.launch(headless=False) # headless=False 便于调试
context = await browser.new_context(
viewport={'width': 1280, 'height': 800},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
)
page: Page = await context.new_page()
try:
# 1. 初始化采集器
scraper = TaobaoOrderScraper(page)
await scraper.scrape_orders()
# 2. 为符合条件的订单采集收货信息
if CONFIG["fetch_shipping"] and scraper.orders:
logger.info("📦 开始为每条订单采集【买家收货信息】...")
output_dir = setup_output_dir()
for i, order in enumerate(scraper.orders, 1):
status = order.get("交易状态", "")
if status not in CONFIG["status_filter"]:
logger.info(f"({i}/{len(scraper.orders)}) ⚠️ 跳过订单 {order['订单号']},状态为 '{status}' (不在采集列表中)")
continue
logger.info(f"({i}/{len(scraper.orders)}) 为订单 {order['订单号']} 采集收货信息...")
shipping_info = await extract_shipping_info(page, order['订单号'])
order.update(shipping_info) # 合并收货信息
await random_delay(5, 10, "处理完订单后")
# 3. 保存数据
save_to_csv(scraper.orders, output_dir)
except Exception as e:
logger.critical(f"❌ 程序运行出错: {e}")
logger.exception(e)
finally:
await browser.close()
logger.info("🧹 ✅ 浏览器已关闭。")
logger.info("🏁 程序运行结束")
if __name__ == "__main__":
asyncio.run(main())
4. 📖 使用说明
1️⃣ 准备工作
-
安装依赖:
pip install playwright playwright install -
手动登录并保存 Cookie:
- 运行一个临时脚本,打开淘宝卖家后台。
- 手动登录。
- 执行
await page.context.storage_state(path='taobao_cookies.json')保存 Cookie。
2️⃣ 配置 CONFIG
修改脚本顶部的 CONFIG 字典:
max_pages: 设置要采集的页数。status_filter: 修改为你想采集的订单状态。test_mode: 测试时设为True。
3️⃣ 运行脚本
python taobao_scraper.py
4️⃣ 查看结果
- 日志:
scraper.log - 数据:
output/orders_sold_*.csv
5.(图形化界面)及脚本方式运行
- 创建图形化界面 (Tkinter)
- Tkinter 是 Python 的标准 GUI 库,易于上手。
基本结构
# gui_app.py
import tkinter as tk
from tkinter import ttk, messagebox, filedialog, scrolledtext
import json
import threading
import asyncio
import sys
import os
# --- 确保能导入同目录下的 xianyu 模块 ---
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 导入你的核心爬虫逻辑
from xianyu import run_crawler_core, load_config, CONFIG_FILE
class XianyuCrawlerApp:
def __init__(self, root):
self.root = root
self.root.title("闲鱼订单信息采集器 v1.0")
self.root.geometry("800x600") # 设置窗口大小
self.config = {}
self.crawler_thread = None
self.stop_event = threading.Event() # 用于将来可能的停止功能
self.create_widgets()
self.load_config_to_gui()
def create_widgets(self):
# --- Notebook (Tabbed Interface) ---
notebook = ttk.Notebook(self.root)
notebook.pack(fill='both', expand=True, padx=10, pady=10)
# --- Tab 1: Configuration ---
config_tab = ttk.Frame(notebook)
notebook.add(config_tab, text="配置")
# Create a canvas and scrollbar for the config tab
canvas = tk.Canvas(config_tab)
scrollbar = ttk.Scrollbar(config_tab, orient="vertical", command=canvas.yview)
scrollable_frame = ttk.Frame(canvas)
scrollable_frame.bind(
"<Configure>",
lambda e: canvas.configure(
scrollregion=canvas.bbox("all")
)
)
canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
# --- Configuration Fields inside scrollable_frame ---
row_counter = 0
# Output Directory
ttk.Label(scrollable_frame, text="输出目录:").grid(row=row_counter, column=0, sticky=tk.W, padx=5, pady=5)
self.output_dir_var = tk.StringVar()
output_dir_frame = ttk.Frame(scrollable_frame)
output_dir_frame.grid(row=row_counter, column=1, sticky=tk.W, padx=5, pady=5)
ttk.Entry(output_dir_frame, textvariable=self.output_dir_var, width=40).pack(side=tk.LEFT)
ttk.Button(output_dir_frame, text="浏览...", command=self.browse_output_dir).pack(side=tk.LEFT)
row_counter += 1
# Taobao Cookies Path
ttk.Label(scrollable_frame, text="淘宝 Cookie 文件路径:").grid(row=row_counter, column=0, sticky=tk.W, padx=5, pady=5)
self.cookies_path_var = tk.StringVar()
cookies_frame = ttk.Frame(scrollable_frame)
cookies_frame.grid(row=row_counter, column=1, sticky=tk.W, padx=5, pady=5)
ttk.Entry(cookies_frame, textvariable=self.cookies_path_var, width=40).pack(side=tk.LEFT)
ttk.Button(cookies_frame, text="浏览...", command=self.browse_cookies).pack(side=tk.LEFT)
row_counter += 1
# Login Method
ttk.Label(scrollable_frame, text="登录方式:").grid(row=row_counter, column=0, sticky=tk.W, padx=5, pady=5)
self.login_method_var = tk.StringVar()
login_method_combo = ttk.Combobox(scrollable_frame, textvariable=self.login_method_var, values=["scan_qr", "cookies"], state="readonly", width=38)
login_method_combo.grid(row=row_counter, column=1, sticky=tk.W, padx=5, pady=5)
row_counter += 1
# Headless Mode
self.headless_var = tk.BooleanVar()
ttk.Checkbutton(scrollable_frame, text="无头模式 (后台运行)", variable=self.headless_var).grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=5, pady=5)
row_counter += 1
# Max Pages
ttk.Label(scrollable_frame, text="最大采集页数 (None=全部):").grid(row=row_counter, column=0, sticky=tk.W, padx=5, pady=5)
self.max_pages_var = tk.StringVar()
ttk.Entry(scrollable_frame, textvariable=self.max_pages_var, width=40).grid(row=row_counter, column=1, sticky=tk.W, padx=5, pady=5)
row_counter += 1
# --- Scrape Shipping Filter ---
ttk.Label(scrollable_frame, text="--- 收货信息采集过滤 ---", font=('Arial', 10, 'bold')).grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=5, pady=(10,2))
row_counter += 1
self.filter_enabled_var = tk.BooleanVar()
ttk.Checkbutton(scrollable_frame, text="启用状态过滤", variable=self.filter_enabled_var).grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=5, pady=2)
row_counter += 1
# Status Options (Dynamic Checkbuttons based on common statuses)
self.status_vars = {}
common_statuses = ["交易成功", "买家已付款", "已发货", "已签收", "等待买家付款", "交易关闭"]
ttk.Label(scrollable_frame, text="选择要采集收货信息的订单状态:").grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=5, pady=2)
row_counter += 1
status_frame = ttk.Frame(scrollable_frame)
status_frame.grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=20, pady=5)
status_row, status_col = 0, 0
for status in common_statuses:
var = tk.BooleanVar()
self.status_vars[status] = var
chk = ttk.Checkbutton(status_frame, text=status, variable=var)
chk.grid(row=status_row, column=status_col, sticky=tk.W, padx=5, pady=2)
status_col += 1
if status_col > 2: # 3 columns per row
status_col = 0
status_row += 1
row_counter += 1
canvas.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
# --- Tab 2: Log ---
log_tab = ttk.Frame(notebook)
notebook.add(log_tab, text="日志")
self.log_text = scrolledtext.ScrolledText(log_tab, state='disabled', wrap='word')
self.log_text.pack(fill='both', expand=True, padx=5, pady=5)
# --- Control Buttons (outside tabs) ---
button_frame = ttk.Frame(self.root)
button_frame.pack(fill='x', padx=10, pady=(0, 10))
self.run_button = ttk.Button(button_frame, text="开始运行", command=self.start_crawling)
self.run_button.pack(side=tk.LEFT, padx=5)
self.save_config_button = ttk.Button(button_frame, text="保存配置", command=self.save_config_from_gui)
self.save_config_button.pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="退出", command=self.root.destroy).pack(side=tk.RIGHT, padx=5)
def load_config_to_gui(self):
"""从 config.json 加载配置到 GUI"""
try:
self.config = load_config(CONFIG_FILE)
except Exception as e:
messagebox.showwarning("加载配置", f"加载配置文件失败: {e}\n将使用默认值。")
self.config = {}
# Populate fields
self.output_dir_var.set(self.config.get("output_dir", "./output"))
self.cookies_path_var.set(self.config.get("taobao_cookies_path", "taobao_cookies.json"))
self.login_method_var.set(self.config.get("login_method", "scan_qr"))
self.headless_var.set(self.config.get("headless", False))
max_pages_val = self.config.get("max_pages")
if max_pages_val is None:
self.max_pages_var.set("")
else:
self.max_pages_var.set(str(max_pages_val))
filter_config = self.config.get("scrape_shipping_filter", {})
self.filter_enabled_var.set(filter_config.get("enabled", False))
status_options = filter_config.get("status_options", {})
# Reset all checkboxes first
for var in self.status_vars.values():
var.set(False)
# Set checkboxes based on config
for status, enabled in status_options.items():
if status in self.status_vars:
self.status_vars[status].set(enabled)
def save_config_from_gui(self):
"""从 GUI 获取配置并保存到 config.json"""
new_config = {
"output_dir": self.output_dir_var.get(),
"taobao_cookies_path": self.cookies_path_var.get(),
"login_method": self.login_method_var.get(),
"headless": self.headless_var.get(),
"max_pages": int(self.max_pages_var.get()) if self.max_pages_var.get().isdigit() else (None if self.max_pages_var.get().lower() in ('none', '') else self.max_pages_var.get()),
"scrape_shipping_filter": {
"enabled": self.filter_enabled_var.get(),
"status_options": {status: var.get() for status, var in self.status_vars.items()}
}
# Add other config options here if needed
}
try:
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
json.dump(new_config, f, indent=4, ensure_ascii=False)
messagebox.showinfo("保存配置", "配置已成功保存到 config.json")
except Exception as e:
messagebox.showerror("保存配置", f"保存配置失败: {e}")
def browse_output_dir(self):
dir_path = filedialog.askdirectory(title="选择输出目录")
if dir_path:
self.output_dir_var.set(dir_path)
def browse_cookies(self):
file_path = filedialog.askopenfilename(title="选择 Cookie 文件", filetypes=[("JSON files", "*.json"), ("All files", "*.*")])
if file_path:
self.cookies_path_var.set(file_path)
def log_message(self, message):
"""向日志文本框添加消息"""
# Schedule the UI update on the main thread
self.root.after(0, self._update_log, message)
def _update_log(self, message):
"""在主线程中更新日志文本框"""
self.log_text.config(state='normal')
self.log_text.insert(tk.END, message + "\n")
self.log_text.see(tk.END) # Auto-scroll to bottom
self.log_text.config(state='disabled')
def start_crawling(self):
"""启动爬虫"""
self.save_config_from_gui() # 先保存当前配置
self.run_button.config(state='disabled', text="运行中...")
self.log_text.config(state='normal')
self.log_text.delete(1.0, tk.END) # Clear previous logs
self.log_text.config(state='disabled')
# Prepare config dict from GUI
config_dict = {
"output_dir": self.output_dir_var.get(),
"taobao_cookies_path": self.cookies_path_var.get(),
"login_method": self.login_method_var.get(),
"headless": self.headless_var.get(),
"max_pages": int(self.max_pages_var.get()) if self.max_pages_var.get().isdigit() else (None if self.max_pages_var.get().lower() in ('none', '') else self.max_pages_var.get()),
"scrape_shipping_filter": {
"enabled": self.filter_enabled_var.get(),
"status_options": {status: var.get() for status, var in self.status_vars.items()}
}
}
# Start crawler in a separate thread
self.crawler_thread = threading.Thread(target=self._run_crawler_async, args=(config_dict,), daemon=True)
self.crawler_thread.start()
def _run_crawler_async(self, config_dict):
"""在后台线程中运行异步爬虫"""
try:
# Run the asyncio event loop in this thread
asyncio.run(run_crawler_core(config_dict, log_callback=self.log_message))
except Exception as e:
self.log_message(f"[ERROR] 爬虫运行出错: {e}")
import traceback
self.log_message(traceback.format_exc())
finally:
# Re-enable the button on the main thread when done
self.root.after(0, self._on_crawler_finished)
def _on_crawler_finished(self):
"""爬虫结束后的回调"""
self.run_button.config(state='normal', text="开始运行")
messagebox.showinfo("完成", "爬虫任务已完成!请查看日志或输出的 CSV 文件。")
def main():
root = tk.Tk()
app = XianyuCrawlerApp(root)
root.mainloop()
if __name__ == "__main__":
main()
运行
python gui_app.py
- 使用 Conda 管理环境
- 为项目创建独立的 Conda 环境,避免依赖冲突。
创建环境
# 创建名为 my_project_env 的环境,指定 Python 版本
conda create -n my_project_env python=3.13
激活环境
conda activate my_project_env
安装依赖
# 安装 Playwright (会自动安装依赖如 pyee, greenlet)
pip install playwright
# Tkinter 通常是 Python 内置的,无需额外安装
# 可以通过 python -c "import tkinter; print(tkinter.TkVersion)" 验证
# 安装其他项目所需库
# pip install requests beautifulsoup4 ...
安装 Playwright 浏览器驱动
# 安装 Chromium 浏览器驱动
playwright install chromium
# 可选:安装其他浏览器
# playwright install firefox
# playwright install webkit
- Playwright 浏览器驱动详解
- Playwright 需要下载对应的浏览器驱动才能运行。
默认安装位置 (全局缓存)
macOS/Linux: ~/.cache/ms-playwright/
Windows: %LOCALAPPDATA%\ms-playwright\
安装命令:
playwright install chromium
会将驱动下载到该目录,例如 ~/.cache/ms-playwright/chromium-1194/。
环境变量 PLAYWRIGHT_BROWSERS_PATH
此环境变量控制浏览器驱动的查找位置。
未设置或设置为标准路径:
Playwright 在默认全局缓存目录查找驱动。
设置为 0:
Playwright 会在其自身的包目录下查找或安装驱动。
路径通常为:
<PYTHON_SITE_PACKAGES>/playwright/driver/package/.local-browsers/
用途: 常用于 PyInstaller 等打包工具,使生成的应用程序携带所需驱动,实现独立运行。
关键注意事项
路径一致性: PLAYWRIGHT_BROWSERS_PATH 的设置必须与浏览器驱动的实际存放位置匹配,否则会报 Executable doesn't exist 错误。
清理: 可以安全删除 ~/.cache/ms-playwright/ 中不用的浏览器版本以节省空间。
Headless Shell: chromium_headless_shell-xxxx 是专为无头模式优化的精简版驱动,性能可能更好,但只能用于 headless=True 场景。
- 创建 .command 文件以便双击运行
- .command 文件是一种在 macOS 上方便运行脚本的方式。
创建文件
在项目根目录(例如 my_project)下创建一个文本文件,命名为 启动我的应用.command。
编辑文件内容:
#!/bin/bash
# 获取脚本所在的目录,这样无论从哪里运行它都能找到正确的路径
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "脚本所在目录: $SCRIPT_DIR"
# --- 配置区域 ---
# 请根据你的实际情况修改以下变量
# 1. 你的 Conda 环境名称
CONDA_ENV_NAME="xianyu_env"
# 2. 你的主 Python 脚本相对于此 .command 文件的路径
# 因为此 .command 文件和 gui_app.py 在同一目录,所以这里是 "."
PROJECT_RELATIVE_PATH="."
# --- 配置区域结束 ---
# 构建项目目录的完整路径
PROJECT_DIR="$SCRIPT_DIR/$PROJECT_RELATIVE_PATH"
# 检查项目目录是否存在
if [[ ! -d "$PROJECT_DIR" ]]; then
echo "❌ 错误: 项目目录 '$PROJECT_DIR' 不存在。请检查 PROJECT_RELATIVE_PATH 设置。"
# 等待用户按键再关闭窗口
read -p "按任意键退出..."
exit 1
fi
echo "📁 项目目录: $PROJECT_DIR"
# 初始化 Conda (根据你的 Conda 安装方式调整)
# 如果你使用的是 Miniconda 或 Anaconda 官方安装包,通常是下面这行:
eval "$($(which conda) shell.bash hook)"
# 检查 Conda 初始化是否成功
if [[ $? -ne 0 ]]; then
echo "❌ 错误: 无法初始化 Conda。请确保 Conda 已正确安装。"
read -p "按任意键退出..."
exit 1
fi
# 激活 Conda 环境
echo "🐍 激活 Conda 环境 '$CONDA_ENV_NAME'..."
conda activate "$CONDA_ENV_NAME"
# 检查环境激活是否成功
if [[ $? -ne 0 ]]; then
echo "❌ 错误: 无法激活 Conda 环境 '$CONDA_ENV_NAME'。请检查环境名称是否正确。"
read -p "按任意键退出..."
exit 1
fi
# 进入项目目录
echo "📂 切换到项目目录..."
cd "$PROJECT_DIR" || exit
# 运行 Python GUI 脚本
echo "🚀 启动 Python GUI 应用..."
python gui_app.py
# 检查 Python 脚本是否运行成功
if [[ $? -eq 0 ]]; then
echo "✅ 应用已退出。"
else
echo "⚠️ 应用可能遇到错误而退出。"
fi
# 等待用户按键再关闭终端窗口,方便查看输出
echo ""
echo "应用查看完毕。"
read -p "按任意键关闭此窗口..."
赋予执行权限 保存文件后,在终端中运行:
chmod +x 启动我的应用.command
运行 双击运行: 在 Finder 中双击 .command 文件。系统可能会弹窗警告,选择“打开”即可。 终端运行: 也可以在终端中直接执行 ./启动我的应用.command。
- 故障排除
Executable doesn't exist 错误:
检查 1: 确认 playwright install chromium 已成功执行,并且驱动存在于预期目录。
检查 2: 检查代码中是否设置了
os.environ['PLAYWRIGHT_BROWSERS_PATH'] = '0'。
# 如果设置了,驱动必须位于
<PYTHON_SITE_PACKAGES>/playwright/driver/package/.local-browsers/;
# 否则应在
~/.cache/ms-playwright/。
Conda 环境未激活: 确保 .command 文件中的 eval "$(/opt/miniconda3/bin/conda shell.zsh hook)" 路径正确指向你的 Conda 初始化脚本 (conda.sh 或 __conda_setup 命令所在位置)。
tkinter 模块未找到:
确认 Python 环境正确,并且 Tkinter 是随 Python 一起安装的标准库。可以使用 python -c "import tkinter; print(tkinter.TkVersion)" 测试。
- 总结
通过结合 Tkinter、Conda 和 .command 文件,可以构建一个易于分发和运行的 Python GUI 应用。理解并正确配置 Playwright 的浏览器驱动路径是保证应用稳定运行的关键一步。
🎁 结语
这份脚本是一个 完整的、可复用的模板,你可以基于此扩展更多功能,如:
- 自动化登录(扫码)
- 多店铺支持
- 数据入库(MySQL/SQLite)
- 微信通知
- 图形化界面*
希望这份文档对你有帮助!祝你学习顺利!📘