简体中文
淘宝订单信息采集器 Python 脚本

📦 淘宝订单信息采集器(Playwright 版 Python 脚本)

大纲

这份文档将包括:

  1. 项目背景与需求
  2. 核心思路与技术要点
  3. 完整的、带详细注释的代码
  4. 使用说明

适用于:已登录淘宝卖家后台,需批量采集“买家已付款”状态订单的真实收货人信息。


1. 🎯 项目背景与需求

📌 核心需求

从淘宝卖家后台的“已卖出的宝贝”页面,自动化采集指定状态订单的 真实买家收货信息(收货人、手机号、收货地址)。 🎯 为什么需要自动化?

  • 手动点击每个订单查看收货信息效率极低。
  • 淘宝页面结构复杂,数据动态加载,需处理 iframe 和异步 API。
  • 需要结合订单基础信息(订单号、商品、金额等)与收货信息,导出为结构化数据(如 CSV)。

✅ 最终目标

  • ✅ 自动登录(依赖外部 Cookie)
  • ✅ 爬取订单列表页的基础信息
  • 精准过滤:仅处理“买家已付款”的订单
  • ✅ 进入物流页,通过 iframe 获取 orderid
  • 调用内部 API 获取真实收货信息(避免被页面显示的“加密地址”欺骗)
  • 失败备用方案:DOM 解析
  • ✅ 导出完整数据到 CSV

2. 🧠 核心思路与技术要点

技术点说明
Playwright无头浏览器,模拟真实用户操作,处理 JS 渲染和登录态。
Cookie 登录避免手动扫码,通过加载已保存的 Cookie 保持登录。
iframe 处理物流页是主页面嵌套 iframe,必须通过 content_frame() 进入上下文。
orderid 提取真实物流 ID 隐藏在 input[scene="consign"][orderid] 元素中,需从 iframe 内提取。
API 调用使用 page.request 直接调用 getRealReceiver 接口,获取未加密的真实地址。
DOM 备用解析当 API 失败时,从 iframe 的文本中正则提取手机号、姓名。
状态过滤只处理“买家已付款”的订单,跳过“已发货”、“交易关闭”等状态。

3. 📄 完整代码(带详细注释)

import asyncio
import re
import csv
from datetime import datetime
from pathlib import Path
import logging
from typing import Dict, List
import random
import time

from playwright.async_api import async_playwright, Page, Frame, Browser, Response

# ============================
# 🛠 配置区
# ============================

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('scraper.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# 脚本配置
CONFIG = {
    "base_url": "https://seller.taobao.com",  # 淘宝卖家后台
    "cookie_file": "taobao_cookies.json",     # 保存的 Cookie 文件
    "output_dir": "output",                   # 输出目录
    "max_pages": 1,                           # 最大采集页数(用于测试)
    "test_mode": True,                        # 测试模式:只采1页
    "fetch_shipping": True,                   # 是否采集收货信息
    "status_filter": ["买家已付款"]           # 只采集这些状态的订单
}

# ============================
# 🧩 工具函数
# ============================

async def random_delay(min_sec: float, max_sec: float, action: str = ""):
    """随机延迟,模拟人类操作"""
    delay = random.uniform(min_sec, max_sec)
    if action:
        logger.info(f"⏳ [{action}] 随机延迟 {delay:.2f} 秒...")
    else:
        logger.debug(f"⏳ 随机延迟 {delay:.2f} 秒...")
    await asyncio.sleep(delay)

def setup_output_dir() -> Path:
    """创建输出目录"""
    output_path = Path(CONFIG["output_dir"])
    output_path.mkdir(exist_ok=True)
    return output_path

def load_cookies(page: Page) -> bool:
    """加载本地 Cookie 以保持登录状态"""
    cookie_file = Path(CONFIG["cookie_file"])
    if not cookie_file.exists():
        logger.error(f"❌ Cookie 文件不存在: {cookie_file}")
        return False

    try:
        with open(cookie_file, 'r', encoding='utf-8') as f:
            cookies = f.read()
            import json
            page.context.add_cookies(json.loads(cookies))
        logger.info("✅ Cookie 加载成功,尝试保持登录状态...")
        return True
    except Exception as e:
        logger.error(f"❌ 加载 Cookie 失败: {e}")
        return False

# ============================
# 🕷 核心采集逻辑
# ============================

class TaobaoOrderScraper:
    def __init__(self, page: Page):
        self.page = page
        self.orders: List[Dict] = []

    async def scrape_orders(self):
        """主采集流程"""
        logger.info("🚀 开始淘宝订单采集...")

        # 1. 前往订单页并加载 Cookie
        await self.page.goto(f"{CONFIG['base_url']}/order/sold.htm", wait_until='networkidle')
        if not await load_cookies(self.page):
            logger.critical("请先登录并保存 Cookie!")
            return

        await self.page.reload(wait_until='networkidle')
        await random_delay(3, 5, "页面加载后")

        # 2. 开始翻页采集
        for page_num in range(1, CONFIG["max_pages"] + 1):
            logger.info(f"📊 正在采集第 {page_num} 页...")

            # 提取当前页所有订单
            await self._extract_orders_from_page()

            # 测试模式只采1页
            if CONFIG["test_mode"]:
                logger.info("🧪 达到最大测试页数 1,停止采集。")
                break

            # 尝试翻页
            if not await self._go_to_next_page():
                logger.info("🔚 已到最后一页。")
                break

            await random_delay(3, 5, "翻页后")

        logger.info(f"📊 基础信息采集完成,共 {len(self.orders)} 条订单。")

    async def _extract_orders_from_page(self):
        """从当前页面提取所有订单基础信息"""
        # 等待订单列表加载
        await self.page.wait_for_selector('label[itemprop="order"]', timeout=30000)

        # 获取所有订单块
        order_blocks = await self.page.locator('label[itemprop="order"]').all()
        logger.info(f"🔍 在当前页找到 {len(order_blocks)} 条订单。")

        for idx, block in enumerate(order_blocks, 1):
            try:
                order_data = await self._extract_single_order(block)
                if order_data:
                    self.orders.append(order_data)
                    logger.debug(f"📌 添加有效订单 #{len(self.orders)}: {order_data['订单号']}")
            except Exception as e:
                logger.error(f"❌ 解析第 {idx} 个订单失败: {e}")

    async def _extract_single_order(self, block: Page) -> Dict:
        """提取单个订单的基础信息"""
        try:
            # 提取订单号和创建时间
            id_time_text = await block.locator('label[itemprop="order"] .item-mod__checkbox-label___cRGUj').inner_text()
            order_id_match = re.search(r'订单号:\s*(\d+)', id_time_text)
            create_time_match = re.search(r'创建时间:\s*([\d\-:\s]+)', id_time_text)

            if not order_id_match:
                return None

            order_id = order_id_match.group(1)
            create_time = create_time_match.group(1) if create_time_match else "未知"

            # 提取商品名
            product_name = await block.locator('.production-mod__production___3ZePJ a[target="_blank"] span[style*="line-height"]').first.inner_text()
            product_name = product_name.strip()

            # 提取数量
            quantity = await block.locator('.suborder-mod__production___1eyM1 + td div p').first.inner_text()
            quantity = quantity.strip()

            # 提取交易状态
            status = await block.locator('.sol-mod__no-br___toLPG + td p span').first.inner_text()
            status = status.strip()

            # 提取实收款
            amount = await block.locator('.price-mod__price___3Un7c strong').first.inner_text()
            amount = f"¥{amount}"

            # 构建订单数据
            order_data = {
                "订单号": order_id,
                "创建时间": create_time,
                "宝贝": product_name,
                "数量": quantity,
                "交易状态": status,
                "实收款": amount
            }

            return order_data

        except Exception as e:
            logger.error(f"❌ 提取订单信息失败: {e}")
            return None

    async def _go_to_next_page(self) -> bool:
        """翻页"""
        next_btn = self.page.locator('a[title="下一页"]')
        if await next_btn.is_enabled():
            await next_btn.click()
            await self.page.wait_for_load_state('networkidle')
            return True
        return False

# ============================
# 📦 收货信息采集模块
# ============================

async def extract_shipping_info(page: Page, trade_id: str) -> Dict:
    """
    为单个订单采集收货信息
    :param page: Playwright 页面对象
    :param trade_id: 订单号(tradeId)
    :return: 包含收货人、手机号、收货地址的字典
    """
    logger.debug(f"🚚 开始为订单 {trade_id} 提取收货信息...")
    shipping_data = {"收货人": "未知", "手机号": "未知", "收货地址": "未知"}

    tracking_url = f"https://wuliu2.taobao.com/user/n/consign2.htm?tradeId={trade_id}"

    try:
        await page.goto(tracking_url, wait_until='networkidle', timeout=30000)
        await random_delay(2, 4, "物流页加载")

        # 检查是否登录
        if "login.taobao.com" in page.url:
            logger.error("🚨 检测到跳转到登录页,请检查 Cookie 是否有效")
            shipping_data.update({"收货人": "登录失效", "手机号": "登录失效", "收货地址": "登录失效"})
            return shipping_data

        # 【调试】打印所有 iframe
        iframes = page.locator('iframe')
        count = await iframes.count()
        logger.debug(f"🔍 检测到 {count} 个 iframe:")
        for i in range(count):
            src = await iframes.nth(i).get_attribute('src')
            logger.debug(f"   - iframe[{i}] src: {src}")

        # 等待并进入 iframe
        logger.info("📦 等待发货 iframe 加载...")
        iframe_element = await page.wait_for_selector('iframe[src*="consign2.htm"]', timeout=20000)
        iframe = await iframe_element.content_frame()
        if not iframe:
            logger.error("❌ 无法获取 iframe 的 content_frame()")
            return shipping_data

        logger.info("✅ 成功进入 iframe 上下文")

        # 从 iframe 中提取 orderid (真实物流ID)
        input_locator = iframe.locator('input[scene="consign"][orderid]')
        await input_locator.wait_for(state='visible', timeout=20000)
        logistics_id = await input_locator.get_attribute('orderid')

        if not logistics_id:
            logger.error("❌ 从 input 元素中提取 orderid 失败")
            return shipping_data

        logger.info(f"✅ 成功从 iframe 中提取到 logisticsId: {logistics_id}")

        # === 调用 API 获取真实信息 ===
        api_url = f"https://wuliu2.taobao.com/user/getRealReceiver?orderId={logistics_id}"
        last_error = None

        for attempt in range(3):
            try:
                logger.debug(f"📡 正在请求 API (第 {attempt+1} 次): {api_url}")
                response: Response = await page.request.get(api_url, timeout=10000)

                if response.status == 200:
                    data = await response.json()
                    logger.debug(f"📥 API 响应: {data}")

                    if data.get("success") and data.get("data"):
                        raw_addr = data["data"].get("address", "")
                        if raw_addr:
                            # 解析 "地址, 姓名, 手机号"
                            parts = [p.strip() for p in raw_addr.split(',') if p.strip()]
                            if len(parts) >= 3:
                                phone = parts[-1]
                                name = parts[-2]
                                address = ','.join(parts[:-2])
                                shipping_data.update({
                                    "收货人": name,
                                    "手机号": phone,
                                    "收货地址": address
                                })
                                logger.info(f"✅ 【API】成功获取: {name}, {phone}")
                                return shipping_data
                            else:
                                logger.warning(f"⚠️ API 地址格式异常: '{raw_addr}'")
                        else:
                            logger.error("❌ API 返回 success=true 但 address 为空")
                    else:
                        msg = data.get("msg", "未知错误")
                        logger.error(f"❌ API 返回失败: {msg}")
                        last_error = msg
                else:
                    logger.error(f"❌ API HTTP {response.status}")
                    last_error = f"HTTP {response.status}"
                break  # 成功响应后跳出

            except Exception as e:
                last_error = str(e)
                logger.debug(f"❌ 第 {attempt+1} 次 API 请求失败: {e}")
                if attempt < 2:
                    await random_delay(5, 8, "API 请求失败后重试前")
                else:
                    logger.error(f"❌ API 请求失败,已重试 3 次,最终错误: {last_error}")

        # === 备用方案:从 iframe DOM 解析 ===
        try:
            logger.info("🔍 启动 DOM 备用解析...")
            # 查找包含手机号的文本
            phone_locator = iframe.locator('text=/\\d{11}/')
            await phone_locator.first.wait_for_element_state('visible', timeout=15000)

            full_text = re.sub(r'\s+', ' ', await phone_locator.first.inner_text()).strip()
            logger.debug(f"📄 DOM 文本内容: {full_text}")

            # 提取手机号
            phone_match = re.search(r'(\d{11})', full_text)
            if not phone_match:
                raise Exception("未找到手机号")

            phone = phone_match.group(1)
            # 提取姓名(手机号前最后一个非数字部分)
            before_phone = full_text[:phone_match.start()].strip()
            for kw in ['收货人', '姓名', '电话', '手机', '联系人']:
                before_phone = re.sub(rf'{kw}\s*[::]?\s*', '', before_phone)
            name_parts = re.split(r'[,,\s]+', before_phone.strip())
            name = name_parts[-1] if name_parts else "未知"

            # 地址是其余部分
            address_end = full_text.rfind(name)
            address = full_text[:address_end].strip()

            shipping_data.update({
                "收货人": name,
                "手机号": phone,
                "收货地址": address
            })
            logger.info(f"✅ 【DOM】解析成功: {name}, {phone}")

        except Exception as e:
            logger.error(f"💥 DOM 备用解析失败: {e}")

        return shipping_data

    except Exception as e:
        logger.error(f"❌ 提取收货信息失败: {e}")
        logger.exception(e)
        return shipping_data

# ============================
# 💾 数据导出
# ============================

def save_to_csv(orders: List[Dict], output_dir: Path):
    """将订单数据保存为 CSV"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = output_dir / f"orders_sold_{timestamp}.csv"

    # 确保字段顺序
    fieldnames = ["订单号", "创建时间", "宝贝", "数量", "交易状态", "实收款", "收货人", "手机号", "收货地址"]

    with open(filename, 'w', encoding='utf-8-sig', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(orders)

    logger.info(f"💾 ✅ 数据保存成功! 文件: {filename}")

# ============================
# 🏁 主函数
# ============================

async def main():
    async with async_playwright() as p:
        browser: Browser = await p.chromium.launch(headless=False)  # headless=False 便于调试
        context = await browser.new_context(
            viewport={'width': 1280, 'height': 800},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        )
        page: Page = await context.new_page()

        try:
            # 1. 初始化采集器
            scraper = TaobaoOrderScraper(page)
            await scraper.scrape_orders()

            # 2. 为符合条件的订单采集收货信息
            if CONFIG["fetch_shipping"] and scraper.orders:
                logger.info("📦 开始为每条订单采集【买家收货信息】...")
                output_dir = setup_output_dir()

                for i, order in enumerate(scraper.orders, 1):
                    status = order.get("交易状态", "")

                    if status not in CONFIG["status_filter"]:
                        logger.info(f"({i}/{len(scraper.orders)}) ⚠️ 跳过订单 {order['订单号']},状态为 '{status}' (不在采集列表中)")
                        continue

                    logger.info(f"({i}/{len(scraper.orders)}) 为订单 {order['订单号']} 采集收货信息...")
                    shipping_info = await extract_shipping_info(page, order['订单号'])
                    order.update(shipping_info)  # 合并收货信息
                    await random_delay(5, 10, "处理完订单后")

            # 3. 保存数据
            save_to_csv(scraper.orders, output_dir)

        except Exception as e:
            logger.critical(f"❌ 程序运行出错: {e}")
            logger.exception(e)
        finally:
            await browser.close()
            logger.info("🧹 ✅ 浏览器已关闭。")
            logger.info("🏁 程序运行结束")

if __name__ == "__main__":
    asyncio.run(main())

4. 📖 使用说明

1️⃣ 准备工作

  • 安装依赖:

    pip install playwright
    playwright install
    
  • 手动登录并保存 Cookie

    1. 运行一个临时脚本,打开淘宝卖家后台。
    2. 手动登录。
    3. 执行 await page.context.storage_state(path='taobao_cookies.json') 保存 Cookie。

2️⃣ 配置 CONFIG

修改脚本顶部的 CONFIG 字典:

  • max_pages: 设置要采集的页数。
  • status_filter: 修改为你想采集的订单状态。
  • test_mode: 测试时设为 True

3️⃣ 运行脚本

python taobao_scraper.py

4️⃣ 查看结果

  • 日志:scraper.log
  • 数据:output/orders_sold_*.csv

5.(图形化界面)及脚本方式运行

  1. 创建图形化界面 (Tkinter)
  • Tkinter 是 Python 的标准 GUI 库,易于上手。

基本结构

# gui_app.py

import tkinter as tk
from tkinter import ttk, messagebox, filedialog, scrolledtext
import json
import threading
import asyncio
import sys
import os

# --- 确保能导入同目录下的 xianyu 模块 ---
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# 导入你的核心爬虫逻辑
from xianyu import run_crawler_core, load_config, CONFIG_FILE

class XianyuCrawlerApp:
    def __init__(self, root):
        self.root = root
        self.root.title("闲鱼订单信息采集器 v1.0")
        self.root.geometry("800x600") # 设置窗口大小
        self.config = {}
        self.crawler_thread = None
        self.stop_event = threading.Event() # 用于将来可能的停止功能

        self.create_widgets()
        self.load_config_to_gui()

    def create_widgets(self):
        # --- Notebook (Tabbed Interface) ---
        notebook = ttk.Notebook(self.root)
        notebook.pack(fill='both', expand=True, padx=10, pady=10)

        # --- Tab 1: Configuration ---
        config_tab = ttk.Frame(notebook)
        notebook.add(config_tab, text="配置")

        # Create a canvas and scrollbar for the config tab
        canvas = tk.Canvas(config_tab)
        scrollbar = ttk.Scrollbar(config_tab, orient="vertical", command=canvas.yview)
        scrollable_frame = ttk.Frame(canvas)

        scrollable_frame.bind(
            "<Configure>",
            lambda e: canvas.configure(
                scrollregion=canvas.bbox("all")
            )
        )

        canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
        canvas.configure(yscrollcommand=scrollbar.set)

        # --- Configuration Fields inside scrollable_frame ---
        row_counter = 0

        # Output Directory
        ttk.Label(scrollable_frame, text="输出目录:").grid(row=row_counter, column=0, sticky=tk.W, padx=5, pady=5)
        self.output_dir_var = tk.StringVar()
        output_dir_frame = ttk.Frame(scrollable_frame)
        output_dir_frame.grid(row=row_counter, column=1, sticky=tk.W, padx=5, pady=5)
        ttk.Entry(output_dir_frame, textvariable=self.output_dir_var, width=40).pack(side=tk.LEFT)
        ttk.Button(output_dir_frame, text="浏览...", command=self.browse_output_dir).pack(side=tk.LEFT)
        row_counter += 1

        # Taobao Cookies Path
        ttk.Label(scrollable_frame, text="淘宝 Cookie 文件路径:").grid(row=row_counter, column=0, sticky=tk.W, padx=5, pady=5)
        self.cookies_path_var = tk.StringVar()
        cookies_frame = ttk.Frame(scrollable_frame)
        cookies_frame.grid(row=row_counter, column=1, sticky=tk.W, padx=5, pady=5)
        ttk.Entry(cookies_frame, textvariable=self.cookies_path_var, width=40).pack(side=tk.LEFT)
        ttk.Button(cookies_frame, text="浏览...", command=self.browse_cookies).pack(side=tk.LEFT)
        row_counter += 1

        # Login Method
        ttk.Label(scrollable_frame, text="登录方式:").grid(row=row_counter, column=0, sticky=tk.W, padx=5, pady=5)
        self.login_method_var = tk.StringVar()
        login_method_combo = ttk.Combobox(scrollable_frame, textvariable=self.login_method_var, values=["scan_qr", "cookies"], state="readonly", width=38)
        login_method_combo.grid(row=row_counter, column=1, sticky=tk.W, padx=5, pady=5)
        row_counter += 1

        # Headless Mode
        self.headless_var = tk.BooleanVar()
        ttk.Checkbutton(scrollable_frame, text="无头模式 (后台运行)", variable=self.headless_var).grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=5, pady=5)
        row_counter += 1

        # Max Pages
        ttk.Label(scrollable_frame, text="最大采集页数 (None=全部):").grid(row=row_counter, column=0, sticky=tk.W, padx=5, pady=5)
        self.max_pages_var = tk.StringVar()
        ttk.Entry(scrollable_frame, textvariable=self.max_pages_var, width=40).grid(row=row_counter, column=1, sticky=tk.W, padx=5, pady=5)
        row_counter += 1

        # --- Scrape Shipping Filter ---
        ttk.Label(scrollable_frame, text="--- 收货信息采集过滤 ---", font=('Arial', 10, 'bold')).grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=5, pady=(10,2))
        row_counter += 1

        self.filter_enabled_var = tk.BooleanVar()
        ttk.Checkbutton(scrollable_frame, text="启用状态过滤", variable=self.filter_enabled_var).grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=5, pady=2)
        row_counter += 1

        # Status Options (Dynamic Checkbuttons based on common statuses)
        self.status_vars = {}
        common_statuses = ["交易成功", "买家已付款", "已发货", "已签收", "等待买家付款", "交易关闭"]
        ttk.Label(scrollable_frame, text="选择要采集收货信息的订单状态:").grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=5, pady=2)
        row_counter += 1

        status_frame = ttk.Frame(scrollable_frame)
        status_frame.grid(row=row_counter, column=0, columnspan=2, sticky=tk.W, padx=20, pady=5)
        status_row, status_col = 0, 0
        for status in common_statuses:
            var = tk.BooleanVar()
            self.status_vars[status] = var
            chk = ttk.Checkbutton(status_frame, text=status, variable=var)
            chk.grid(row=status_row, column=status_col, sticky=tk.W, padx=5, pady=2)
            status_col += 1
            if status_col > 2: # 3 columns per row
                status_col = 0
                status_row += 1
        row_counter += 1

        canvas.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

        # --- Tab 2: Log ---
        log_tab = ttk.Frame(notebook)
        notebook.add(log_tab, text="日志")

        self.log_text = scrolledtext.ScrolledText(log_tab, state='disabled', wrap='word')
        self.log_text.pack(fill='both', expand=True, padx=5, pady=5)

        # --- Control Buttons (outside tabs) ---
        button_frame = ttk.Frame(self.root)
        button_frame.pack(fill='x', padx=10, pady=(0, 10))

        self.run_button = ttk.Button(button_frame, text="开始运行", command=self.start_crawling)
        self.run_button.pack(side=tk.LEFT, padx=5)

        self.save_config_button = ttk.Button(button_frame, text="保存配置", command=self.save_config_from_gui)
        self.save_config_button.pack(side=tk.LEFT, padx=5)

        ttk.Button(button_frame, text="退出", command=self.root.destroy).pack(side=tk.RIGHT, padx=5)

    def load_config_to_gui(self):
        """从 config.json 加载配置到 GUI"""
        try:
            self.config = load_config(CONFIG_FILE)
        except Exception as e:
            messagebox.showwarning("加载配置", f"加载配置文件失败: {e}\n将使用默认值。")
            self.config = {}

        # Populate fields
        self.output_dir_var.set(self.config.get("output_dir", "./output"))
        self.cookies_path_var.set(self.config.get("taobao_cookies_path", "taobao_cookies.json"))
        self.login_method_var.set(self.config.get("login_method", "scan_qr"))
        self.headless_var.set(self.config.get("headless", False))

        max_pages_val = self.config.get("max_pages")
        if max_pages_val is None:
             self.max_pages_var.set("")
        else:
             self.max_pages_var.set(str(max_pages_val))

        filter_config = self.config.get("scrape_shipping_filter", {})
        self.filter_enabled_var.set(filter_config.get("enabled", False))
        status_options = filter_config.get("status_options", {})
        # Reset all checkboxes first
        for var in self.status_vars.values():
            var.set(False)
        # Set checkboxes based on config
        for status, enabled in status_options.items():
            if status in self.status_vars:
                self.status_vars[status].set(enabled)

    def save_config_from_gui(self):
        """从 GUI 获取配置并保存到 config.json"""
        new_config = {
            "output_dir": self.output_dir_var.get(),
            "taobao_cookies_path": self.cookies_path_var.get(),
            "login_method": self.login_method_var.get(),
            "headless": self.headless_var.get(),
            "max_pages": int(self.max_pages_var.get()) if self.max_pages_var.get().isdigit() else (None if self.max_pages_var.get().lower() in ('none', '') else self.max_pages_var.get()),
            "scrape_shipping_filter": {
                "enabled": self.filter_enabled_var.get(),
                "status_options": {status: var.get() for status, var in self.status_vars.items()}
            }
            # Add other config options here if needed
        }
        try:
            with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
                json.dump(new_config, f, indent=4, ensure_ascii=False)
            messagebox.showinfo("保存配置", "配置已成功保存到 config.json")
        except Exception as e:
             messagebox.showerror("保存配置", f"保存配置失败: {e}")

    def browse_output_dir(self):
        dir_path = filedialog.askdirectory(title="选择输出目录")
        if dir_path:
            self.output_dir_var.set(dir_path)

    def browse_cookies(self):
        file_path = filedialog.askopenfilename(title="选择 Cookie 文件", filetypes=[("JSON files", "*.json"), ("All files", "*.*")])
        if file_path:
            self.cookies_path_var.set(file_path)

    def log_message(self, message):
        """向日志文本框添加消息"""
        # Schedule the UI update on the main thread
        self.root.after(0, self._update_log, message)

    def _update_log(self, message):
        """在主线程中更新日志文本框"""
        self.log_text.config(state='normal')
        self.log_text.insert(tk.END, message + "\n")
        self.log_text.see(tk.END) # Auto-scroll to bottom
        self.log_text.config(state='disabled')

    def start_crawling(self):
        """启动爬虫"""
        self.save_config_from_gui() # 先保存当前配置
        self.run_button.config(state='disabled', text="运行中...")
        self.log_text.config(state='normal')
        self.log_text.delete(1.0, tk.END) # Clear previous logs
        self.log_text.config(state='disabled')

        # Prepare config dict from GUI
        config_dict = {
            "output_dir": self.output_dir_var.get(),
            "taobao_cookies_path": self.cookies_path_var.get(),
            "login_method": self.login_method_var.get(),
            "headless": self.headless_var.get(),
            "max_pages": int(self.max_pages_var.get()) if self.max_pages_var.get().isdigit() else (None if self.max_pages_var.get().lower() in ('none', '') else self.max_pages_var.get()),
            "scrape_shipping_filter": {
                "enabled": self.filter_enabled_var.get(),
                "status_options": {status: var.get() for status, var in self.status_vars.items()}
            }
        }

        # Start crawler in a separate thread
        self.crawler_thread = threading.Thread(target=self._run_crawler_async, args=(config_dict,), daemon=True)
        self.crawler_thread.start()

    def _run_crawler_async(self, config_dict):
        """在后台线程中运行异步爬虫"""
        try:
            # Run the asyncio event loop in this thread
            asyncio.run(run_crawler_core(config_dict, log_callback=self.log_message))
        except Exception as e:
            self.log_message(f"[ERROR] 爬虫运行出错: {e}")
            import traceback
            self.log_message(traceback.format_exc())
        finally:
            # Re-enable the button on the main thread when done
            self.root.after(0, self._on_crawler_finished)

    def _on_crawler_finished(self):
        """爬虫结束后的回调"""
        self.run_button.config(state='normal', text="开始运行")
        messagebox.showinfo("完成", "爬虫任务已完成!请查看日志或输出的 CSV 文件。")


def main():
    root = tk.Tk()
    app = XianyuCrawlerApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

运行

python gui_app.py
  1. 使用 Conda 管理环境
  • 为项目创建独立的 Conda 环境,避免依赖冲突。

创建环境

# 创建名为 my_project_env 的环境,指定 Python 版本
conda create -n my_project_env python=3.13

激活环境

conda activate my_project_env

安装依赖

# 安装 Playwright (会自动安装依赖如 pyee, greenlet)
pip install playwright

# Tkinter 通常是 Python 内置的,无需额外安装
# 可以通过 python -c "import tkinter; print(tkinter.TkVersion)" 验证

# 安装其他项目所需库
# pip install requests beautifulsoup4 ...

安装 Playwright 浏览器驱动

# 安装 Chromium 浏览器驱动
playwright install chromium
# 可选:安装其他浏览器
# playwright install firefox
# playwright install webkit
  1. Playwright 浏览器驱动详解
  • Playwright 需要下载对应的浏览器驱动才能运行。

默认安装位置 (全局缓存)
macOS/Linux: ~/.cache/ms-playwright/
Windows: %LOCALAPPDATA%\ms-playwright\

安装命令:

playwright install chromium

会将驱动下载到该目录,例如 ~/.cache/ms-playwright/chromium-1194/。
环境变量 PLAYWRIGHT_BROWSERS_PATH
此环境变量控制浏览器驱动的查找位置。

未设置或设置为标准路径:
Playwright 在默认全局缓存目录查找驱动。
设置为 0:
Playwright 会在其自身的包目录下查找或安装驱动。
路径通常为:

<PYTHON_SITE_PACKAGES>/playwright/driver/package/.local-browsers/

用途: 常用于 PyInstaller 等打包工具,使生成的应用程序携带所需驱动,实现独立运行。
关键注意事项
路径一致性: PLAYWRIGHT_BROWSERS_PATH 的设置必须与浏览器驱动的实际存放位置匹配,否则会报 Executable doesn't exist 错误。
清理: 可以安全删除 ~/.cache/ms-playwright/ 中不用的浏览器版本以节省空间。
Headless Shell: chromium_headless_shell-xxxx 是专为无头模式优化的精简版驱动,性能可能更好,但只能用于 headless=True 场景。

  1. 创建 .command 文件以便双击运行
  • .command 文件是一种在 macOS 上方便运行脚本的方式。

创建文件
在项目根目录(例如 my_project)下创建一个文本文件,命名为 启动我的应用.command。
编辑文件内容:

#!/bin/bash

# 获取脚本所在的目录,这样无论从哪里运行它都能找到正确的路径
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "脚本所在目录: $SCRIPT_DIR"

# --- 配置区域 ---
# 请根据你的实际情况修改以下变量

# 1. 你的 Conda 环境名称
CONDA_ENV_NAME="xianyu_env"

# 2. 你的主 Python 脚本相对于此 .command 文件的路径
#    因为此 .command 文件和 gui_app.py 在同一目录,所以这里是 "."
PROJECT_RELATIVE_PATH="."

# --- 配置区域结束 ---

# 构建项目目录的完整路径
PROJECT_DIR="$SCRIPT_DIR/$PROJECT_RELATIVE_PATH"

# 检查项目目录是否存在
if [[ ! -d "$PROJECT_DIR" ]]; then
  echo "❌ 错误: 项目目录 '$PROJECT_DIR' 不存在。请检查 PROJECT_RELATIVE_PATH 设置。"
  # 等待用户按键再关闭窗口
  read -p "按任意键退出..."
  exit 1
fi

echo "📁 项目目录: $PROJECT_DIR"

# 初始化 Conda (根据你的 Conda 安装方式调整)
# 如果你使用的是 Miniconda 或 Anaconda 官方安装包,通常是下面这行:
eval "$($(which conda) shell.bash hook)"

# 检查 Conda 初始化是否成功
if [[ $? -ne 0 ]]; then
  echo "❌ 错误: 无法初始化 Conda。请确保 Conda 已正确安装。"
  read -p "按任意键退出..."
  exit 1
fi

# 激活 Conda 环境
echo "🐍 激活 Conda 环境 '$CONDA_ENV_NAME'..."
conda activate "$CONDA_ENV_NAME"

# 检查环境激活是否成功
if [[ $? -ne 0 ]]; then
  echo "❌ 错误: 无法激活 Conda 环境 '$CONDA_ENV_NAME'。请检查环境名称是否正确。"
  read -p "按任意键退出..."
  exit 1
fi

# 进入项目目录
echo "📂 切换到项目目录..."
cd "$PROJECT_DIR" || exit

# 运行 Python GUI 脚本
echo "🚀 启动 Python GUI 应用..."
python gui_app.py

# 检查 Python 脚本是否运行成功
if [[ $? -eq 0 ]]; then
  echo "✅ 应用已退出。"
else
  echo "⚠️ 应用可能遇到错误而退出。"
fi

# 等待用户按键再关闭终端窗口,方便查看输出
echo ""
echo "应用查看完毕。"
read -p "按任意键关闭此窗口..."

赋予执行权限 保存文件后,在终端中运行:

chmod +x 启动我的应用.command

运行 双击运行: 在 Finder 中双击 .command 文件。系统可能会弹窗警告,选择“打开”即可。 终端运行: 也可以在终端中直接执行 ./启动我的应用.command。

  1. 故障排除

Executable doesn't exist 错误:
检查 1: 确认 playwright install chromium 已成功执行,并且驱动存在于预期目录。
检查 2: 检查代码中是否设置了

os.environ['PLAYWRIGHT_BROWSERS_PATH'] = '0'# 如果设置了,驱动必须位于
<PYTHON_SITE_PACKAGES>/playwright/driver/package/.local-browsers/;
# 否则应在
~/.cache/ms-playwright/。

Conda 环境未激活: 确保 .command 文件中的 eval "$(/opt/miniconda3/bin/conda shell.zsh hook)" 路径正确指向你的 Conda 初始化脚本 (conda.sh 或 __conda_setup 命令所在位置)。

tkinter 模块未找到:

确认 Python 环境正确,并且 Tkinter 是随 Python 一起安装的标准库。可以使用 python -c "import tkinter; print(tkinter.TkVersion)" 测试。

  1. 总结

通过结合 Tkinter、Conda 和 .command 文件,可以构建一个易于分发和运行的 Python GUI 应用。理解并正确配置 Playwright 的浏览器驱动路径是保证应用稳定运行的关键一步。

🎁 结语

这份脚本是一个 完整的、可复用的模板,你可以基于此扩展更多功能,如:

  • 自动化登录(扫码)
  • 多店铺支持
  • 数据入库(MySQL/SQLite)
  • 微信通知
  • 图形化界面*

希望这份文档对你有帮助!祝你学习顺利!📘

文章在技术分类中;
0
0
0
0