(Original) Python Image Crawler: A Clever Workaround for Anti-Hotlinking Protection
1. Program Overview
This post introduces a Python-based image crawler intended for learning and testing. It helps you download images from a specified website, and it has the following features:
Uses Selenium and BeautifulSoup for page loading, parsing, and content extraction.
Uses multithreading to speed up image downloads.
Deduplicates images so the same image is never downloaded twice (see the sketch after this list).
Supports a configurable crawl depth to prevent unbounded recursion.
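A minimal sketch of the deduplication idea: hash the raw image bytes and skip any hash already seen. The helper name save_if_new and the bare module-level set are illustrative; the full versions below keep the hash set on the crawler object.

import hashlib

seen_hashes = set()  # MD5 hashes of images already saved

def save_if_new(image_data: bytes, path: str) -> bool:
    """Write image_data to path only if its content hash has not been seen before."""
    digest = hashlib.md5(image_data).hexdigest()
    if digest in seen_hashes:
        return False  # duplicate content, skip
    seen_hashes.add(digest)
    with open(path, "wb") as f:
        f.write(image_data)
    return True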
2. Program Structure
The program consists of the following main parts:
ImageCrawler class: the core of the crawler; it starts the crawl and processes each page.
crawl method: recursively crawls pages, extracting image URLs and page links.
download_image method: downloads an image and saves it locally, with deduplication.
is_large_enough method: checks whether an image is large enough, so that tiny images such as icons and thumbnails are skipped (see the sketch after this list).
update_stats method: updates crawl statistics and reports the number of images downloaded so far.
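The size check boils down to decoding the bytes with Pillow and comparing dimensions. A standalone sketch is below; the 300x300 threshold matches the full code, while the keyword arguments are added here purely for illustration.

from io import BytesIO
from PIL import Image

def is_large_enough(image_data: bytes, min_w: int = 300, min_h: int = 300) -> bool:
    """Return True if the bytes decode to an image at least min_w x min_h pixels."""
    try:
        width, height = Image.open(BytesIO(image_data)).size
        return width >= min_w and height >= min_h
    except Exception:
        # Non-image content (e.g. an HTML error page) fails to decode and is rejected.
        return False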
3. How the Program Runs
Pages are loaded either through the Chrome WebDriver (stable version) or directly with requests (development and GUI versions), with a 10-second load timeout. Image requests are sent through a pooled session and carry request headers, most importantly a Referer, so the server treats them as coming from the page itself and returns the actual target image rather than an anti-hotlinking placeholder.
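The core of the anti-hotlinking workaround is nothing more than a pooled requests session with retries plus a Referer header pointing at the page the image was found on. A minimal sketch with placeholder URLs (https://abc.abc/... is a stand-in, as in the code below):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    # Sending the page the image was embedded in as the Referer is what gets past
    # most anti-hotlinking checks.
    'Referer': 'https://abc.abc/some-page',
}
response = session.get('https://abc.abc/images/photo.jpg', headers=headers, timeout=10)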
4. Logging
Logging is built in to record exceptions such as page-load timeouts and download errors, making problems easier to trace and fix.
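The logging setup is a single basicConfig call writing errors to crawler_log.txt, exactly as in the code below; the sample message here is illustrative.

import logging

logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# Exceptions caught during crawling are recorded like this:
logging.error("Page load timed out for https://abc.abc/slow-page")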
5. Strengths and Limitations
Strengths:
Easy to use: just instantiate the ImageCrawler class and call start_crawling to begin crawling.
Capable: supports multithreading, crawl-depth control, and image-size filtering to cover a range of needs.
Extensible: the code can easily be extended or customized to fit specific requirements.
Limitations:
Dependencies: requires third-party libraries such as Selenium and BeautifulSoup, plus the matching browser driver.
Configurability: some options are exposed, but several values are still hard-coded, which limits flexibility.
6. Usage Example
Instantiate the ImageCrawler class in the main program and call start_crawling to start downloading images. For details, see the if __name__ == "__main__": block at the end of each version.
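For instance, the development version below is started like this; its base URL, download directory, and crawl depth are hard-coded in __init__:

if __name__ == "__main__":
    crawler = ImageCrawler(debug_mode=True)  # debug_mode prints request info for each fetch
    crawler.start_crawling()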
7. Summary
This image crawler is a simple but capable tool for learning and testing. It provides basic image crawling plus a few advanced features, and its configuration can be adjusted to quickly set up customized crawling tasks.
Code
If you prefer a GUI, use this version; otherwise, scroll down.
import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image, ImageTk
from io import BytesIO
from urllib.parse import urlparse
import tkinter as tk
from tkinter import ttk
import time
from datetime import timedelta


class ImageCrawlerGUI:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title("Image Crawler GUI")

        # Variables for user inputs
        self.base_url_var = tk.StringVar()
        self.base_url_var.set("https://abc.abc")
        self.image_directory_var = tk.StringVar()
        self.image_directory_var.set("downloaded_images")
        self.max_depth_var = tk.IntVar()
        self.max_depth_var.set(5)
        self.page_threads_var = tk.IntVar()
        self.page_threads_var.set(3)
        self.image_threads_var = tk.IntVar()
        self.image_threads_var.set(3)

        # Labels and Entry for user inputs
        tk.Label(self.root, text="Base URL:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.base_url_var, width=50).grid(row=0, column=1, columnspan=5, pady=5)
        tk.Label(self.root, text="Image Directory:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.image_directory_var, width=50).grid(row=1, column=1, columnspan=5, pady=5)
        tk.Label(self.root, text="Max Depth:").grid(row=2, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.max_depth_var, width=5).grid(row=2, column=1, pady=5)
        tk.Label(self.root, text="Page Threads:").grid(row=2, column=2, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.page_threads_var, width=5).grid(row=2, column=3, pady=5)
        tk.Label(self.root, text="Image Threads:").grid(row=2, column=4, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.image_threads_var, width=5).grid(row=2, column=5, pady=5)

        # Start button
        tk.Button(self.root, text="Start Crawling", command=self.start_crawling).grid(row=3, column=0, columnspan=6, pady=10)

        # Stats label
        self.stats_label = tk.Label(self.root, text="已爬取图片数: 0", padx=5, pady=5)
        self.stats_label.grid(row=4, column=0, columnspan=6, pady=5)

        # Running time label
        self.time_label = tk.Label(self.root, text="运行时间: 0秒", padx=5, pady=5)
        self.time_label.grid(row=5, column=0, columnspan=6, pady=5)

        self.start_time = 0

    def start_crawling(self):
        self.start_time = time.time()
        self.stats_label.config(text="已爬取图片数: 0")
        self.time_label.config(text="运行时间: 0秒")
        crawler = ImageCrawler(
            base_url=self.base_url_var.get(),
            image_directory=self.image_directory_var.get(),
            max_depth=self.max_depth_var.get(),
            page_threads=self.page_threads_var.get(),
            image_threads=self.image_threads_var.get(),
            debug_mode=True,
            gui=self
        )
        threading.Thread(target=crawler.start_crawling).start()

    def update_stats(self, num_images):
        elapsed_time = time.time() - self.start_time
        elapsed_time_str = str(timedelta(seconds=round(elapsed_time)))
        self.stats_label.config(text=f"已爬取图片数: {num_images}")
        self.time_label.config(text=f"运行时间: {elapsed_time_str}")


class ImageCrawler:
    def __init__(self, base_url, image_directory, max_depth, page_threads, image_threads,
                 debug_mode=True, gui=None):
        self.base_url = base_url
        self.image_directory = image_directory
        self.max_depth = max_depth
        self.page_threads = page_threads
        self.image_threads = image_threads
        self.visited_urls = set()
        self.image_hashes = set()
        self.debug_mode = debug_mode
        self.gui = gui
        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR,
                            format='%(asctime)s - %(levelname)s: %(message)s')
        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': self.base_url,
        }
        if not os.path.exists(self.image_directory):
            os.makedirs(self.image_directory)

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return
        try:
            self.visited_urls.add(url)
            response = self.session.get(url, headers=self.headers, timeout=10)
            if self.debug_mode:
                self.log_request_info(response.request)
            soup = BeautifulSoup(response.text, 'html.parser')
            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)
            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            thread_pool = []
            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    thread = threading.Thread(target=self.download_image, args=(img_url, url))
                    thread.start()
                    thread_pool.append(thread)
                    # Limit the number of threads
                    if len(thread_pool) >= self.image_threads:
                        for t in thread_pool:
                            t.join()
                        thread_pool = []

            for next_url in page_urls:
                if next_url:
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname
                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)

            # Join any remaining threads
            for t in thread_pool:
                t.join()
        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url, referer_url):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': referer_url,
            }
            response = self.session.get(img_url, headers=headers, verify=False)
            if self.debug_mode:
                self.log_request_info(response.request)
            image_data = response.content
            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)
                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()
        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)
            print(f"Downloaded image: {img_url}")
            self.update_stats()
        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        if self.gui:
            num_images = len(self.image_hashes)
            self.gui.update_stats(num_images)

    def log_request_info(self, request):
        print(f"Request URL: {request.url}")
        print("Request Headers:")
        for key, value in request.headers.items():
            print(f"{key}: {value}")
        print("\n")


if __name__ == "__main__":
    gui = ImageCrawlerGUI()
    gui.root.mainloop()
If you don't want a GUI, use the code below.
Development version V2.1

import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image
from io import BytesIO
from urllib.parse import urlparse


class ImageCrawler:
    def __init__(self, debug_mode=True):
        self.base_url = "https://www.abc.abc"
        self.image_directory = "downloaded_images"
        self.max_depth = 5
        self.visited_urls = set()
        self.image_hashes = set()
        self.debug_mode = debug_mode
        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR,
                            format='%(asctime)s - %(levelname)s: %(message)s')
        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': self.base_url,
        }
        if not os.path.exists(self.image_directory):
            os.makedirs(self.image_directory)

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return
        try:
            self.visited_urls.add(url)
            response = self.session.get(url, headers=self.headers, timeout=10)
            if self.debug_mode:
                self.log_request_info(response.request)
            soup = BeautifulSoup(response.text, 'html.parser')
            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)
            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    self.download_image(img_url, url)

            for next_url in page_urls:
                if next_url:
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname
                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)
        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url, referer_url):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': referer_url,
            }
            response = self.session.get(img_url, headers=headers, verify=False)
            if self.debug_mode:
                self.log_request_info(response.request)
            image_data = response.content
            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)
                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()
        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)
            print(f"Downloaded image: {img_url}")
            self.update_stats()
        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        print(f"已爬取图片数: {len(self.image_hashes)}")

    def log_request_info(self, request):
        print(f"Request URL: {request.url}")
        print("Request Headers:")
        for key, value in request.headers.items():
            print(f"{key}: {value}")
        print("\n")


if __name__ == "__main__":
    crawler = ImageCrawler(debug_mode=True)
    crawler.start_crawling()
Stable version V1.1

import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from selenium import webdriver
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image
from io import BytesIO
from urllib.parse import urlparse


class ImageCrawler:
    def __init__(self):
        self.base_url = "https://www.abc.abc/"  # test URL
        self.image_directory = "downloaded_images"  # download directory
        self.max_depth = 5
        self.visited_urls = set()
        self.image_hashes = set()

        # Logging configuration
        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR,
                            format='%(asctime)s - %(levelname)s: %(message)s')

        # Create a Session with connection pooling
        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

        # Make sure the download directory exists before images are saved
        if not os.path.exists(self.image_directory):
            os.makedirs(self.image_directory)

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return
        try:
            self.visited_urls.add(url)
            driver = webdriver.Chrome()
            driver.set_page_load_timeout(10)  # set the page-load timeout to 10 seconds
            try:
                driver.get(url)
            except Exception as timeout_exception:
                print(f"Page load timed out for {url}: {str(timeout_exception)}")
                logging.error(f"Page load timed out for {url}: {str(timeout_exception)}")
                driver.quit()
                return
            soup = BeautifulSoup(driver.page_source, 'html.parser')
            driver.quit()

            # Extract image URLs and page links with regular expressions
            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)
            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    self.download_image(img_url, url)

            for next_url in page_urls:
                if next_url:
                    # Extract the root domain
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname
                    # Only follow links whose domain matches the base URL
                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)
        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url, page_url_now):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': page_url_now,
                # Add more headers if needed
            }
            # Send the request through the pooled session, carrying the headers above
            response = self.session.get(img_url, headers=headers, verify=False)
            image_data = response.content
            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]  # get the image file extension
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)
                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()
        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)
            print(f"Downloaded image: {img_url}")
            self.update_stats()
        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            # Handle image processing errors (e.g., non-image content)
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        print(f"已爬取图片数: {len(self.image_hashes)}")


if __name__ == "__main__":
    crawler = ImageCrawler()
    crawler.start_crawling()