A Practical Guide to Web Crawling with Python
A web crawler (web spider) is an automated program that collects data from the internet. It mimics the behavior of a browser, visiting web pages and extracting the information you need.
First, make sure Python 3.7+ is installed on your system, then install the required libraries:
# Basic crawling libraries
pip install requests beautifulsoup4 lxml
# Selenium (for JavaScript-heavy pages)
pip install selenium
# The Scrapy framework
pip install scrapy
# Other useful libraries
pip install pandas numpy matplotlib
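To confirm that everything installed correctly, a quick sanity check such as the one below can be run. It is only a convenience sketch: it imports each package installed above and prints whatever version string it exposes.

# Verify the crawling libraries are importable (assumes the pip installs above succeeded)
import requests, bs4, lxml, selenium, scrapy, pandas

for module in (requests, bs4, lxml, selenium, scrapy, pandas):
    # Not every package exposes __version__ at the top level, so fall back gracefully
    print(module.__name__, getattr(module, '__version__', 'version unknown'))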
HTTP (HyperText Transfer Protocol) is the most widely used application-layer protocol on the internet, and understanding it is essential for writing crawlers. The two tables below summarize the common request methods and status codes; a short status-code handling example follows the tables.
| HTTP method | Purpose | Typical use |
|---|---|---|
| GET | Request data | Fetching page content, images, etc. |
| POST | Submit data | Logins, searches, form submissions |
| PUT | Update data | Updating user information |
| DELETE | Delete data | Removing a resource |

| Status code | Meaning | How to handle it |
|---|---|---|
| 200 | Success | Process the response normally |
| 301 | Permanent redirect | Follow the redirect |
| 403 | Forbidden | Check permissions or adjust the request headers |
| 404 | Not found | Handle the error or skip the page |
| 500 | Internal server error | Retry or wait |
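As a minimal sketch of how the status-code table translates into crawler logic (using the requests library covered later; the URL is only a placeholder), a fetch routine might branch like this:

import requests

def fetch(url):
    # allow_redirects=True is the default, so 301/302 responses are followed automatically
    response = requests.get(url, timeout=10, allow_redirects=True)
    if response.status_code == 200:
        return response.text                    # success: process the body
    elif response.status_code in (403, 404):
        print(f"Skipping {url}: HTTP {response.status_code}")  # forbidden / missing: skip or adjust headers
        return None
    elif response.status_code >= 500:
        print(f"Server error for {url}, retry later")          # server-side error: back off and retry
        return None

fetch("https://httpbin.org/status/200")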
Python's built-in urllib package is the most basic HTTP client.
import urllib.request
import urllib.parse
# A simple GET request
def simple_get():
    try:
        url = "https://httpbin.org/html"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        # Build the request object
        req = urllib.request.Request(url, headers=headers)
        # Send the request and get the response
        response = urllib.request.urlopen(req)
        # Decode the response body
        content = response.read().decode('utf-8')
        print("Status code:", response.getcode())
        print("Response headers:", response.headers)
        print("First 500 characters of the page:")
        print(content[:500])
    except Exception as e:
        print(f"Request failed: {e}")

if __name__ == "__main__":
    simple_get()
import urllib.parse
# URL encoding
def url_encoding():
    # Percent-encode Chinese and other special characters (the Chinese sample values are kept on purpose)
    base_url = "https://example.com/search?"
    params = {
        'q': 'Python爬虫教程',
        'page': 1,
        'category': '编程'
    }
    # Encode the query parameters
    encoded_params = urllib.parse.urlencode(params)
    # Build the full URL
    full_url = base_url + encoded_params
    print("Full URL:", full_url)
    # URL decoding
    decoded = urllib.parse.unquote(full_url)
    print("Decoded:", decoded)

url_encoding()
requests is the most popular Python HTTP client library, with a clean, intuitive API and plenty of power.
import requests
# A basic GET request
def basic_get():
    url = "https://jsonplaceholder.typicode.com/posts/1"
    response = requests.get(url)
    print(f"Status code: {response.status_code}")
    print(f"Response headers: {response.headers}")
    print(f"Response body: {response.text}")
    # Parse a JSON response
    data = response.json()
    print(f"JSON data: {data}")

basic_get()
import requests
def http_methods_demo():
    base_url = "https://jsonplaceholder.typicode.com"
    # GET request (read data)
    get_response = requests.get(f"{base_url}/posts/1")
    print("GET response:", get_response.json())
    # POST request (create data)
    post_data = {
        "title": "My title",
        "body": "My content",
        "userId": 1
    }
    post_response = requests.post(f"{base_url}/posts", json=post_data)
    print("POST response:", post_response.json())
    # PUT request (update data)
    put_data = {
        "id": 1,
        "title": "Updated title",
        "body": "Updated content",
        "userId": 1
    }
    put_response = requests.put(f"{base_url}/posts/1", json=put_data)
    print("PUT response:", put_response.json())
    # DELETE request (delete data)
    delete_response = requests.delete(f"{base_url}/posts/1")
    print("DELETE status code:", delete_response.status_code)

http_methods_demo()
import requests
def advanced_requests():
    url = "https://httpbin.org/get"
    # Set custom request headers
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Referer': 'https://www.google.com/',
        'Connection': 'keep-alive'
    }
    # Set query parameters (Chinese sample values left as-is to show how they are encoded)
    params = {
        'name': '张三',
        'age': 25,
        'city': '北京'
    }
    # Send the request
    response = requests.get(url, headers=headers, params=params)
    print(f"Request URL: {response.url}")
    print(f"Response status: {response.status_code}")
    print("Response data:")
    print(response.json())

advanced_requests()
import requests
def session_management():
    # Use a Session to keep state across requests
    session = requests.Session()
    # Session-level headers
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })
    # Simulate a login
    login_data = {
        'username': 'user123',
        'password': 'password123'
    }
    login_response = session.post('https://httpbin.org/post', data=login_data)
    print("Login response:", login_response.json())
    # Request a page that requires the logged-in session
    profile_response = session.get('https://httpbin.org/cookies')
    print("Profile info:", profile_response.json())
    # Set a cookie on the session
    session.cookies.set('my_cookie', 'test_value')
    cookie_response = session.get('https://httpbin.org/cookies')
    print("Cookie info:", cookie_response.json())

session_management()
import re
def regex_examples():
    text = """
    联系方式:
    手机:138-1234-5678
    邮箱:[email protected]
    固话:010-8888-9999
    网站:https://www.example.com
    博客:http://blog.example.com
    """
    # Match mobile numbers (allow the digit groups to be separated by dashes, as in the sample text)
    phone_pattern = r'1[3-9]\d(?:-?\d{4}){2}'
    phones = re.findall(phone_pattern, text)
    print("Mobile numbers:", phones)
    # Match email addresses
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    emails = re.findall(email_pattern, text)
    print("Emails:", emails)
    # Match URLs
    url_pattern = r'https?://[^\s]+'
    urls = re.findall(url_pattern, text)
    print("URLs:", urls)
    # Replace text
    new_text = re.sub(r'\d{3}-\d{4}-\d{4}', '[phone number]', text)
    print("Text after replacement:")
    print(new_text)
    # Use groups to extract key/value pairs
    pattern = r'(\w+):([^\n]+)'
    matches = re.findall(pattern, text)
    for key, value in matches:
        print(f"{key}: {value}")

regex_examples()
from bs4 import BeautifulSoup
import requests
def bs4_demo():
    # Minimal sample HTML, reconstructed to match the selectors used below
    html = """
    <html>
      <head><title>示例页面</title></head>
      <body>
        <h1>Welcome to the sample page</h1>
        <p class="intro">This is the introduction paragraph.</p>
        <ul id="menu">
          <li class="item">Home</li>
          <li class="item">Products</li>
          <li class="item">About</li>
        </ul>
        <a id="link" href="https://www.example.com">Example link</a>
        <div class="content">Main content area</div>
      </body>
    </html>
    """
    # Parse the HTML
    soup = BeautifulSoup(html, 'html.parser')
    # Get the page title
    title = soup.title.text
    print("Page title:", title)
    # Look up by tag name
    h1_text = soup.h1.text
    print("H1 content:", h1_text)
    # Look up by class name
    intro_paragraph = soup.find('p', class_='intro')
    print("Intro paragraph:", intro_paragraph.text if intro_paragraph else "not found")
    # Look up by id
    menu = soup.find('ul', id='menu')
    print("Menu items:")
    if menu:
        for li in menu.find_all('li'):
            print(f"  - {li.text}")
    # Read attributes
    link = soup.find('a', id='link')
    if link:
        print("Link target:", link.get('href'))
        print("Link text:", link.text)
    # CSS selectors
    items = soup.select('.item')            # select by class
    print("All items:", [item.text for item in items])
    content = soup.select_one('.content')   # select the first matching element
    print("Content div:", content.get_text().strip())

bs4_demo()
import requests
from bs4 import BeautifulSoup
import json
def crawl_news():
    """
    Example: crawling a news site (Hacker News).
    Note: this is only a technical demo; respect the site's terms of use.
    """
    url = "https://news.ycombinator.com/"
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; NewsBot/1.0)'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        news_items = []
        # Each story is a <tr class="athing">; take the first 10
        for i, item in enumerate(soup.find_all('tr', class_='athing')[:10]):
            try:
                # Title and link
                title_element = item.find('span', class_='titleline')
                if not title_element:
                    continue
                title_link = title_element.find('a')
                if not title_link:
                    continue
                title = title_link.get_text()
                link = title_link.get('href')
                # Score and comment count live in the row that follows the story row
                subtext_row = item.find_next_sibling('tr')
                score_element = subtext_row.find('span', class_='score') if subtext_row else None
                score = score_element.get_text() if score_element else "unknown"
                comments_text = ""
                if subtext_row:
                    comments_element = subtext_row.find('td', class_='subtext')
                    if comments_element:
                        comments_link = comments_element.find('a', string=lambda text: text and 'comment' in text)
                        if comments_link:
                            comments_text = comments_link.get_text()
                news_item = {
                    'rank': i + 1,
                    'title': title,
                    'link': link,
                    'score': score,
                    'comments': comments_text
                }
                news_items.append(news_item)
                print(f"{i+1}. {title} ({score})")
            except Exception as e:
                print(f"Error while parsing story {i+1}: {e}")
                continue
        # Save to a JSON file
        with open('news.json', 'w', encoding='utf-8') as f:
            json.dump(news_items, f, ensure_ascii=False, indent=2)
        print(f"\nCrawled {len(news_items)} stories, saved to news.json")
    except requests.RequestException as e:
        print(f"Request failed: {e}")
    except Exception as e:
        print(f"Program error: {e}")

# Run the crawler
crawl_news()
import json
import csv
import sqlite3
from datetime import datetime
def file_storage_demo():
    # Sample data
    data = [
        {'name': '张三', 'age': 25, 'city': '北京'},
        {'name': '李四', 'age': 30, 'city': '上海'},
        {'name': '王五', 'age': 28, 'city': '广州'}
    ]
    # 1. JSON file storage
    def save_to_json(data, filename):
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"Data saved to {filename}")
    # 2. CSV file storage
    def save_to_csv(data, filename):
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
        print(f"Data saved to {filename}")
    # 3. Plain-text storage
    def save_to_txt(data, filename):
        with open(filename, 'w', encoding='utf-8') as f:
            for item in data:
                f.write(f"Name: {item['name']}, Age: {item['age']}, City: {item['city']}\n")
        print(f"Data saved to {filename}")
    # Save the data in each format
    save_to_json(data, 'data.json')
    save_to_csv(data, 'data.csv')
    save_to_txt(data, 'data.txt')

file_storage_demo()
import sqlite3
def sqlite_demo():
    # Connect to the database (created automatically if it does not exist)
    conn = sqlite3.connect('crawler.db')
    cursor = conn.cursor()
    # Create the table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            age INTEGER,
            city TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Insert rows
    users_data = [
        ('张三', 25, '北京'),
        ('李四', 30, '上海'),
        ('王五', 28, '广州')
    ]
    cursor.executemany(
        'INSERT INTO users (name, age, city) VALUES (?, ?, ?)',
        users_data
    )
    # Query rows
    cursor.execute('SELECT * FROM users')
    results = cursor.fetchall()
    print("All users in the database:")
    for row in results:
        print(row)
    # Aggregate query
    cursor.execute('SELECT COUNT(*) FROM users')
    count = cursor.fetchone()[0]
    print(f"Total users: {count}")
    # Commit and close the connection
    conn.commit()
    conn.close()
    print("Database operations finished")

sqlite_demo()
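Since pandas was installed at the beginning, it is worth noting that tabular results can also be exported in a single line per format. This is only a minimal sketch using the same style of sample records as above:

import pandas as pd

# Build a DataFrame from the same style of records used above
df = pd.DataFrame([
    {'name': '张三', 'age': 25, 'city': '北京'},
    {'name': '李四', 'age': 30, 'city': '上海'}
])
df.to_csv('data_pandas.csv', index=False, encoding='utf-8')            # CSV export
df.to_json('data_pandas.json', orient='records', force_ascii=False)    # JSON export
print(df)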
import time
import random
from functools import wraps
def rate_limiter(min_delay=1, max_delay=3):
    """
    Decorator that throttles how often the wrapped function can be called.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Sleep for a random delay before each call
            delay = random.uniform(min_delay, max_delay)
            print(f"Waiting {delay:.2f} seconds...")
            time.sleep(delay)
            return func(*args, **kwargs)
        return wrapper
    return decorator

class RateLimitedCrawler:
    def __init__(self, delay_range=(1, 3)):
        self.delay_range = delay_range
        self.request_count = 0
        self.start_time = time.time()

    def get_delay(self):
        """Compute the delay dynamically."""
        if self.request_count > 50:
            # Slow down once more than 50 requests have been made
            return random.uniform(3, 6)
        return random.uniform(*self.delay_range)

    def crawl_with_rate_limit(self, url):
        """Crawl with a per-request delay."""
        # Work out the delay for this request
        delay = self.get_delay()
        print(f"Request #{self.request_count + 1}: {url}")
        print(f"Delay: {delay:.2f} seconds")
        time.sleep(delay)
        # Simulate the request
        self.request_count += 1
        print(f"Finished request #{self.request_count}")
        return f"Simulated response #{self.request_count}"

# Usage example
def demo_rate_limiting():
    crawler = RateLimitedCrawler()
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        "https://example.com/page4",
        "https://example.com/page5"
    ]
    results = []
    for url in urls:
        result = crawler.crawl_with_rate_limit(url)
        results.append(result)
    print(f"\nCrawled {len(results)} pages in total")
    print(f"Elapsed: {time.time() - crawler.start_time:.2f} seconds")

demo_rate_limiting()
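The rate_limiter decorator defined above is not exercised by the demo, so here is a minimal usage sketch; fetch_page is a hypothetical function and the URL is only a placeholder:

import requests

@rate_limiter(min_delay=1, max_delay=2)
def fetch_page(url):
    # Every call now waits a random 1-2 seconds before the request is sent
    return requests.get(url, timeout=10).status_code

print(fetch_page("https://httpbin.org/get"))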
import requests
import random
# User-Agent strings for several common browsers
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
]

def get_random_headers():
    """Build a randomized set of request headers."""
    headers = {
        'User-Agent': random.choice(USER_AGENTS),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    return headers

def crawl_with_random_headers():
    """Example crawl using randomized request headers."""
    url = "https://httpbin.org/headers"
    # Pick a random set of headers
    headers = get_random_headers()
    print("Current User-Agent:", headers['User-Agent'])
    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            print("Headers as seen by the server:")
            for key, value in data['headers'].items():
                print(f"  {key}: {value}")
        else:
            print(f"Request failed, status code: {response.status_code}")
    except Exception as e:
        print(f"Request error: {e}")

crawl_with_random_headers()
import requests
import random
# Proxy list (placeholders only; use a reliable proxy service in practice)
PROXY_LIST = [
    {'http': 'http://127.0.0.1:8080', 'https': 'https://127.0.0.1:8080'},
    {'http': 'http://127.0.0.1:8081', 'https': 'https://127.0.0.1:8081'},
    # more proxies...
]

class ProxyCrawler:
    def __init__(self, proxies=None):
        self.proxies = proxies or PROXY_LIST
        self.session = requests.Session()

    def get_random_proxy(self):
        """Pick a random proxy."""
        return random.choice(self.proxies)

    def crawl_with_proxy(self, url):
        """Fetch a URL through a proxy."""
        proxy = self.get_random_proxy()
        print(f"Using proxy: {proxy}")
        try:
            response = self.session.get(
                url,
                proxies=proxy,
                timeout=10
            )
            print(f"Status code: {response.status_code}")
            print(f"Response length: {len(response.text)} characters")
            return response
        except requests.RequestException as e:
            print(f"Proxy request failed: {e}")
            return None

    def crawl_multiple_urls_with_rotation(self, urls):
        """Crawl several URLs while rotating proxies."""
        results = []
        for i, url in enumerate(urls, 1):
            print(f"\n=== Crawling URL #{i} ===")
            response = self.crawl_with_proxy(url)
            if response and response.status_code == 200:
                results.append({
                    'url': url,
                    'status': 'success',
                    'content_length': len(response.text)
                })
            else:
                results.append({
                    'url': url,
                    'status': 'failed',
                    'error': 'Request failed'
                })
        return results

def proxy_demo():
    """Proxy crawler demo."""
    crawler = ProxyCrawler()
    urls = [
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers"
    ]
    results = crawler.crawl_multiple_urls_with_rotation(urls)
    print("\n=== Crawl summary ===")
    for result in results:
        print(f"URL: {result['url']}")
        print(f"Status: {result['status']}")
        if result['status'] == 'success':
            print(f"Content length: {result['content_length']}")
        print("-" * 50)

# Note: prepare a pool of reliable proxy IPs before using this in practice
proxy_demo()
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import time
def selenium_basic_demo():
    """Basic Selenium usage."""
    # Chrome options
    chrome_options = Options()
    chrome_options.add_argument('--headless')  # headless mode
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--window-size=1920,1080')
    # Start the browser
    driver = webdriver.Chrome(options=chrome_options)
    try:
        print("Opening the page...")
        driver.get("https://httpbin.org/forms/post")
        # Wait for the page to load
        wait = WebDriverWait(driver, 10)
        # Fill in the form
        wait.until(EC.presence_of_element_located((By.NAME, "custname"))).send_keys("Test user")
        driver.find_element(By.NAME, "custtel").send_keys("13800138000")
        driver.find_element(By.NAME, "custemail").send_keys("[email protected]")
        driver.find_element(By.NAME, "comments").send_keys("Test comment")
        # Submit the form
        submit_button = driver.find_element(By.CSS_SELECTOR, "input[type='submit']")
        submit_button.click()
        # Wait for the submission to complete
        time.sleep(2)
        # Inspect the result page
        result_text = driver.page_source
        print("Result page title:", driver.title)
        print("First 500 characters of the page:")
        print(result_text[:500])
    except Exception as e:
        print(f"Error during execution: {e}")
    finally:
        driver.quit()
        print("Browser closed")

selenium_basic_demo()
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import json
def handle_ajax_loading():
    """Handling Ajax (asynchronously loaded) content."""
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    driver = webdriver.Chrome(options=options)
    try:
        # Open a page that loads data via Ajax (example site)
        driver.get("https://jsonplaceholder.typicode.com/")
        # Wait for the page to finish loading
        wait = WebDriverWait(driver, 10)
        # Click a "load more" button if the page has one
        try:
            # Wait until the element is clickable
            load_more_button = wait.until(
                EC.element_to_be_clickable((By.CLASS_NAME, "load-more"))
            )
            load_more_button.click()
            # Wait for the new content to appear
            wait.until(
                EC.presence_of_element_located((By.CLASS_NAME, "new-content"))
            )
            print("New content loaded")
        except Exception as e:
            print("No load-more button found, or the page structure differs:", e)
        # Extract all visible text
        page_text = driver.find_element(By.TAG_NAME, "body").text
        print("Page content:", page_text[:200] + "...")
        # Read a JavaScript variable from the page, if one exists
        js_data = driver.execute_script("""
            // Return window.data directly if the page defines it
            if (typeof window.data !== 'undefined') {
                return window.data;
            }
            return null;
        """)
        if js_data:
            print("JavaScript data retrieved:", js_data)
    except Exception as e:
        print(f"Error while handling Ajax content: {e}")
    finally:
        driver.quit()

handle_ajax_loading()
# Create a Scrapy project
scrapy startproject myproject
# Enter the project directory
cd myproject
# Generate a spider
scrapy genspider example example.com
# Run the spider
scrapy crawl example
# myproject/spiders/quotes_spider.py
import scrapy

class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    allowed_domains = ['quotes.toscrape.com']
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        # Extract each quote
        quotes = response.css('div.quote')
        for quote in quotes:
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('small.author::text').get(),
                'tags': quote.css('div.tags a.tag::text').getall()
            }
        # Follow the "next page" link
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)
        # Follow author detail pages, if present
        author_links = response.css('small.author + a::attr(href)').getall()
        for author_link in author_links:
            yield response.follow(author_link, self.parse_author)

    def parse_author(self, response):
        author = response.css('h3.author-title::text').get()
        birth_date = response.css('span.author-born-date::text').get()
        birth_place = response.css('span.author-born-location::text').get()
        description = response.css('div.author-description::text').get()
        yield {
            'name': author,
            'birth_date': birth_date,
            'birth_place': birth_place,
            'description': description.strip() if description else None
        }
# myproject/pipelines.py
import json
import sqlite3

class MyprojectPipeline:
    def __init__(self):
        self.file = open('quotes.json', 'w', encoding='utf-8')
        self.conn = sqlite3.connect('quotes.db')
        self.create_table()

    def create_table(self):
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS quotes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                text TEXT,
                author TEXT,
                tags TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    def process_item(self, item, spider):
        # Append the item to the JSON lines file
        line = json.dumps(dict(item), ensure_ascii=False) + '\n'
        self.file.write(line)
        # Save the item to the SQLite database
        cursor = self.conn.cursor()
        cursor.execute(
            'INSERT INTO quotes (text, author, tags) VALUES (?, ?, ?)',
            (
                item.get('text'),
                item.get('author'),
                ','.join(item.get('tags', []))
            )
        )
        self.conn.commit()
        return item

    def close_spider(self, spider):
        self.file.close()
        self.conn.close()
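For the pipeline to run, it also has to be enabled in the project settings; something along these lines in myproject/settings.py (the number controls the execution order among pipelines):

# myproject/settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.MyprojectPipeline': 300,
}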
import threading
import queue
import requests
from bs4 import BeautifulSoup
import time
class MultiThreadedCrawler:
def __init__(self, max_threads=5):
self.max_threads = max_threads
self.url_queue = queue.Queue()
self.result_queue = queue.Queue()
self.threads = []
self.stop_event = threading.Event()
def add_url(self, url):
"""添加URL到队列"""
self.url_queue.put(url)
def worker(self):
"""工作线程函数"""
while not self.stop_event.is_set():
try:
url = self.url_queue.get(timeout=1)
if url is None:
break
# 爬取页面
result = self.crawl_page(url)
if result:
self.result_queue.put(result)
self.url_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"爬取 {url} 时出错: {e}")
self.url_queue.task_done()
def crawl_page(self, url):
"""爬取单个页面"""
try:
headers = {
'User-Agent': 'MultiThreadedCrawler/1.0'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 提取标题和链接
title = soup.title.string if soup.title else "无标题"
links = [a.get('href') for a in soup.find_all('a', href=True)]
return {
'url': url,
'title': title,
'link_count': len(links),
'timestamp': time.time()
}
except Exception as e:
return {'url': url, 'error': str(e)}
def start_crawling(self, urls):
"""开始多线程爬取"""
# 添加所有URL到队列
for url in urls:
self.add_url(url)
# 创建并启动工作线程
for i in range(self.max_threads):
thread = threading.Thread(target=self.worker)
thread.start()
self.threads.append(thread)
# 等待所有任务完成
self.url_queue.join()
# 停止所有线程
self.stop_event.set()
for thread in self.threads:
thread.join()
# 收集所有结果
results = []
while not self.result_queue.empty():
results.append(self.result_queue.get())
return results
def multi_thread_demo():
"""多线程爬虫演示"""
urls = [
"https://httpbin.org/html",
"https://httpbin.org/json",
"https://httpbin.org/xml",
"https://httpbin.org/uuid"
]
crawler = MultiThreadedCrawler(max_threads=3)
print("开始多线程爬取...")
start_time = time.time()
results = crawler.start_crawling(urls)
end_time = time.time()
print(f"爬取完成,耗时: {end_time - start_time:.2f} 秒")
print(f"共爬取 {len(results)} 个页面")
for result in results:
print(f"URL: {result['url']}")
if 'title' in result:
print(f" 标题: {result['title']}")
print(f" 链接数: {result['link_count']}")
else:
print(f" 错误: {result.get('error', '未知错误')}")
print("-" * 50)
multi_thread_demo()
# 分布式爬虫架构示例(简化版)
# 使用Redis作为任务队列
import redis
import json
import hashlib
import time
from datetime import datetime
class DistributedCrawler:
def __init__(self, redis_host='localhost', redis_port=6379):
self.redis_client = redis.Redis(host=redis_host, port=redis_port)
self.crawler_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]
def add_crawling_task(self, url, priority=1):
"""添加爬取任务"""
task = {
'url': url,
'priority': priority,
'timestamp': time.time(),
'crawler_id': self.crawler_id
}
        # Priority queue: higher priority should be processed first
        # (score = priority, and get_next_task pops the highest score with zpopmax)
        self.redis_client.zadd('crawler_tasks', {json.dumps(task): priority})
print(f"爬虫 {self.crawler_id} 添加任务: {url}")
def get_next_task(self):
"""获取下一个任务"""
result = self.redis_client.zpopmax('crawler_tasks')
if result:
task_json, priority = result[0]
task = json.loads(task_json.decode())
return task
return None
def mark_task_complete(self, task, result):
"""标记任务完成"""
completed_task = {
'task': task,
'result': result,
'completed_at': time.time(),
'completed_by': self.crawler_id
}
# 将结果存储到已完成任务集合
self.redis_client.lpush('completed_tasks', json.dumps(completed_task))
print(f"爬虫 {self.crawler_id} 完成任务: {task['url']}")
def crawl_worker(self):
"""爬虫工作进程"""
print(f"爬虫 {self.crawler_id} 开始工作...")
while True:
task = self.get_next_task()
if not task:
print(f"爬虫 {self.crawler_id} 等待任务...")
time.sleep(1)
continue
# 执行爬取任务
try:
print(f"爬虫 {self.crawler_id} 正在处理: {task['url']}")
# 这里应该是实际的爬取逻辑
# result = self.perform_crawl(task['url'])
result = {'status': 'success', 'data': 'sample_data'}
# 标记任务完成
self.mark_task_complete(task, result)
# 模拟爬取延迟
time.sleep(0.5)
except Exception as e:
error_result = {'status': 'error', 'error': str(e)}
self.mark_task_complete(task, error_result)
def get_crawler_stats(self):
"""获取爬虫统计信息"""
total_tasks = self.redis_client.zcard('crawler_tasks')
completed_count = self.redis_client.llen('completed_tasks')
return {
'pending_tasks': total_tasks,
'completed_tasks': completed_count,
'active_crawlers': len(self.redis_client.smembers('active_crawlers'))
}
def distributed_crawler_demo():
"""分布式爬虫演示"""
crawler = DistributedCrawler()
# 添加测试任务
test_urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
"https://example.com/page4",
"https://example.com/page5"
]
for i, url in enumerate(test_urls):
priority = len(test_urls) - i # 倒序优先级
crawler.add_crawling_task(url, priority)
print("分布式爬虫任务已添加")
print("统计信息:", crawler.get_crawler_stats())
distributed_crawler_demo()
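The demo above only enqueues tasks; in a real deployment each machine would run crawl_worker to consume them. A minimal sketch of starting one worker in the background (a daemon thread, purely for illustration, assuming a local Redis is running) might look like this:

import threading
import time

crawler = DistributedCrawler()
# crawl_worker loops forever, so run it as a daemon thread that dies with the main program
worker_thread = threading.Thread(target=crawler.crawl_worker, daemon=True)
worker_thread.start()

time.sleep(5)  # let the worker drain the queue for a few seconds
print("Stats:", crawler.get_crawler_stats())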
import requests
from bs4 import BeautifulSoup
import json
import csv
import sqlite3
from datetime import datetime
import time
import random
from urllib.parse import urljoin, urlparse
import logging
class NewsCrawler:
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'NewsCrawler/1.0'
})
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('crawler.log', encoding='utf-8'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# 初始化数据库
self.init_database()
def init_database(self):
"""初始化SQLite数据库"""
self.conn = sqlite3.connect('news.db')
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT,
author TEXT,
publish_time TEXT,
url TEXT UNIQUE,
category TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
self.conn.commit()
def crawl_news_list(self, page=1):
"""爬取新闻列表页面"""
url = f"{self.base_url}/news/list?page={page}"
try:
self.logger.info(f"爬取新闻列表页面: {url}")
response = self.session.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
news_items = []
# 根据实际网站结构调整选择器
news_elements = soup.find_all('div', class_='news-item')
for element in news_elements:
try:
title_element = element.find('h3', class_='title')
link_element = element.find('a', href=True)
time_element = element.find('span', class_='time')
author_element = element.find('span', class_='author')
if title_element and link_element:
title = title_element.get_text().strip()
url = link_element['href']
# 处理相对URL
if url.startswith('/'):
url = urljoin(self.base_url, url)
publish_time = time_element.get_text().strip() if time_element else None
author = author_element.get_text().strip() if author_element else None
news_item = {
'title': title,
'url': url,
'publish_time': publish_time,
'author': author
}
news_items.append(news_item)
self.logger.info(f"提取新闻: {title}")
except Exception as e:
self.logger.error(f"解析新闻项时出错: {e}")
continue
return news_items
except Exception as e:
self.logger.error(f"爬取新闻列表失败: {e}")
return []
def crawl_news_detail(self, news_item):
"""爬取新闻详情"""
url = news_item['url']
try:
self.logger.info(f"爬取新闻详情: {url}")
response = self.session.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 提取新闻内容
content_element = soup.find('div', class_='content')
content = content_element.get_text().strip() if content_element else ""
# 提取发布日期
date_element = soup.find('time') or soup.find('span', class_='date')
publish_date = date_element.get_text().strip() if date_element else news_item.get('publish_time')
# 提取分类
category_element = soup.find('span', class_='category')
category = category_element.get_text().strip() if category_element else None
# 更新新闻信息
news_item.update({
'content': content,
'publish_time': publish_date,
'category': category
})
return news_item
except Exception as e:
self.logger.error(f"爬取新闻详情失败 {url}: {e}")
return None
def save_to_database(self, news_item):
"""保存新闻到数据库"""
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO news
(title, content, author, publish_time, url, category)
VALUES (?, ?, ?, ?, ?, ?)
''', (
news_item['title'],
news_item.get('content', ''),
news_item.get('author'),
news_item.get('publish_time'),
news_item['url'],
news_item.get('category')
))
self.conn.commit()
return True
except Exception as e:
self.logger.error(f"保存新闻到数据库失败: {e}")
return False
def save_to_json(self, news_list, filename='news.json'):
"""保存新闻到JSON文件"""
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(news_list, f, ensure_ascii=False, indent=2)
self.logger.info(f"新闻数据已保存到 {filename}")
except Exception as e:
self.logger.error(f"保存JSON文件失败: {e}")
def crawl_with_rate_limit(self, max_pages=5):
"""带频率限制的爬取"""
all_news = []
for page in range(1, max_pages + 1):
self.logger.info(f"开始爬取第 {page} 页")
# 爬取新闻列表
news_list = self.crawl_news_list(page)
if not news_list:
self.logger.info(f"第 {page} 页没有新闻,停止爬取")
break
# 爬取每个新闻的详情
for news_item in news_list:
detail = self.crawl_news_detail(news_item)
if detail:
# 保存到数据库
if self.save_to_database(detail):
all_news.append(detail)
self.logger.info(f"成功爬取并保存: {detail['title']}")
# 随机延迟
delay = random.uniform(1, 3)
self.logger.info(f"等待 {delay:.1f} 秒...")
time.sleep(delay)
# 保存到JSON文件
if all_news:
self.save_to_json(all_news)
self.logger.info(f"爬取完成,共获取 {len(all_news)} 条新闻")
return all_news
def close(self):
"""关闭数据库连接"""
if hasattr(self, 'conn'):
self.conn.close()
def news_crawler_demo():
"""新闻爬虫演示"""
crawler = NewsCrawler("https://example-news-site.com")
try:
news_list = crawler.crawl_with_rate_limit(max_pages=3)
print(f"\n爬取结果汇总:")
print(f"总共获取 {len(news_list)} 条新闻")
# 显示前几条新闻
for i, news in enumerate(news_list[:3], 1):
print(f"\n新闻 {i}:")
print(f" 标题: {news['title']}")
print(f" 作者: {news.get('author', '未知')}")
print(f" 时间: {news.get('publish_time', '未知')}")
print(f" 链接: {news['url']}")
except KeyboardInterrupt:
print("\n爬取被用户中断")
except Exception as e:
print(f"爬取过程出错: {e}")
finally:
crawler.close()
# 注意:示例中的URL需要替换为实际的新闻网站
news_crawler_demo()
When using crawling techniques, you must comply with the relevant laws and regulations and with each site's terms of use:
import urllib.robotparser
import requests
def check_robots_txt(url):
    """Check the site's robots.txt file."""
    from urllib.parse import urljoin, urlparse
    parsed_url = urlparse(url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
    robots_url = urljoin(base_url, '/robots.txt')
    try:
        rp = urllib.robotparser.RobotFileParser()
        rp.set_url(robots_url)
        rp.read()
        print(f"URL checked: {url}")
        print(f"robots.txt location: {robots_url}")
        print(f"Crawling allowed: {rp.can_fetch('*', url)}")
        # Read the suggested crawl delay, if any
        crawl_delay = rp.crawl_delay('*')
        if crawl_delay:
            print(f"Suggested crawl delay: {crawl_delay} seconds")
        return rp.can_fetch('*', url), crawl_delay
    except Exception as e:
        print(f"Failed to check robots.txt: {e}")
        return True, 1  # default: assume allowed, with a 1-second delay

# Usage examples
check_robots_txt("https://example.com/page1")
check_robots_txt("https://httpbin.org/robots.txt")
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
import sqlite3
import json
class OptimizedCrawler:
def __init__(self, max_workers=10):
self.max_workers = max_workers
self.session = requests.Session()
# 启用连接池
adapter = requests.adapters.HTTPAdapter(
pool_connections=max_workers,
pool_maxsize=max_workers,
max_retries=3
)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
# 缓存配置
self.cache = {}
self.cache_timeout = 300 # 5分钟缓存
    # Note: lru_cache is unsuitable here -- it would memoize the initial None result
    # and never see later updates to self.cache, so the manual time-based cache below is used instead.
    def get_cached_content(self, url):
        """Return the cached page content if it is still fresh."""
cache_key = hash(url)
if cache_key in self.cache:
cached_data, timestamp = self.cache[cache_key]
if time.time() - timestamp < self.cache_timeout:
return cached_data
return None
def crawl_single_url(self, url):
"""单URL爬取(优化版)"""
start_time = time.time()
try:
# 检查缓存
cached = self.get_cached_content(url)
if cached:
return {
'url': url,
'status': 'cached',
'content_length': len(cached),
'time': time.time() - start_time
}
# 发送请求
response = self.session.get(
url,
timeout=(5, 10), # (连接超时, 读取超时)
stream=True # 流式下载,节省内存
)
response.raise_for_status()
# 逐行处理内容
content = []
for line in response.iter_lines(decode_unicode=True):
if line:
content.append(line)
result = {
'url': url,
'status': 'success',
'content': '\n'.join(content),
'content_length': sum(len(line) for line in content),
'time': time.time() - start_time,
'headers': dict(response.headers)
}
# 缓存结果
cache_key = hash(url)
self.cache[cache_key] = (result['content'], time.time())
return result
except requests.RequestException as e:
return {
'url': url,
'status': 'error',
'error': str(e),
'time': time.time() - start_time
}
def crawl_multiple_urls_optimized(self, urls):
"""优化的多URL并发爬取"""
start_time = time.time()
results = []
# 使用线程池并发爬取
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有任务
future_to_url = {
executor.submit(self.crawl_single_url, url): url
for url in urls
}
# 收集结果
for future in as_completed(future_to_url):
try:
result = future.result()
results.append(result)
# 统计信息
if result['status'] == 'success':
print(f"✓ {result['url']} - {result['content_length']} 字符 - {result['time']:.2f}s")
else:
print(f"✗ {result['url']} - {result['status']} - {result.get('error', 'Unknown error')}")
except Exception as e:
url = future_to_url[future]
print(f"✗ {url} - Exception: {e}")
results.append({
'url': url,
'status': 'exception',
'error': str(e)
})
total_time = time.time() - start_time
successful = len([r for r in results if r['status'] in ['success', 'cached']])
print(f"\n=== 爬取完成 ===")
print(f"总URL数: {len(urls)}")
print(f"成功: {successful}")
print(f"失败: {len(urls) - successful}")
print(f"总耗时: {total_time:.2f} 秒")
print(f"平均每URL: {total_time/len(urls):.2f} 秒")
print(f"并发数: {self.max_workers}")
return results
def analyze_performance(self, results):
"""性能分析"""
successful_results = [r for r in results if r['status'] in ['success', 'cached']]
if not successful_results:
print("没有成功的爬取结果")
return
times = [r['time'] for r in successful_results]
content_lengths = [r['content_length'] for r in successful_results]
print(f"\n=== 性能分析 ===")
print(f"平均响应时间: {sum(times)/len(times):.2f} 秒")
print(f"最快响应: {min(times):.2f} 秒")
print(f"最慢响应: {max(times):.2f} 秒")
print(f"平均内容长度: {sum(content_lengths)/len(content_lengths):.0f} 字符")
print(f"总数据量: {sum(content_lengths)/1024/1024:.2f} MB")
def optimized_crawler_demo():
"""优化爬虫演示"""
crawler = OptimizedCrawler(max_workers=5)
# 测试URL列表
test_urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/json",
"https://httpbin.org/html",
"https://httpbin.org/xml",
"https://httpbin.org/uuid",
"https://httpbin.org/user-agent",
"https://httpbin.org/headers"
]
print(f"开始优化爬虫测试,并发数: {crawler.max_workers}")
results = crawler.crawl_multiple_urls_optimized(test_urls)
crawler.analyze_performance(results)
optimized_crawler_demo()
import json
import logging
import time
import traceback
from logging.handlers import RotatingFileHandler
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def setup_logging():
"""设置日志配置"""
# 创建logger
logger = logging.getLogger('crawler')
logger.setLevel(logging.INFO)
# 创建文件处理器 - 支持日志轮转
file_handler = RotatingFileHandler(
'crawler.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
# 创建控制台处理器
console_handler = logging.StreamHandler()
# 设置格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 添加处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
class RobustCrawler:
def __init__(self):
self.logger = setup_logging()
self.session = self.create_robust_session()
# 错误统计
self.error_stats = {
'connection_errors': 0,
'timeout_errors': 0,
'http_errors': 0,
'other_errors': 0
}
def create_robust_session(self):
"""创建健壮的requests会话"""
session = requests.Session()
# 配置重试策略
retry_strategy = Retry(
total=3,
backoff_factor=1, # 指数退避: 1, 2, 4 秒
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 设置默认头部
session.headers.update({
'User-Agent': 'RobustCrawler/2.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
})
return session
def crawl_with_retry(self, url, max_retries=3, timeout=(5, 30)):
"""带重试机制的爬取"""
for attempt in range(max_retries + 1):
try:
self.logger.info(f"尝试访问: {url} (第 {attempt + 1} 次)")
response = self.session.get(url, timeout=timeout)
response.raise_for_status()
self.logger.info(f"成功访问: {url} - 状态码: {response.status_code}")
return {
'url': url,
'status': 'success',
'content': response.text,
'status_code': response.status_code,
'headers': dict(response.headers),
'attempt': attempt + 1
}
except requests.exceptions.ConnectionError as e:
self.error_stats['connection_errors'] += 1
self.logger.warning(f"连接错误 (尝试 {attempt + 1}): {url} - {e}")
except requests.exceptions.Timeout as e:
self.error_stats['timeout_errors'] += 1
self.logger.warning(f"超时错误 (尝试 {attempt + 1}): {url} - {e}")
except requests.exceptions.HTTPError as e:
self.error_stats['http_errors'] += 1
self.logger.error(f"HTTP错误 (尝试 {attempt + 1}): {url} - {e}")
except Exception as e:
self.error_stats['other_errors'] += 1
self.logger.error(f"其他错误 (尝试 {attempt + 1}): {url} - {e}")
self.logger.debug(f"详细错误信息: {traceback.format_exc()}")
# 如果不是最后一次尝试,等待后重试
if attempt < max_retries:
wait_time = 2 ** attempt # 指数退避
self.logger.info(f"等待 {wait_time} 秒后重试...")
import time
time.sleep(wait_time)
# 所有重试都失败了
self.logger.error(f"爬取失败,已达到最大重试次数: {url}")
return {
'url': url,
'status': 'failed',
'error': f'Max retries ({max_retries}) exceeded',
'attempt': max_retries + 1
}
def batch_crawl(self, urls):
"""批量爬取(带错误处理)"""
results = []
success_count = 0
error_count = 0
for url in urls:
try:
result = self.crawl_with_retry(url)
results.append(result)
if result['status'] == 'success':
success_count += 1
else:
error_count += 1
except Exception as e:
self.logger.error(f"处理URL时发生未捕获异常: {url} - {e}")
results.append({
'url': url,
'status': 'exception',
'error': str(e)
})
error_count += 1
# 输出统计信息
self.logger.info(f"批量爬取完成:")
self.logger.info(f" 总数: {len(urls)}")
self.logger.info(f" 成功: {success_count}")
self.logger.info(f" 失败: {error_count}")
self.logger.info(f" 错误统计: {self.error_stats}")
return results
def export_error_report(self, results, filename='error_report.json'):
"""导出错误报告"""
errors = [r for r in results if r['status'] != 'success']
error_report = {
'summary': {
'total_urls': len(results),
'successful': len(results) - len(errors),
'failed': len(errors),
'error_statistics': self.error_stats
},
'errors': errors
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(error_report, f, ensure_ascii=False, indent=2)
self.logger.info(f"错误报告已导出到: {filename}")
def robust_crawler_demo():
"""健壮爬虫演示"""
crawler = RobustCrawler()
# 包含不同类型的测试URL
test_urls = [
"https://httpbin.org/status/200", # 成功
"https://httpbin.org/delay/2", # 正常延迟
"https://httpbin.org/status/404", # 404错误
"https://httpbin.org/status/500", # 500错误
"https://httpbin.org/timeout/1", # 可能超时
"https://jsonplaceholder.typicode.com/posts/1", # 外部API
]
print("开始健壮性测试...")
results = crawler.batch_crawl(test_urls)
print("\n=== 结果汇总 ===")
for result in results:
status_emoji = "✓" if result['status'] == 'success' else "✗"
print(f"{status_emoji} {result['url']} - {result['status']}")
if result['status'] != 'success':
print(f" 错误: {result.get('error', 'N/A')}")
# 导出错误报告
crawler.export_error_report(results)
robust_crawler_demo()
Remember: a good crawler engineer needs more than technical skill; you must also respect the law and act ethically. Keep practicing and keep improving!