如何用aiohttp实现每秒千次的网页抓取 原创

2025-08-14 16:59

引言

在当今大数据时代,高效的网络爬虫是数据采集的关键工具。传统的同步爬虫(如requests库)由于受限于I/O阻塞,难以实现高并发请求。而Python的aiohttp库结合asyncio,可以轻松实现异步高并发爬虫,达到每秒千次甚至更高的请求速率。

本文将详细介绍如何使用aiohttp构建一个高性能爬虫,涵盖以下内容:

  1. aiohttp的基本原理与优势

  2. 搭建异步爬虫框架

  3. 优化并发请求(连接池、超时控制)

  4. 代理IP与User-Agent轮换(应对反爬)

  5. 性能测试与优化(实现1000+ QPS)

最后,我们将提供一个完整的代码示例,并进行基准测试,展示如何真正实现每秒千次的网页抓取。

1. aiohttp的基本原理与优势

1.1 同步 vs. 异步爬虫

1.2 aiohttp的核心组件

2. 搭建异步爬虫框架

2.1 安装依赖

2.2 基础爬虫示例

import aiohttp
import asyncio
from bs4 import BeautifulSoup

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def parse(url):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url)
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title.string
        print(f"URL: {url} | Title: {title}")

async def main(urls):
    tasks = [parse(url) for url in urls]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = [
        "https://example.com",
        "https://python.org",
        "https://aiohttp.readthedocs.io",
    ]
    asyncio.run(main(urls))

代码解析

  1. fetch() 发起HTTP请求并返回HTML。

  2. parse() 解析HTML并提取标题。

  3. main() 使用asyncio.gather()并发执行多个任务。

3. 优化并发请求(实现1000+ QPS)

3.1 使用连接池(TCP Keep-Alive)

默认情况下,aiohttp会自动复用TCP连接,但我们可以手动优化:

conn = aiohttp.TCPConnector(limit=100, force_close=False)  # 最大100个连接
async with aiohttp.ClientSession(connector=conn) as session:
    # 发起请求...

3.2 控制并发量(Semaphore)

避免因请求过多被目标网站封禁:

semaphore = asyncio.Semaphore(100)  # 限制并发数为100

async def fetch(session, url):
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

3.3 超时设置

防止某些请求卡住整个爬虫:

timeout = aiohttp.ClientTimeout(total=10)  # 10秒超时
async with session.get(url, timeout=timeout) as response:
    # 处理响应...

4. 代理IP与User-Agent轮换(应对反爬)

4.1 随机User-Agent

from fake_useragent import UserAgent

ua = UserAgent()
headers = {"User-Agent": ua.random}

async def fetch(session, url):
    async with session.get(url, headers=headers) as response:
        return await response.text()

4.2 代理IP池

import aiohttp
import asyncio
from fake_useragent import UserAgent

# 代理配置
proxyHost = "www.16yun.cn"
proxyPort = "5445"
proxyUser = "16QMSOML"
proxyPass = "280651"

# 构建带认证的代理URL
proxy_auth = aiohttp.BasicAuth(proxyUser, proxyPass)
proxy_url = f"http://{proxyHost}:{proxyPort}"

ua = UserAgent()
semaphore = asyncio.Semaphore(100)  # 限制并发数

async def fetch(session, url):
    headers = {"User-Agent": ua.random}
    timeout = aiohttp.ClientTimeout(total=10)
    async with semaphore:
        async with session.get(
            url,
            headers=headers,
            timeout=timeout,
            proxy=proxy_url,
            proxy_auth=proxy_auth
        ) as response:
            return await response.text()

async def main(urls):
    conn = aiohttp.TCPConnector(limit=100, force_close=False)
    async with aiohttp.ClientSession(connector=conn) as session:
        tasks = [fetch(session, url) for url in urls]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = ["https://example.com"] * 1000
    asyncio.run(main(urls))

5. 性能测试(实现1000+ QPS)

5.1 基准测试代码

import time

async def benchmark():
    urls = ["https://example.com"] * 1000  # 测试1000次请求
    start = time.time()
    await main(urls)
    end = time.time()
    qps = len(urls) / (end - start)
    print(f"QPS: {qps:.2f}")

asyncio.run(benchmark())

5.2 优化后的完整代码

import aiohttp
import asyncio
from fake_useragent import UserAgent

ua = UserAgent()
semaphore = asyncio.Semaphore(100)  # 限制并发数

async def fetch(session, url):
    headers = {"User-Agent": ua.random}
    timeout = aiohttp.ClientTimeout(total=10)
    async with semaphore:
        async with session.get(url, headers=headers, timeout=timeout) as response:
            return await response.text()

async def main(urls):
    conn = aiohttp.TCPConnector(limit=100, force_close=False)
    async with aiohttp.ClientSession(connector=conn) as session:
        tasks = [fetch(session, url) for url in urls]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = ["https://example.com"] * 1000
    asyncio.run(main(urls))

5.3 测试结果

结论

通过aiohttpasyncio,我们可以轻松构建一个高并发的异步爬虫,实现每秒千次以上的网页抓取。关键优化点包括:
使用ClientSession管理连接池
控制并发量(Semaphore)
代理IP和随机User-Agent防止封禁
超时设置避免卡死



阅读 25 / 评论 0

 相关视频教程更多课程