Python爬虫从入门到精通教程

💡 学习建议:本教程采用循序渐进的方式,建议按照章节顺序学习。每章节都包含理论讲解和实践代码,请务必动手操作加深理解。

第一部分:爬虫基础与入门

1. 爬虫概念与环境搭建

1.1 什么是网络爬虫

网络爬虫(Web Spider)是一种自动化程序,用于从互联网上获取数据。它模拟浏览器的行为,访问网页并提取所需信息。

爬虫的应用场景(常见示例):
- 搜索引擎:抓取网页建立索引
- 数据分析:采集行业数据、舆情信息
- 价格监控:比价、竞品跟踪
- 内容聚合:新闻、招聘等信息整合

1.2 Python爬虫环境搭建

首先,确保您的系统已安装Python 3.7+。然后安装必要的库:

# 安装基础爬虫库
pip install requests beautifulsoup4 lxml

# 安装selenium(用于处理JavaScript)
pip install selenium

# 安装scrapy框架
pip install scrapy

# 安装其他实用库
pip install pandas numpy matplotlib
💡 开发工具推荐:PyCharm 或 VS Code 作为编辑器,配合浏览器开发者工具(F12)分析网页结构;交互式实验可以使用 Jupyter Notebook。

2. 网络基础与HTTP协议

2.1 HTTP协议原理

HTTP(HyperText Transfer Protocol)是互联网上应用最广泛的网络协议。理解HTTP对爬虫至关重要。

HTTP方法 | 用途     | 常见应用
GET      | 请求数据 | 获取网页内容、图片等
POST     | 提交数据 | 登录、搜索、提交表单
PUT      | 更新数据 | 更新用户信息
DELETE   | 删除数据 | 删除资源

2.2 常见HTTP状态码

状态码 | 含义           | 处理方式
200    | 请求成功       | 正常处理响应
301    | 永久重定向     | 跟随重定向
403    | 禁止访问       | 检查权限或伪装请求头
404    | 页面未找到     | 处理错误或跳过
500    | 服务器内部错误 | 重试或等待
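
下面用一个示意性的代码片段演示按状态码分类处理响应的思路(基于requests库,URL为httpbin测试接口,写法仅供参考):

import requests

def handle_response(url):
    """根据状态码分类处理响应(示意)"""
    response = requests.get(url, timeout=10, allow_redirects=False)
    code = response.status_code

    if code == 200:
        print("请求成功,正常处理响应")
        return response.text
    elif code in (301, 302):
        # requests默认自动跟随重定向,这里关闭后手动查看目标地址
        print("重定向到:", response.headers.get('Location'))
    elif code == 403:
        print("禁止访问,考虑检查权限或伪装请求头")
    elif code == 404:
        print("页面未找到,跳过该URL")
    elif code >= 500:
        print("服务器错误,稍后重试")
    return None

handle_response("https://httpbin.org/status/404")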

3. 第一个爬虫程序

3.1 使用urllib发送请求

Python内置的urllib库是最基础的HTTP客户端。

import urllib.request
import urllib.parse

# 简单的GET请求
def simple_get():
    try:
        url = "https://httpbin.org/html"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        
        # 创建请求对象
        req = urllib.request.Request(url, headers=headers)
        
        # 发送请求并获取响应
        response = urllib.request.urlopen(req)
        
        # 读取响应内容
        content = response.read().decode('utf-8')
        
        print("状态码:", response.getcode())
        print("响应头:", response.headers)
        print("网页内容前500字符:")
        print(content[:500])
        
    except Exception as e:
        print(f"请求出错: {e}")

if __name__ == "__main__":
    simple_get()

3.2 URL编码与解码

import urllib.parse

# URL编码
def url_encoding():
    # 对中文和特殊字符进行编码
    base_url = "https://example.com/search?"
    params = {
        'q': 'Python爬虫教程',
        'page': 1,
        'category': '编程'
    }
    
    # 编码参数
    encoded_params = urllib.parse.urlencode(params)
    
    # 构造完整URL
    full_url = base_url + encoded_params
    print("完整URL:", full_url)
    
    # URL解码
    decoded = urllib.parse.unquote(full_url)
    print("解码后:", decoded)

url_encoding()
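
补充一点:urlencode适合编码整组参数;若只需编码单个值,可以用quote或quote_plus,二者对空格的处理不同。下面是一个小示例:

import urllib.parse

keyword = "Python 爬虫"
print(urllib.parse.quote(keyword))       # 空格编码为%20,适合URL路径片段
print(urllib.parse.quote_plus(keyword))  # 空格编码为+,适合查询字符串
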
✅ 第一个爬虫完成了!现在您已经学会了如何发送HTTP请求并获取响应。接下来我们将学习更强大的requests库。

第二部分:核心技术与工具

4. requests库详解

requests库是最受欢迎的Python HTTP客户端库,API简洁直观,功能强大。

4.1 requests库安装与基本使用

import requests

# 基本GET请求
def basic_get():
    url = "https://jsonplaceholder.typicode.com/posts/1"
    
    response = requests.get(url)
    
    print(f"状态码: {response.status_code}")
    print(f"响应头: {response.headers}")
    print(f"响应内容: {response.text}")
    
    # JSON响应解析
    data = response.json()
    print(f"JSON数据: {data}")

basic_get()

4.2 各种请求方法演示

import requests

def http_methods_demo():
    base_url = "https://jsonplaceholder.typicode.com"
    
    # GET请求(获取数据)
    get_response = requests.get(f"{base_url}/posts/1")
    print("GET响应:", get_response.json())
    
    # POST请求(创建数据)
    post_data = {
        "title": "我的标题",
        "body": "我的内容",
        "userId": 1
    }
    post_response = requests.post(f"{base_url}/posts", json=post_data)
    print("POST响应:", post_response.json())
    
    # PUT请求(更新数据)
    put_data = {
        "id": 1,
        "title": "更新后的标题",
        "body": "更新后的内容",
        "userId": 1
    }
    put_response = requests.put(f"{base_url}/posts/1", json=put_data)
    print("PUT响应:", put_response.json())
    
    # DELETE请求(删除数据)
    delete_response = requests.delete(f"{base_url}/posts/1")
    print("DELETE状态码:", delete_response.status_code)

http_methods_demo()

4.3 请求头和参数设置

import requests

def advanced_requests():
    url = "https://httpbin.org/get"
    
    # 设置请求头
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Referer': 'https://www.google.com/',
        'Connection': 'keep-alive'
    }
    
    # 设置查询参数
    params = {
        'name': '张三',
        'age': 25,
        'city': '北京'
    }
    
    # 发送请求
    response = requests.get(url, headers=headers, params=params)
    
    print(f"请求URL: {response.url}")
    print(f"响应状态: {response.status_code}")
    print("响应数据:")
    print(response.json())

advanced_requests()

4.4 会话管理与Cookie处理

import requests

def session_management():
    # 使用Session保持会话
    session = requests.Session()
    
    # 设置会话级别的头部
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })
    
    # 模拟登录
    login_data = {
        'username': 'user123',
        'password': 'password123'
    }
    
    login_response = session.post('https://httpbin.org/post', data=login_data)
    print("登录响应:", login_response.json())
    
    # 访问需要登录的页面
    profile_response = session.get('https://httpbin.org/cookies')
    print("用户信息:", profile_response.json())
    
    # 设置Cookie
    session.cookies.set('my_cookie', 'test_value')
    cookie_response = session.get('https://httpbin.org/cookies')
    print("Cookie信息:", cookie_response.json())

session_management()
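
Session中的Cookie只保存在内存里,进程结束即丢失。若想跨运行复用登录状态,可以把Cookie序列化到磁盘。下面是一个基于pickle的示意写法(文件名cookies.pkl为假设):

import pickle
import requests

def save_cookies(session, path='cookies.pkl'):
    """把会话Cookie序列化到磁盘"""
    with open(path, 'wb') as f:
        pickle.dump(session.cookies, f)

def load_cookies(session, path='cookies.pkl'):
    """从磁盘恢复Cookie到会话"""
    with open(path, 'rb') as f:
        session.cookies.update(pickle.load(f))

session = requests.Session()
session.get('https://httpbin.org/cookies/set?token=abc123')
save_cookies(session)

new_session = requests.Session()
load_cookies(new_session)
print(new_session.get('https://httpbin.org/cookies').json())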

5. 数据提取技术

5.1 正则表达式基础

import re

def regex_examples():
    text = """
    联系方式:
    手机:138-1234-5678
    邮箱:zhangsan@example.com
    固话:010-8888-9999
    网站:https://www.example.com
    博客:http://blog.example.com
    """
    
    # 匹配手机号(示例文本中的号码带连字符,模式需要兼容)
    phone_pattern = r'1[3-9]\d[-\s]?\d{4}[-\s]?\d{4}'
    phones = re.findall(phone_pattern, text)
    print("手机号:", phones)
    
    # 匹配邮箱
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    emails = re.findall(email_pattern, text)
    print("邮箱:", emails)
    
    # 匹配网址
    url_pattern = r'https?://[^\s]+'
    urls = re.findall(url_pattern, text)
    print("网址:", urls)
    
    # 替换文本
    new_text = re.sub(r'\d{3}-\d{4}-\d{4}', '[电话号码]', text)
    print("替换后文本:")
    print(new_text)
    
    # 使用分组提取信息
    pattern = r'(\w+):([^\n]+)'
    matches = re.findall(pattern, text)
    for key, value in matches:
        print(f"{key}: {value}")

regex_examples()
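
当同一个模式需要反复使用时,可以先用re.compile预编译,代码更清晰也略快。一个小示意(模式沿用上文的手机号写法):

import re

phone_re = re.compile(r'1[3-9]\d[-\s]?\d{4}[-\s]?\d{4}')

for t in ["联系我:138-1234-5678", "这段文本没有号码"]:
    match = phone_re.search(t)
    print(match.group() if match else "未匹配")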

5.2 BeautifulSoup库使用

from bs4 import BeautifulSoup
import requests

def bs4_demo():
    html = """
    
    
        示例页面
    
    
        

这是一个标题

这是一个介绍段落。

链接
""" # 解析HTML soup = BeautifulSoup(html, 'html.parser') # 获取标题 title = soup.title.text print("页面标题:", title) # 根据标签名查找 h1_text = soup.h1.text print("H1内容:", h1_text) # 根据类名查找 intro_paragraph = soup.find('p', class_='intro') print("介绍段落:", intro_paragraph.text if intro_paragraph else "未找到") # 根据ID查找 menu = soup.find('ul', id='menu') print("菜单内容:") if menu: for li in menu.find_all('li'): print(f" - {li.text}") # 获取属性 link = soup.find('a', id='link') if link: print("链接地址:", link.get('href')) print("链接文本:", link.text) # CSS选择器 items = soup.select('.item') # 按类选择 print("所有项目:", [item.text for item in items]) content = soup.select_one('.content') # 选择第一个匹配的元素 print("内容div:", content.get_text().strip()) bs4_demo()

5.3 实战:爬取新闻网站

import requests
from bs4 import BeautifulSoup
import json

def crawl_news():
    """
    爬取新闻网站示例
    注意:这只是技术演示,请遵守网站的使用条款
    """
    url = "https://news.ycombinator.com/"
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; NewsBot/1.0)'
    }
    
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        
        soup = BeautifulSoup(response.text, 'html.parser')
        
        news_items = []
        
        # 查找新闻条目
        for i, item in enumerate(soup.find_all('tr', class_='athing')[:10]):  # 取前10条
            try:
                # 获取标题
                title_element = item.find('span', class_='titleline')
                if not title_element:
                    continue
                    
                title_link = title_element.find('a')
                if not title_link:
                    continue
                    
                title = title_link.get_text()
                link = title_link.get('href')
                
                # 得分和评论在紧随其后的一行(subtext行)中,而不在athing行内
                subtext_row = item.find_next_sibling('tr')
                score_element = subtext_row.find('span', class_='score') if subtext_row else None
                score = score_element.get_text() if score_element else "未知"
                
                # 获取评论数
                comments_text = ""
                if subtext_row:
                    comments_link = subtext_row.find('a', string=lambda text: text and 'comment' in text)
                    if comments_link:
                        comments_text = comments_link.get_text()
                
                news_item = {
                    '序号': i + 1,
                    '标题': title,
                    '链接': link,
                    '得分': score,
                    '评论': comments_text
                }
                
                news_items.append(news_item)
                print(f"{i+1}. {title} ({score})")
                
            except Exception as e:
                print(f"解析第{i}条新闻时出错: {e}")
                continue
        
        # 保存到JSON文件
        with open('news.json', 'w', encoding='utf-8') as f:
            json.dump(news_items, f, ensure_ascii=False, indent=2)
        
        print(f"\n成功爬取 {len(news_items)} 条新闻,已保存到 news.json")
        
    except requests.RequestException as e:
        print(f"请求失败: {e}")
    except Exception as e:
        print(f"程序运行出错: {e}")

# 运行爬虫
crawl_news()

6. 数据存储

6.1 文件存储

import json
import csv
import sqlite3
from datetime import datetime

def file_storage_demo():
    # 示例数据
    data = [
        {'name': '张三', 'age': 25, 'city': '北京'},
        {'name': '李四', 'age': 30, 'city': '上海'},
        {'name': '王五', 'age': 28, 'city': '广州'}
    ]
    
    # 1. JSON文件存储
    def save_to_json(data, filename):
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"数据已保存到 {filename}")
    
    # 2. CSV文件存储
    def save_to_csv(data, filename):
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
        print(f"数据已保存到 {filename}")
    
    # 3. 文本文件存储
    def save_to_txt(data, filename):
        with open(filename, 'w', encoding='utf-8') as f:
            for item in data:
                f.write(f"姓名: {item['name']}, 年龄: {item['age']}, 城市: {item['city']}\n")
        print(f"数据已保存到 {filename}")
    
    # 保存数据
    save_to_json(data, 'data.json')
    save_to_csv(data, 'data.csv')
    save_to_txt(data, 'data.txt')

file_storage_demo()
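
爬虫通常边爬边存,一次性json.dump不便于追加。JSON Lines(每行一个JSON对象)是常见的增量写入格式,下面是一个示意写法(文件名data.jsonl为假设):

import json

def append_jsonl(item, filename='data.jsonl'):
    """以追加模式写入一行JSON,适合边爬边存"""
    with open(filename, 'a', encoding='utf-8') as f:
        f.write(json.dumps(item, ensure_ascii=False) + '\n')

def read_jsonl(filename='data.jsonl'):
    """逐行读回JSON Lines文件"""
    with open(filename, encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]

append_jsonl({'name': '张三', 'age': 25, 'city': '北京'})
print(read_jsonl())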

6.2 SQLite数据库存储

import sqlite3

def sqlite_demo():
    # 连接数据库(不存在时会自动创建)
    conn = sqlite3.connect('crawler.db')
    cursor = conn.cursor()
    
    # 创建表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            age INTEGER,
            city TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    # 插入数据
    users_data = [
        ('张三', 25, '北京'),
        ('李四', 30, '上海'),
        ('王五', 28, '广州')
    ]
    
    cursor.executemany(
        'INSERT INTO users (name, age, city) VALUES (?, ?, ?)',
        users_data
    )
    
    # 查询数据
    cursor.execute('SELECT * FROM users')
    results = cursor.fetchall()
    print("数据库中的所有用户:")
    for row in results:
        print(row)
    
    # 统计查询
    cursor.execute('SELECT COUNT(*) FROM users')
    count = cursor.fetchone()[0]
    print(f"总用户数: {count}")
    
    # 提交并关闭连接
    conn.commit()
    conn.close()
    print("数据库操作完成")

sqlite_demo()
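
爬虫重复运行时很容易插入重复数据。一个常用做法是给关键字段(如URL)加UNIQUE约束,并用INSERT OR IGNORE跳过已有记录,下面是示意写法:

import sqlite3

conn = sqlite3.connect('crawler.db')
cursor = conn.cursor()

cursor.execute('''
    CREATE TABLE IF NOT EXISTS pages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        url TEXT UNIQUE,  -- UNIQUE约束防止重复
        title TEXT
    )
''')

# 第二条插入的URL重复,会被静默忽略而不是报IntegrityError
cursor.execute('INSERT OR IGNORE INTO pages (url, title) VALUES (?, ?)',
               ('https://example.com/page1', '示例页面'))
cursor.execute('INSERT OR IGNORE INTO pages (url, title) VALUES (?, ?)',
               ('https://example.com/page1', '重复的页面'))
conn.commit()

cursor.execute('SELECT COUNT(*) FROM pages')
print("去重后行数:", cursor.fetchone()[0])
conn.close()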

第三部分:进阶技术与反爬虫

7. 反爬虫技术应对

7.1 请求频率限制

import time
import random
from functools import wraps

def rate_limiter(min_delay=1, max_delay=3):
    """
    请求频率限制装饰器
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 随机延迟
            delay = random.uniform(min_delay, max_delay)
            print(f"等待 {delay:.2f} 秒...")
            time.sleep(delay)
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

class RateLimitedCrawler:
    def __init__(self, delay_range=(1, 3)):
        self.delay_range = delay_range
        self.request_count = 0
        self.start_time = time.time()
    
    def get_delay(self):
        """动态计算延迟时间"""
        if self.request_count > 50:
            # 请求数量超过50,增加延迟
            return random.uniform(3, 6)
        return random.uniform(*self.delay_range)
    
    def crawl_with_rate_limit(self, url):
        """带频率限制的爬取方法"""
        # 计算延迟
        delay = self.get_delay()
        print(f"请求 #{self.request_count + 1}: {url}")
        print(f"延迟时间: {delay:.2f} 秒")
        
        time.sleep(delay)
        
        # 模拟请求
        self.request_count += 1
        print(f"完成请求 #{self.request_count}")
        
        return f"模拟响应内容 #{self.request_count}"

# 使用示例
def demo_rate_limiting():
    crawler = RateLimitedCrawler()
    
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        "https://example.com/page4",
        "https://example.com/page5"
    ]
    
    results = []
    for url in urls:
        result = crawler.crawl_with_rate_limit(url)
        results.append(result)
    
    print(f"\n总计爬取 {len(results)} 个页面")
    print(f"用时: {time.time() - crawler.start_time:.2f} 秒")

demo_rate_limiting()
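
上面定义的rate_limiter装饰器也可以直接套在真实的请求函数上,与类的写法效果等价。一个简单的用法示意(假设rate_limiter已如上定义):

import requests

@rate_limiter(min_delay=1, max_delay=2)  # 假设上文的装饰器已定义
def fetch(url):
    return requests.get(url, timeout=10)

resp = fetch("https://httpbin.org/get")
print(resp.status_code)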

7.2 User-Agent伪装

import requests
import random

# 模拟不同浏览器的User-Agent
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
]

def get_random_headers():
    """获取随机请求头"""
    headers = {
        'User-Agent': random.choice(USER_AGENTS),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    return headers

def crawl_with_random_headers():
    """使用随机请求头的爬取示例"""
    url = "https://httpbin.org/headers"
    
    # 获取随机请求头
    headers = get_random_headers()
    
    print("当前User-Agent:", headers['User-Agent'])
    
    try:
        response = requests.get(url, headers=headers)
        
        if response.status_code == 200:
            data = response.json()
            print("服务器收到的请求头:")
            for key, value in data['headers'].items():
                print(f"  {key}: {value}")
        else:
            print(f"请求失败,状态码: {response.status_code}")
            
    except Exception as e:
        print(f"请求出错: {e}")

crawl_with_random_headers()

7.3 代理IP使用

import requests
import random

# 代理IP列表(示例,实际使用时请使用可靠的代理服务)
# 注意:https键对应的通常也是http://开头的代理地址,由代理通过CONNECT建立隧道
PROXY_LIST = [
    {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'},
    {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'},
    # 更多代理...
]

class ProxyCrawler:
    def __init__(self, proxies=None):
        self.proxies = proxies or PROXY_LIST
        self.session = requests.Session()
    
    def get_random_proxy(self):
        """随机获取一个代理"""
        return random.choice(self.proxies)
    
    def crawl_with_proxy(self, url):
        """使用代理爬取"""
        proxy = self.get_random_proxy()
        
        print(f"使用代理: {proxy}")
        
        try:
            response = self.session.get(
                url,
                proxies=proxy,
                timeout=10
            )
            
            print(f"状态码: {response.status_code}")
            print(f"响应长度: {len(response.text)} 字符")
            
            return response
            
        except requests.RequestException as e:
            print(f"代理请求失败: {e}")
            return None
    
    def crawl_multiple_urls_with_rotation(self, urls):
        """多URL轮换代理爬取"""
        results = []
        
        for i, url in enumerate(urls, 1):
            print(f"\n=== 爬取第 {i} 个URL ===")
            response = self.crawl_with_proxy(url)
            
            if response and response.status_code == 200:
                results.append({
                    'url': url,
                    'status': 'success',
                    'content_length': len(response.text)
                })
            else:
                results.append({
                    'url': url,
                    'status': 'failed',
                    'error': 'Request failed'
                })
        
        return results

def proxy_demo():
    """代理爬虫演示"""
    crawler = ProxyCrawler()
    
    urls = [
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers"
    ]
    
    results = crawler.crawl_multiple_urls_with_rotation(urls)
    
    print("\n=== 爬取结果汇总 ===")
    for result in results:
        print(f"URL: {result['url']}")
        print(f"状态: {result['status']}")
        if result['status'] == 'success':
            print(f"内容长度: {result['content_length']}")
        print("-" * 50)

# 注意:实际使用时需要准备可靠的代理IP池
proxy_demo()
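
代理的质量参差不齐,投入使用前最好先批量验证可用性。下面是一个以httpbin.org/ip为测试地址的示意写法:

import requests

def validate_proxies(proxies_list, test_url="https://httpbin.org/ip", timeout=5):
    """逐个测试代理,返回可用代理列表(示意)"""
    working = []
    for proxy in proxies_list:
        try:
            resp = requests.get(test_url, proxies=proxy, timeout=timeout)
            if resp.status_code == 200:
                print("可用:", proxy, "->", resp.json())
                working.append(proxy)
        except requests.RequestException:
            print("不可用:", proxy)
    return working

# usable_proxies = validate_proxies(PROXY_LIST)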

8. JavaScript渲染处理

8.1 静态页面vs动态页面

💡 关键概念:静态页面的内容直接包含在服务器返回的HTML中,requests就能抓到;动态页面返回的HTML只是一个"壳",实际数据由浏览器执行JavaScript后再异步加载。对动态页面,要么用Selenium等工具驱动真实浏览器渲染,要么分析其背后的数据接口直接请求(见下面的示例)。
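
下面是一个经验性判断的示意写法(URL和关键词需按实际目标替换):

import requests

def looks_dynamic(url, keyword):
    """若目标关键词不在原始HTML中,数据很可能由JavaScript异步加载"""
    html = requests.get(url, timeout=10).text
    if keyword in html:
        print("内容在原始HTML中,用requests即可抓取")
        return False
    print("原始HTML中没有目标内容,可能需要Selenium或直接分析数据接口")
    return True

# httpbin的静态示例页,正文包含"Herman Melville"字样
looks_dynamic("https://httpbin.org/html", "Herman Melville")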

8.2 Selenium自动化工具

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import time

def selenium_basic_demo():
    """Selenium基础使用演示"""
    
    # Chrome选项配置
    chrome_options = Options()
    chrome_options.add_argument('--headless')  # 无头模式
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--window-size=1920,1080')
    
    # 启动浏览器
    driver = webdriver.Chrome(options=chrome_options)
    
    try:
        print("正在打开网页...")
        driver.get("https://httpbin.org/forms/post")
        
        # 等待页面加载
        wait = WebDriverWait(driver, 10)
        
        # 填写表单
        wait.until(EC.presence_of_element_located((By.NAME, "custname"))).send_keys("测试用户")
        driver.find_element(By.NAME, "custtel").send_keys("13800138000")
        driver.find_element(By.NAME, "custemail").send_keys("[email protected]")
        driver.find_element(By.NAME, "comments").send_keys("测试评论")
        
        # 提交表单
        submit_button = driver.find_element(By.CSS_SELECTOR, "input[type='submit']")
        submit_button.click()
        
        # 等待提交完成
        time.sleep(2)
        
        # 获取结果页面内容
        result_text = driver.page_source
        print("提交结果页面标题:", driver.title)
        print("页面内容前500字符:")
        print(result_text[:500])
        
    except Exception as e:
        print(f"执行过程中出错: {e}")
    
    finally:
        driver.quit()
        print("浏览器已关闭")

selenium_basic_demo()

8.3 处理Ajax异步加载

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import json

def handle_ajax_loading():
    """处理Ajax异步加载示例"""
    
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    
    driver = webdriver.Chrome(options=options)
    
    try:
        # 访问一个包含Ajax数据的页面(示例)
        driver.get("https://jsonplaceholder.typicode.com/")
        
        # 等待页面加载完成
        wait = WebDriverWait(driver, 10)
        
        # 模拟点击加载更多数据的按钮(如果有的话)
        try:
            # 等待特定元素出现
            load_more_button = wait.until(
                EC.element_to_be_clickable((By.CLASS_NAME, "load-more"))
            )
            load_more_button.click()
            
            # 等待新内容加载
            wait.until(
                EC.presence_of_element_located((By.CLASS_NAME, "new-content"))
            )
            
            print("新内容加载完成")
            
        except Exception as e:
            print("没有找到加载更多按钮或页面结构不同:", e)
        
        # 提取所有可见的文本数据
        page_text = driver.find_element(By.TAG_NAME, "body").text
        print("页面内容:", page_text[:200] + "...")
        
        # 获取页面JavaScript变量
        js_data = driver.execute_script("""
            // 如果页面中有JavaScript变量,直接获取
            if (typeof window.data !== 'undefined') {
                return window.data;
            }
            return null;
        """)
        
        if js_data:
            print("获取到的JavaScript数据:", js_data)
        
    except Exception as e:
        print(f"处理Ajax加载时出错: {e}")
    
    finally:
        driver.quit()

handle_ajax_loading()
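
处理Ajax页面时,比启动浏览器更高效的办法往往是:打开开发者工具的Network面板,找到页面背后调用的JSON接口,然后直接用requests请求它。下面以jsonplaceholder的公开接口为例做一个示意:

import requests

def fetch_api_directly():
    """直接请求页面背后的JSON接口,绕过浏览器渲染"""
    url = "https://jsonplaceholder.typicode.com/posts"
    response = requests.get(url, params={'_limit': 5}, timeout=10)
    response.raise_for_status()

    for post in response.json():
        print(f"[{post['id']}] {post['title']}")

fetch_api_directly()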

第四部分:框架与分布式爬虫

10. Scrapy框架入门

10.1 Scrapy架构介绍

Scrapy架构组件:
- Engine(引擎):调度各组件之间的数据流
- Scheduler(调度器):接收请求并排队、去重
- Downloader(下载器):发送请求、获取响应
- Spiders(爬虫):解析响应、产出数据项和新请求
- Item Pipeline(管道):清洗、校验、存储数据
- Downloader/Spider Middlewares(中间件):在请求和响应流经引擎时进行加工

10.2 创建Scrapy项目

# 创建Scrapy项目
scrapy startproject myproject

# 进入项目目录
cd myproject

# 创建爬虫
scrapy genspider example example.com

# 运行爬虫
scrapy crawl example
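
新建项目后,通常先在settings.py中配置"礼貌爬取"相关参数。下面是一组常见设置的节选(取值仅供参考,按目标网站情况调整):

# myproject/settings.py(节选)
ROBOTSTXT_OBEY = True                 # 遵守robots.txt
DOWNLOAD_DELAY = 1                    # 每次请求间隔(秒)
CONCURRENT_REQUESTS = 8               # 全局并发请求数
CONCURRENT_REQUESTS_PER_DOMAIN = 4    # 单域名并发数
USER_AGENT = 'myproject (+https://example.com/contact)'  # 示例UA,建议留下联系方式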

10.3 Spider开发示例

# myproject/spiders/quotes_spider.py
import scrapy

class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']
    
    def parse(self, response):
        # 提取名言信息
        quotes = response.css('div.quote')
        
        for quote in quotes:
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('small.author::text').get(),
                'tags': quote.css('div.tags a.tag::text').getall()
            }
        
        # 提取下一页链接
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)
        
        # 如果有详情页面,爬取作者信息
        author_links = response.css('small.author + a::attr(href)').getall()
        for author_link in author_links:
            yield response.follow(author_link, self.parse_author)
    
    def parse_author(self, response):
        author = response.css('h3.author-title::text').get()
        birth_date = response.css('span.author-born-date::text').get()
        birth_place = response.css('span.author-born-location::text').get()
        description = response.css('div.author-description::text').get()
        
        yield {
            'name': author,
            'birth_date': birth_date,
            'birth_place': birth_place,
            'description': description.strip() if description else None
        }

10.4 Item Pipeline示例

# myproject/pipelines.py
import json
import sqlite3

class MyprojectPipeline:
    def __init__(self):
        self.file = open('quotes.json', 'w', encoding='utf-8')
        self.conn = sqlite3.connect('quotes.db')
        self.create_table()
    
    def create_table(self):
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS quotes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                text TEXT,
                author TEXT,
                tags TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()
    
    def process_item(self, item, spider):
        # 保存到JSON文件
        line = json.dumps(dict(item), ensure_ascii=False) + '\n'
        self.file.write(line)
        
        # 保存到SQLite数据库
        cursor = self.conn.cursor()
        cursor.execute(
            'INSERT INTO quotes (text, author, tags) VALUES (?, ?, ?)',
            (
                item.get('text'),
                item.get('author'),
                ','.join(item.get('tags', []))
            )
        )
        self.conn.commit()
        
        return item
    
    def close_spider(self, spider):
        self.file.close()
        self.conn.close()
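
Pipeline写好后还需要在settings.py中启用才会生效,字典的值表示执行顺序:

# myproject/settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.MyprojectPipeline': 300,  # 0-1000,数字越小越先执行
}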

12. 分布式爬虫与部署

12.1 多线程爬虫

import threading
import queue
import requests
from bs4 import BeautifulSoup
import time

class MultiThreadedCrawler:
    def __init__(self, max_threads=5):
        self.max_threads = max_threads
        self.url_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.threads = []
        self.stop_event = threading.Event()
    
    def add_url(self, url):
        """添加URL到队列"""
        self.url_queue.put(url)
    
    def worker(self):
        """工作线程函数"""
        while not self.stop_event.is_set():
            try:
                url = self.url_queue.get(timeout=1)
                if url is None:
                    break
                
                # 爬取页面
                result = self.crawl_page(url)
                if result:
                    self.result_queue.put(result)
                
                self.url_queue.task_done()
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"爬取 {url} 时出错: {e}")
                self.url_queue.task_done()
    
    def crawl_page(self, url):
        """爬取单个页面"""
        try:
            headers = {
                'User-Agent': 'MultiThreadedCrawler/1.0'
            }
            
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # 提取标题和链接
            title = soup.title.string if soup.title else "无标题"
            links = [a.get('href') for a in soup.find_all('a', href=True)]
            
            return {
                'url': url,
                'title': title,
                'link_count': len(links),
                'timestamp': time.time()
            }
            
        except Exception as e:
            return {'url': url, 'error': str(e)}
    
    def start_crawling(self, urls):
        """开始多线程爬取"""
        # 添加所有URL到队列
        for url in urls:
            self.add_url(url)
        
        # 创建并启动工作线程
        for i in range(self.max_threads):
            thread = threading.Thread(target=self.worker)
            thread.start()
            self.threads.append(thread)
        
        # 等待所有任务完成
        self.url_queue.join()
        
        # 停止所有线程
        self.stop_event.set()
        for thread in self.threads:
            thread.join()
        
        # 收集所有结果
        results = []
        while not self.result_queue.empty():
            results.append(self.result_queue.get())
        
        return results

def multi_thread_demo():
    """多线程爬虫演示"""
    urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/json",
        "https://httpbin.org/xml",
        "https://httpbin.org/uuid"
    ]
    
    crawler = MultiThreadedCrawler(max_threads=3)
    
    print("开始多线程爬取...")
    start_time = time.time()
    
    results = crawler.start_crawling(urls)
    
    end_time = time.time()
    print(f"爬取完成,耗时: {end_time - start_time:.2f} 秒")
    print(f"共爬取 {len(results)} 个页面")
    
    for result in results:
        print(f"URL: {result['url']}")
        if 'title' in result:
            print(f"  标题: {result['title']}")
            print(f"  链接数: {result['link_count']}")
        else:
            print(f"  错误: {result.get('error', '未知错误')}")
        print("-" * 50)

multi_thread_demo()

12.2 分布式爬虫架构

# 分布式爬虫架构示例(简化版)
# 使用Redis作为任务队列

import redis
import json
import hashlib
import time
from datetime import datetime

class DistributedCrawler:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)
        self.crawler_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]
    
    def add_crawling_task(self, url, priority=1):
        """添加爬取任务"""
        task = {
            'url': url,
            'priority': priority,
            'timestamp': time.time(),
            'crawler_id': self.crawler_id
        }
        
        # 使用优先队列,优先级越高越先处理
        self.redis_client.zadd('crawler_tasks', {json.dumps(task): -priority})
        print(f"爬虫 {self.crawler_id} 添加任务: {url}")
    
    def get_next_task(self):
        """获取下一个任务(分数为-priority,分数越小优先级越高,因此取最小分数)"""
        result = self.redis_client.zpopmin('crawler_tasks')
        if result:
            task_json, priority = result[0]
            task = json.loads(task_json.decode())
            return task
        return None
    
    def mark_task_complete(self, task, result):
        """标记任务完成"""
        completed_task = {
            'task': task,
            'result': result,
            'completed_at': time.time(),
            'completed_by': self.crawler_id
        }
        
        # 将结果存储到已完成任务集合
        self.redis_client.lpush('completed_tasks', json.dumps(completed_task))
        print(f"爬虫 {self.crawler_id} 完成任务: {task['url']}")
    
    def crawl_worker(self):
        """爬虫工作进程"""
        print(f"爬虫 {self.crawler_id} 开始工作...")
        
        while True:
            task = self.get_next_task()
            
            if not task:
                print(f"爬虫 {self.crawler_id} 等待任务...")
                time.sleep(1)
                continue
            
            # 执行爬取任务
            try:
                print(f"爬虫 {self.crawler_id} 正在处理: {task['url']}")
                
                # 这里应该是实际的爬取逻辑
                # result = self.perform_crawl(task['url'])
                result = {'status': 'success', 'data': 'sample_data'}
                
                # 标记任务完成
                self.mark_task_complete(task, result)
                
                # 模拟爬取延迟
                time.sleep(0.5)
                
            except Exception as e:
                error_result = {'status': 'error', 'error': str(e)}
                self.mark_task_complete(task, error_result)
    
    def get_crawler_stats(self):
        """获取爬虫统计信息"""
        total_tasks = self.redis_client.zcard('crawler_tasks')
        completed_count = self.redis_client.llen('completed_tasks')
        
        return {
            'pending_tasks': total_tasks,
            'completed_tasks': completed_count,
            'active_crawlers': len(self.redis_client.smembers('active_crawlers'))  # 需各爬虫启动时自行SADD注册
        }

def distributed_crawler_demo():
    """分布式爬虫演示"""
    crawler = DistributedCrawler()
    
    # 添加测试任务
    test_urls = [
        "https://example.com/page1",
        "https://example.com/page2", 
        "https://example.com/page3",
        "https://example.com/page4",
        "https://example.com/page5"
    ]
    
    for i, url in enumerate(test_urls):
        priority = len(test_urls) - i  # 倒序优先级
        crawler.add_crawling_task(url, priority)
    
    print("分布式爬虫任务已添加")
    print("统计信息:", crawler.get_crawler_stats())

distributed_crawler_demo()
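
分布式场景下还需要跨进程去重,常见做法是用Redis集合(Set)记录已抓取URL的指纹。下面是一个最简化的示意(键名seen_urls为假设):

import hashlib
import redis

redis_client = redis.Redis(host='localhost', port=6379)

def url_fingerprint(url):
    """对URL做MD5,得到定长指纹,节省内存"""
    return hashlib.md5(url.encode('utf-8')).hexdigest()

def is_new_url(url):
    """sadd返回1表示首次加入,0表示已存在"""
    return redis_client.sadd('seen_urls', url_fingerprint(url)) == 1

for u in ["https://example.com/a", "https://example.com/a", "https://example.com/b"]:
    print(u, "->", "新URL" if is_new_url(u) else "已抓取,跳过")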

第五部分:实战项目与最佳实践

13. 实战项目案例

13.1 新闻网站爬虫完整项目

import requests
from bs4 import BeautifulSoup
import json
import csv
import sqlite3
from datetime import datetime
import time
import random
from urllib.parse import urljoin, urlparse
import logging

class NewsCrawler:
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'NewsCrawler/1.0'
        })
        
        # 设置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('crawler.log', encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
        # 初始化数据库
        self.init_database()
    
    def init_database(self):
        """初始化SQLite数据库"""
        self.conn = sqlite3.connect('news.db')
        cursor = self.conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS news (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                content TEXT,
                author TEXT,
                publish_time TEXT,
                url TEXT UNIQUE,
                category TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        self.conn.commit()
    
    def crawl_news_list(self, page=1):
        """爬取新闻列表页面"""
        url = f"{self.base_url}/news/list?page={page}"
        
        try:
            self.logger.info(f"爬取新闻列表页面: {url}")
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.text, 'html.parser')
            
            news_items = []
            
            # 根据实际网站结构调整选择器
            news_elements = soup.find_all('div', class_='news-item')
            
            for element in news_elements:
                try:
                    title_element = element.find('h3', class_='title')
                    link_element = element.find('a', href=True)
                    time_element = element.find('span', class_='time')
                    author_element = element.find('span', class_='author')
                    
                    if title_element and link_element:
                        title = title_element.get_text().strip()
                        url = link_element['href']
                        
                        # 处理相对URL
                        if url.startswith('/'):
                            url = urljoin(self.base_url, url)
                        
                        publish_time = time_element.get_text().strip() if time_element else None
                        author = author_element.get_text().strip() if author_element else None
                        
                        news_item = {
                            'title': title,
                            'url': url,
                            'publish_time': publish_time,
                            'author': author
                        }
                        
                        news_items.append(news_item)
                        self.logger.info(f"提取新闻: {title}")
                
                except Exception as e:
                    self.logger.error(f"解析新闻项时出错: {e}")
                    continue
            
            return news_items
        
        except Exception as e:
            self.logger.error(f"爬取新闻列表失败: {e}")
            return []
    
    def crawl_news_detail(self, news_item):
        """爬取新闻详情"""
        url = news_item['url']
        
        try:
            self.logger.info(f"爬取新闻详情: {url}")
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # 提取新闻内容
            content_element = soup.find('div', class_='content')
            content = content_element.get_text().strip() if content_element else ""
            
            # 提取发布日期
            date_element = soup.find('time') or soup.find('span', class_='date')
            publish_date = date_element.get_text().strip() if date_element else news_item.get('publish_time')
            
            # 提取分类
            category_element = soup.find('span', class_='category')
            category = category_element.get_text().strip() if category_element else None
            
            # 更新新闻信息
            news_item.update({
                'content': content,
                'publish_time': publish_date,
                'category': category
            })
            
            return news_item
        
        except Exception as e:
            self.logger.error(f"爬取新闻详情失败 {url}: {e}")
            return None
    
    def save_to_database(self, news_item):
        """保存新闻到数据库"""
        try:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO news 
                (title, content, author, publish_time, url, category)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                news_item['title'],
                news_item.get('content', ''),
                news_item.get('author'),
                news_item.get('publish_time'),
                news_item['url'],
                news_item.get('category')
            ))
            self.conn.commit()
            return True
        except Exception as e:
            self.logger.error(f"保存新闻到数据库失败: {e}")
            return False
    
    def save_to_json(self, news_list, filename='news.json'):
        """保存新闻到JSON文件"""
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(news_list, f, ensure_ascii=False, indent=2)
            self.logger.info(f"新闻数据已保存到 {filename}")
        except Exception as e:
            self.logger.error(f"保存JSON文件失败: {e}")
    
    def crawl_with_rate_limit(self, max_pages=5):
        """带频率限制的爬取"""
        all_news = []
        
        for page in range(1, max_pages + 1):
            self.logger.info(f"开始爬取第 {page} 页")
            
            # 爬取新闻列表
            news_list = self.crawl_news_list(page)
            
            if not news_list:
                self.logger.info(f"第 {page} 页没有新闻,停止爬取")
                break
            
            # 爬取每个新闻的详情
            for news_item in news_list:
                detail = self.crawl_news_detail(news_item)
                
                if detail:
                    # 保存到数据库
                    if self.save_to_database(detail):
                        all_news.append(detail)
                        self.logger.info(f"成功爬取并保存: {detail['title']}")
                
                # 随机延迟
                delay = random.uniform(1, 3)
                self.logger.info(f"等待 {delay:.1f} 秒...")
                time.sleep(delay)
        
        # 保存到JSON文件
        if all_news:
            self.save_to_json(all_news)
        
        self.logger.info(f"爬取完成,共获取 {len(all_news)} 条新闻")
        return all_news
    
    def close(self):
        """关闭数据库连接"""
        if hasattr(self, 'conn'):
            self.conn.close()

def news_crawler_demo():
    """新闻爬虫演示"""
    crawler = NewsCrawler("https://example-news-site.com")
    
    try:
        news_list = crawler.crawl_with_rate_limit(max_pages=3)
        
        print(f"\n爬取结果汇总:")
        print(f"总共获取 {len(news_list)} 条新闻")
        
        # 显示前几条新闻
        for i, news in enumerate(news_list[:3], 1):
            print(f"\n新闻 {i}:")
            print(f"  标题: {news['title']}")
            print(f"  作者: {news.get('author', '未知')}")
            print(f"  时间: {news.get('publish_time', '未知')}")
            print(f"  链接: {news['url']}")
        
    except KeyboardInterrupt:
        print("\n爬取被用户中断")
    except Exception as e:
        print(f"爬取过程出错: {e}")
    finally:
        crawler.close()

# 注意:示例中的URL需要替换为实际的新闻网站
news_crawler_demo()

14. 法律与道德考量

⚠️ 重要提醒:

在使用爬虫技术时,必须遵守相关法律法规和网站的使用条款:
- 遵守目标网站的robots.txt协议和服务条款
- 不抓取、不传播个人隐私数据和受版权保护的内容
- 控制请求频率,不对目标网站造成过大压力
- 爬取的数据仅用于合法用途,商业使用前确认授权

14.1 robots.txt协议

import urllib.robotparser
import requests

def check_robots_txt(url):
    """检查robots.txt文件"""
    from urllib.parse import urljoin, urlparse
    
    parsed_url = urlparse(url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
    robots_url = urljoin(base_url, '/robots.txt')
    
    try:
        rp = urllib.robotparser.RobotFileParser()
        rp.set_url(robots_url)
        rp.read()
        
        print(f"检查URL: {url}")
        print(f"robots.txt地址: {robots_url}")
        print(f"允许爬取: {rp.can_fetch('*', url)}")
        
        # 获取爬取延迟
        crawl_delay = rp.crawl_delay('*')
        if crawl_delay:
            print(f"建议爬取延迟: {crawl_delay} 秒")
        
        return rp.can_fetch('*', url), crawl_delay
        
    except Exception as e:
        print(f"检查robots.txt失败: {e}")
        return True, 1  # 默认允许爬取,延迟1秒

# 示例使用
check_robots_txt("https://example.com/page1")
check_robots_txt("https://httpbin.org/robots.txt")

14.2 爬虫道德准则

🔍 道德爬虫的最佳实践:表明身份(在User-Agent中留下联系方式)、优先使用官方API、遵守robots.txt、设置合理的抓取间隔、只抓取需要的数据、缓存结果避免重复请求。

15. 性能优化与调试

15.1 爬虫性能优化

import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

class OptimizedCrawler:
    def __init__(self, max_workers=10):
        self.max_workers = max_workers
        self.session = requests.Session()
        
        # 启用连接池
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=max_workers,
            pool_maxsize=max_workers,
            max_retries=3
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        
        # 缓存配置
        self.cache = {}
        self.cache_timeout = 300  # 5分钟缓存
        
    # 注意:这里不能用functools.lru_cache——首次未命中返回的None会被永久缓存
    def get_cached_content(self, url):
        """读取带过期时间的页面缓存"""
        cache_key = hash(url)
        
        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_timeout:
                return cached_data
        
        return None
    
    def crawl_single_url(self, url):
        """单URL爬取(优化版)"""
        start_time = time.time()
        
        try:
            # 检查缓存
            cached = self.get_cached_content(url)
            if cached:
                return {
                    'url': url,
                    'status': 'cached',
                    'content_length': len(cached),
                    'time': time.time() - start_time
                }
            
            # 发送请求
            response = self.session.get(
                url,
                timeout=(5, 10),  # (连接超时, 读取超时)
                stream=True  # 流式下载,节省内存
            )
            response.raise_for_status()
            
            # 逐行处理内容
            content = []
            for line in response.iter_lines(decode_unicode=True):
                if line:
                    content.append(line)
            
            result = {
                'url': url,
                'status': 'success',
                'content': '\n'.join(content),
                'content_length': sum(len(line) for line in content),
                'time': time.time() - start_time,
                'headers': dict(response.headers)
            }
            
            # 缓存结果
            cache_key = hash(url)
            self.cache[cache_key] = (result['content'], time.time())
            
            return result
            
        except requests.RequestException as e:
            return {
                'url': url,
                'status': 'error',
                'error': str(e),
                'time': time.time() - start_time
            }
    
    def crawl_multiple_urls_optimized(self, urls):
        """优化的多URL并发爬取"""
        start_time = time.time()
        
        results = []
        
        # 使用线程池并发爬取
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 提交所有任务
            future_to_url = {
                executor.submit(self.crawl_single_url, url): url 
                for url in urls
            }
            
            # 收集结果
            for future in as_completed(future_to_url):
                try:
                    result = future.result()
                    results.append(result)
                    
                    # 统计信息
                    if result['status'] == 'success':
                        print(f"✓ {result['url']} - {result['content_length']} 字符 - {result['time']:.2f}s")
                    else:
                        print(f"✗ {result['url']} - {result['status']} - {result.get('error', 'Unknown error')}")
                
                except Exception as e:
                    url = future_to_url[future]
                    print(f"✗ {url} - Exception: {e}")
                    results.append({
                        'url': url,
                        'status': 'exception',
                        'error': str(e)
                    })
        
        total_time = time.time() - start_time
        successful = len([r for r in results if r['status'] in ['success', 'cached']])
        
        print(f"\n=== 爬取完成 ===")
        print(f"总URL数: {len(urls)}")
        print(f"成功: {successful}")
        print(f"失败: {len(urls) - successful}")
        print(f"总耗时: {total_time:.2f} 秒")
        print(f"平均每URL: {total_time/len(urls):.2f} 秒")
        print(f"并发数: {self.max_workers}")
        
        return results
    
    def analyze_performance(self, results):
        """性能分析"""
        successful_results = [r for r in results if r['status'] in ['success', 'cached']]
        
        if not successful_results:
            print("没有成功的爬取结果")
            return
        
        times = [r['time'] for r in successful_results]
        content_lengths = [r['content_length'] for r in successful_results]
        
        print(f"\n=== 性能分析 ===")
        print(f"平均响应时间: {sum(times)/len(times):.2f} 秒")
        print(f"最快响应: {min(times):.2f} 秒")
        print(f"最慢响应: {max(times):.2f} 秒")
        print(f"平均内容长度: {sum(content_lengths)/len(content_lengths):.0f} 字符")
        print(f"总数据量: {sum(content_lengths)/1024/1024:.2f} MB")

def optimized_crawler_demo():
    """优化爬虫演示"""
    crawler = OptimizedCrawler(max_workers=5)
    
    # 测试URL列表
    test_urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml",
        "https://httpbin.org/uuid",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers"
    ]
    
    print(f"开始优化爬虫测试,并发数: {crawler.max_workers}")
    results = crawler.crawl_multiple_urls_optimized(test_urls)
    crawler.analyze_performance(results)

optimized_crawler_demo()

15.2 错误处理与日志

import logging
import traceback
import time
import json
from logging.handlers import RotatingFileHandler
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def setup_logging():
    """设置日志配置"""
    # 创建logger
    logger = logging.getLogger('crawler')
    logger.setLevel(logging.INFO)
    
    # 创建文件处理器 - 支持日志轮转
    file_handler = RotatingFileHandler(
        'crawler.log', 
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    
    # 创建控制台处理器
    console_handler = logging.StreamHandler()
    
    # 设置格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
    )
    
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    # 添加处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

class RobustCrawler:
    def __init__(self):
        self.logger = setup_logging()
        self.session = self.create_robust_session()
        
        # 错误统计
        self.error_stats = {
            'connection_errors': 0,
            'timeout_errors': 0,
            'http_errors': 0,
            'other_errors': 0
        }
    
    def create_robust_session(self):
        """创建健壮的requests会话"""
        session = requests.Session()
        
        # 配置重试策略
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,  # 指数退避: 1, 2, 4 秒
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        
        # 设置默认头部
        session.headers.update({
            'User-Agent': 'RobustCrawler/2.0',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
        })
        
        return session
    
    def crawl_with_retry(self, url, max_retries=3, timeout=(5, 30)):
        """带重试机制的爬取"""
        for attempt in range(max_retries + 1):
            try:
                self.logger.info(f"尝试访问: {url} (第 {attempt + 1} 次)")
                
                response = self.session.get(url, timeout=timeout)
                response.raise_for_status()
                
                self.logger.info(f"成功访问: {url} - 状态码: {response.status_code}")
                return {
                    'url': url,
                    'status': 'success',
                    'content': response.text,
                    'status_code': response.status_code,
                    'headers': dict(response.headers),
                    'attempt': attempt + 1
                }
                
            except requests.exceptions.ConnectionError as e:
                self.error_stats['connection_errors'] += 1
                self.logger.warning(f"连接错误 (尝试 {attempt + 1}): {url} - {e}")
                
            except requests.exceptions.Timeout as e:
                self.error_stats['timeout_errors'] += 1
                self.logger.warning(f"超时错误 (尝试 {attempt + 1}): {url} - {e}")
                
            except requests.exceptions.HTTPError as e:
                self.error_stats['http_errors'] += 1
                self.logger.error(f"HTTP错误 (尝试 {attempt + 1}): {url} - {e}")
                
            except Exception as e:
                self.error_stats['other_errors'] += 1
                self.logger.error(f"其他错误 (尝试 {attempt + 1}): {url} - {e}")
                self.logger.debug(f"详细错误信息: {traceback.format_exc()}")
            
            # 如果不是最后一次尝试,等待后重试
            if attempt < max_retries:
                wait_time = 2 ** attempt  # 指数退避
                self.logger.info(f"等待 {wait_time} 秒后重试...")
                time.sleep(wait_time)
        
        # 所有重试都失败了
        self.logger.error(f"爬取失败,已达到最大重试次数: {url}")
        return {
            'url': url,
            'status': 'failed',
            'error': f'Max retries ({max_retries}) exceeded',
            'attempt': max_retries + 1
        }
    
    def batch_crawl(self, urls):
        """批量爬取(带错误处理)"""
        results = []
        success_count = 0
        error_count = 0
        
        for url in urls:
            try:
                result = self.crawl_with_retry(url)
                results.append(result)
                
                if result['status'] == 'success':
                    success_count += 1
                else:
                    error_count += 1
                    
            except Exception as e:
                self.logger.error(f"处理URL时发生未捕获异常: {url} - {e}")
                results.append({
                    'url': url,
                    'status': 'exception',
                    'error': str(e)
                })
                error_count += 1
        
        # 输出统计信息
        self.logger.info(f"批量爬取完成:")
        self.logger.info(f"  总数: {len(urls)}")
        self.logger.info(f"  成功: {success_count}")
        self.logger.info(f"  失败: {error_count}")
        self.logger.info(f"  错误统计: {self.error_stats}")
        
        return results
    
    def export_error_report(self, results, filename='error_report.json'):
        """导出错误报告"""
        errors = [r for r in results if r['status'] != 'success']
        
        error_report = {
            'summary': {
                'total_urls': len(results),
                'successful': len(results) - len(errors),
                'failed': len(errors),
                'error_statistics': self.error_stats
            },
            'errors': errors
        }
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(error_report, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"错误报告已导出到: {filename}")

def robust_crawler_demo():
    """健壮爬虫演示"""
    crawler = RobustCrawler()
    
    # 包含不同类型的测试URL
    test_urls = [
        "https://httpbin.org/status/200",  # 成功
        "https://httpbin.org/delay/2",     # 正常延迟
        "https://httpbin.org/status/404",  # 404错误
        "https://httpbin.org/status/500",  # 500错误
        "https://httpbin.org/timeout/1",   # 可能超时
        "https://jsonplaceholder.typicode.com/posts/1",  # 外部API
    ]
    
    print("开始健壮性测试...")
    results = crawler.batch_crawl(test_urls)
    
    print("\n=== 结果汇总 ===")
    for result in results:
        status_emoji = "✓" if result['status'] == 'success' else "✗"
        print(f"{status_emoji} {result['url']} - {result['status']}")
        if result['status'] != 'success':
            print(f"    错误: {result.get('error', 'N/A')}")
    
    # 导出错误报告
    crawler.export_error_report(results)

robust_crawler_demo()
🎉 恭喜!您已经完成了Python爬虫从入门到精通的学习旅程。本教程涵盖了:HTTP协议基础、urllib与requests的使用、正则表达式与BeautifulSoup数据提取、文件与数据库存储、反爬虫应对(频率限制、UA伪装、代理)、Selenium动态页面处理、Scrapy框架、多线程与分布式爬虫,以及性能优化、错误处理和法律道德规范。

请记住,优秀的爬虫工程师不仅要有技术能力,更要遵守法律法规和道德规范。持续练习,不断提升!
