当前位置:首页 > Software > Python > 正文内容

(原创)Python图片爬虫:绕过防盗的巧妙解决方案

chanra1n1年前 (2024-01-05)Python958

1. 程序概述

介绍这个基于Python的图片爬虫工具,它是一个用于学习和测试的代码,旨在帮助用户从指定网站下载图片。该爬虫具有以下特点:

  • 使用Selenium和BeautifulSoup进行网页解析和内容提取。

  • 利用多线程提高图片下载效率。

  • 实现了图片的去重,避免下载相同的图片。

  • 支持设置爬取深度,防止无限递归。

2. 程序结构

该程序主要由以下几个部分组成:

  • ImageCrawler类: 爬虫的核心类,负责启动爬取和处理每个页面的信息。

  • crawl方法: 递归地爬取页面,提取图片链接和页面链接。

  • download_image方法: 下载图片并保存到本地,实现了图片的去重。

  • is_large_enough方法: 判断图片是否足够大,避免下载过小的图片。

  • update_stats方法: 更新爬取统计信息,显示已爬取的图片数量。

3. 程序运行

程序通过Chrome浏览器驱动实现页面加载,支持设置超时时间。使用连接池发送请求,携带请求头信息,确保下载的是目标图片。

4. 日志记录

引入日志记录,记录页面加载超时、下载错误等异常情况,方便用户追踪和解决问题。

5. 优势和不足

优势:
  • 简单易用:用户只需实例化ImageCrawler类并调用start_crawling方法即可开始爬取。

  • 强大的功能:支持多线程、页面深度控制、图片大小过滤等功能,满足不同需求。

  • 可扩展性:用户可以根据实际需求轻松扩展功能或进行定制。

不足:
  • 依赖性:需要安装Selenium、BeautifulSoup等第三方库,并下载对应浏览器的驱动。

  • 可配置性:虽然提供了一些配置选项,但仍有一些硬编码的地方,不够灵活。

6. 使用示例

用户只需在主程序中实例化ImageCrawler类,并调用start_crawling方法即可开始爬取图片。具体使用方法可参考主程序末尾的if __name__ == "__main__":部分。

7. 总结

该图片爬虫工具是一个简单而强大的学习和测试工具,提供了基本的图片爬取功能,同时具备一些高级特性。用户可以根据实际需求灵活调整配置,快速实现定制化的图片爬取任务。

代码

如果你喜欢使用GUI,请使用这个代码,否则请往下翻。

import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image, ImageTk
from io import BytesIO
from urllib.parse import urlparse
import tkinter as tk
from tkinter import ttk
import time
from datetime import timedelta

class ImageCrawlerGUI:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title("Image Crawler GUI")

        # Variables for user inputs
        self.base_url_var = tk.StringVar()
        self.base_url_var.set("https://abc.abc")
        self.image_directory_var = tk.StringVar()
        self.image_directory_var.set("downloaded_images")
        self.max_depth_var = tk.IntVar()
        self.max_depth_var.set(5)
        self.page_threads_var = tk.IntVar()
        self.page_threads_var.set(3)
        self.image_threads_var = tk.IntVar()
        self.image_threads_var.set(3)

        # Labels and Entry for user inputs
        tk.Label(self.root, text="Base URL:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.base_url_var, width=50).grid(row=0, column=1, columnspan=5, pady=5)
        tk.Label(self.root, text="Image Directory:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.image_directory_var, width=50).grid(row=1, column=1, columnspan=5, pady=5)
        tk.Label(self.root, text="Max Depth:").grid(row=2, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.max_depth_var, width=5).grid(row=2, column=1, pady=5)
        tk.Label(self.root, text="Page Threads:").grid(row=2, column=2, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.page_threads_var, width=5).grid(row=2, column=3, pady=5)
        tk.Label(self.root, text="Image Threads:").grid(row=2, column=4, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.image_threads_var, width=5).grid(row=2, column=5, pady=5)

        # Start button
        tk.Button(self.root, text="Start Crawling", command=self.start_crawling).grid(row=3, column=0, columnspan=6, pady=10)

        # Stats label
        self.stats_label = tk.Label(self.root, text="已爬取图片数: 0", padx=5, pady=5)
        self.stats_label.grid(row=4, column=0, columnspan=6, pady=5)

        # Running time label
        self.time_label = tk.Label(self.root, text="运行时间: 0秒", padx=5, pady=5)
        self.time_label.grid(row=5, column=0, columnspan=6, pady=5)

        self.start_time = 0

    def start_crawling(self):
        self.start_time = time.time()
        self.stats_label.config(text="已爬取图片数: 0")
        self.time_label.config(text="运行时间: 0秒")

        crawler = ImageCrawler(
            base_url=self.base_url_var.get(),
            image_directory=self.image_directory_var.get(),
            max_depth=self.max_depth_var.get(),
            page_threads=self.page_threads_var.get(),
            image_threads=self.image_threads_var.get(),
            debug_mode=True,
            gui=self
        )
        threading.Thread(target=crawler.start_crawling).start()

    def update_stats(self, num_images):
        elapsed_time = time.time() - self.start_time
        elapsed_time_str = str(timedelta(seconds=round(elapsed_time)))
        self.stats_label.config(text=f"已爬取图片数: {num_images}")
        self.time_label.config(text=f"运行时间: {elapsed_time_str}")

class ImageCrawler:
    def __init__(self, base_url, image_directory, max_depth, page_threads, image_threads, debug_mode=True, gui=None):
        self.base_url = base_url
        self.image_directory = image_directory
        self.max_depth = max_depth
        self.page_threads = page_threads
        self.image_threads = image_threads
        self.visited_urls = set()
        self.image_hashes = set()
        self.debug_mode = debug_mode
        self.gui = gui

        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR, format='%(asctime)s - %(levelname)s: %(message)s')

        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': self.base_url,
        }

        if not os.path.exists(self.image_directory):
            os.makedirs(self.image_directory)

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return

        try:
            self.visited_urls.add(url)

            response = self.session.get(url, headers=self.headers, timeout=10)
            if self.debug_mode:
                self.log_request_info(response.request)

            soup = BeautifulSoup(response.text, 'html.parser')

            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)

            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            thread_pool = []

            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    thread = threading.Thread(target=self.download_image, args=(img_url, url))
                    thread.start()
                    thread_pool.append(thread)

                    # Limit the number of threads
                    if len(thread_pool) >= self.image_threads:
                        for t in thread_pool:
                            t.join()
                        thread_pool = []

            for next_url in page_urls:
                if next_url:
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname

                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)

            # Join any remaining threads
            for t in thread_pool:
                t.join()

        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url, referer_url):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': referer_url,
            }

            response = self.session.get(img_url, headers=headers, verify=False)
            if self.debug_mode:
                self.log_request_info(response.request)

            image_data = response.content

            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)

                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()

        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)

            print(f"Downloaded image: {img_url}")
            self.update_stats()

        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        if self.gui:
            num_images = len(self.image_hashes)
            self.gui.update_stats(num_images)

    def log_request_info(self, request):
        print(f"Request URL: {request.url}")
        print("Request Headers:")
        for key, value in request.headers.items():
            print(f"{key}: {value}")
        print("\n")

if __name__ == "__main__":
    gui = ImageCrawlerGUI()
    gui.root.mainloop()

如果你不喜欢使用GUI的,请使用下面的代码

开发版 V2.1
import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image
from io import BytesIO
from urllib.parse import urlparse

class ImageCrawler:
    def __init__(self, debug_mode=True):
        self.base_url = "https://www.abc.abc"
        self.image_directory = "downloaded_images"
        self.max_depth = 5
        self.visited_urls = set()
        self.image_hashes = set()
        self.debug_mode = debug_mode

        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR, format='%(asctime)s - %(levelname)s: %(message)s')

        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': self.base_url,
        }

        if not os.path.exists(self.image_directory):
            os.makedirs(self.image_directory)

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return

        try:
            self.visited_urls.add(url)

            response = self.session.get(url, headers=self.headers, timeout=10)
            if self.debug_mode:
                self.log_request_info(response.request)

            soup = BeautifulSoup(response.text, 'html.parser')

            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)

            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    self.download_image(img_url, url)

            for next_url in page_urls:
                if next_url:
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname

                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)

        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url, referer_url):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': referer_url,
            }

            response = self.session.get(img_url, headers=headers, verify=False)
            if self.debug_mode:
                self.log_request_info(response.request)

            image_data = response.content

            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)

                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()

        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)

            print(f"Downloaded image: {img_url}")
            self.update_stats()

        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        print(f"已爬取图片数: {len(self.image_hashes)}")

    def log_request_info(self, request):
        print(f"Request URL: {request.url}")
        print("Request Headers:")
        for key, value in request.headers.items():
            print(f"{key}: {value}")
        print("\n")

if __name__ == "__main__":
    crawler = ImageCrawler(debug_mode=True)
    crawler.start_crawling()


稳定版 V1.1
import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from selenium import webdriver
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image
from io import BytesIO
from urllib.parse import urlparse

class ImageCrawler:
    def __init__(self):
        self.base_url = "https://www.abc.abc/" #测试链接 
        self.image_directory = "downloaded_images" #下载路径
        self.max_depth = 5
        self.visited_urls = set()
        self.image_hashes = set()

        # 添加日志配置
        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR, format='%(asctime)s - %(levelname)s: %(message)s')

        # 创建一个带连接池的 Session
        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return

        try:
            self.visited_urls.add(url)
            driver = webdriver.Chrome()
            driver.set_page_load_timeout(10)  # 设置加载超时时间为10秒

            try:
                driver.get(url)
            except Exception as timeout_exception:
                print(f"Page load timed out for {url}: {str(timeout_exception)}")
                logging.error(f"Page load timed out for {url}: {str(timeout_exception)}")
                driver.quit()
                return

            soup = BeautifulSoup(driver.page_source, 'html.parser')
            driver.quit()

            # 使用正则表达式提取图片链接和页面链接
            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)

            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    self.download_image(img_url,url)

            for next_url in page_urls:
                if next_url:
                    # 提取根域名
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname

                    # 检查根域名是否一致
                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)

        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url,page_url_now):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': page_url_now,
                # Add more headers if needed
            }

            # 使用连接池发送请求,携带请求头信息
            response = self.session.get(img_url, headers=headers, verify=False)
            image_data = response.content

            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]  # 获取图片文件扩展名
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)

                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()

        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)

            print(f"Downloaded image: {img_url}")
            self.update_stats()

        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            # Handle image processing errors (e.g., non-image content)
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        print(f"已爬取图片数: {len(self.image_hashes)}")

if __name__ == "__main__":
    crawler = ImageCrawler()
    crawler.start_crawling()


扫描二维码推送至手机访问。

版权声明:本文由我的FPGA发布,如需转载请注明出处。

本文链接:https://world.myfpga.cn/index.php/post/347.html

分享给朋友:

“(原创)Python图片爬虫:绕过防盗的巧妙解决方案” 的相关文章

2.Python中的基本运算

2.Python中的基本运算

我们打开Python,请你尝试输入如下算式并尝试理解有什么为什么是这样的?1+1 1+1.0 1-2 2-3.5 1*1 1*1.1 1/2 2/1 2/3 3/2 3//2 3/1.0 5/2.5我们不难得到如下结果2 2.0 -1 -1.5 1 1.1 0.5...

列表作为函数参数

列表作为函数参数

列表作为函数参数,函数中可以修改原列表def multiply(values,factor):     for i in range(len(values)):        values[i]*=factor          aList=[1,2,3,4,5]  multiply(aL...

Python自动清理错误图片,深度学习训练数据集准备

Python自动清理错误图片,深度学习训练数据集准备

使用python运行from PIL import Image from pathlib import Path import os   path = r'.'  ...

(原创)使用Python自动对子文件夹中的图片文件进行重命名

(原创)使用Python自动对子文件夹中的图片文件进行重命名

为了解决Python深度学习的时候,经常遇到的文件名问题import os # 获取指定目录下的所有子文件夹 def get_subfolders(path):     subfolders = []...

(原创)使用Python提取ISE工程的RTL代码

(原创)使用Python提取ISE工程的RTL代码

在工程文件夹下运行Python程序即可 #Author       : / #Description  : 从ISE的项目文件夹中提取rtl文件,用于LEDA调试 #Time ...

(原创)使用FFmpeg截取多个视频中的图片,创建训练集

(原创)使用FFmpeg截取多个视频中的图片,创建训练集

代码使用 Python 中的 subprocess 模块和 FFmpeg、FFprobe 工具来从视频文件中截取多个截图,并使用 PIL 库来检查截图文件是否损坏,如果损坏了就不保存该文件。下面是代码的具体流程:首先读取配置文件或命令行参数来指定截图数量、截图大小、截图时间间隔和截图质量等...