Below is the core code for multi-page scraping with basic anti-bot measures and CSV export, using e-commerce product data as an example:
import requests
from bs4 import BeautifulSoup
import csv
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import json
# Configure logging
logging.basicConfig(
    filename='scraper_log.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    encoding='utf-8'
)
# Anti-bot measure: rotate a random User-Agent per request
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/18.18363'
]
# Load the scraping configuration
def load_config(config_path):
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)
# Fetch a page (with anti-bot measures)
def get_page_response(url, proxies=None):
    headers = {
        'User-Agent': random.choice(USER_AGENTS),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Referer': url
    }
    try:
        response = requests.get(
            url,
            headers=headers,
            proxies=proxies,
            timeout=10,
            allow_redirects=False
        )
        response.raise_for_status()  # raise on HTTP 4xx/5xx errors
        response.encoding = response.apparent_encoding
        time.sleep(random.uniform(0.5, 2))  # random delay between requests
        return response.text
    except Exception as e:
        logging.error(f"Failed to fetch page {url}: {e}")
        return None
# Parse product data
def parse_product_data(html, config):
    soup = BeautifulSoup(html, 'html.parser')
    products = []
    # Find all product cards
    product_cards = soup.select(config['product_card_selector'])
    for card in product_cards:
        product = {}
        # Extract fields according to the configured selectors
        for field, selector in config['fields'].items():
            element = card.select_one(selector)
            product[field] = element.get_text(strip=True) if element else ''
        # Extract the product link
        link_element = card.select_one(config['link_selector'])
        product['link'] = link_element.get('href') if link_element else ''
        if product[config['primary_key']]:  # skip records with an empty primary key
            products.append(product)
    return products
# Save data to CSV
def save_to_csv(data, save_path):
    if not data:
        logging.warning("No data to save")
        return
    with open(save_path, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)
    logging.info(f"Data saved to: {save_path}")
# Scrape a single page
def scrape_single_page(url, config, proxies=None):
    html = get_page_response(url, proxies)
    if not html:
        return []
    return parse_product_data(html, config)
# Scrape multiple pages in batch (multithreaded)
def scrape_batch_pages(base_url, page_count, config, proxies=None, max_workers=5):
    all_products = []
    urls = [base_url.format(page=i) for i in range(1, page_count + 1)]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_url = {
            executor.submit(scrape_single_page, url, config, proxies): url for url in urls
        }
        # Collect results as they complete
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                products = future.result()
                all_products.extend(products)
                logging.info(f"Scraped {url} successfully, got {len(products)} records")
            except Exception as e:
                logging.error(f"Error scraping {url}: {e}")
    # Deduplicate by primary key (later records overwrite earlier ones)
    unique_products = list({p[config['primary_key']]: p for p in all_products}.values())
    logging.info(f"{len(unique_products)} unique records after deduplication")
    return unique_products
# Main entry point
if __name__ == '__main__':
    # Load the configuration file (selectors, etc.)
    config = load_config('scraper_config.json')
    # Scraping settings
    base_url = config['base_url']      # e.g. 'https://example.com/products?page={page}'
    page_count = config['page_count']  # number of pages to scrape
    save_path = config['save_path']    # output path, e.g. 'output/products.csv'
    proxies = config.get('proxies')    # optional proxy settings
    # Run the scraper
    products = scrape_batch_pages(base_url, page_count, config, proxies)
    # Save the results
    save_to_csv(products, save_path)
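For reference, here is a minimal sketch of what scraper_config.json might contain, inferred from the keys the code reads (base_url, page_count, save_path, product_card_selector, link_selector, primary_key, fields, proxies). The selector values, URL, and proxy address below are placeholders, not part of the original code, and must be adapted to the target site. Note that primary_key has to name one of the entries in fields, because both the empty-record filter and the deduplication step index the record by it, and base_url must contain a {page} placeholder to match base_url.format(page=i).

import json

# Illustrative configuration only -- all selector values and URLs are placeholders.
sample_config = {
    "base_url": "https://example.com/products?page={page}",  # {page} matches base_url.format(page=i)
    "page_count": 5,
    "save_path": "output/products.csv",
    "product_card_selector": "div.product-card",
    "link_selector": "a.product-link",
    "primary_key": "title",  # must be one of the keys defined in "fields"
    "fields": {
        "title": "h2.product-title",
        "price": "span.price",
        "shop": "span.shop-name"
    },
    # Optional; or e.g. {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"}
    "proxies": None
}

with open("scraper_config.json", "w", encoding="utf-8") as f:
    json.dump(sample_config, f, ensure_ascii=False, indent=2)

Keeping the selectors in a separate config file means the script can be pointed at a different site (or survive a page-layout change) without touching the scraping logic itself.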