python 基於aiohttp的非同步爬蟲實戰

来源:https://www.cnblogs.com/jiba/archive/2022/09/09/16672319.html
-Advertisement-
Play Games

鋼鐵知識庫,一個學習python爬蟲、數據分析的知識庫。人生苦短,快用python。 之前我們使用requests庫爬取某個站點的時候,每發出一個請求,程式必須等待網站返迴響應才能接著運行,而在整個爬蟲過程中,整個爬蟲程式是一直在等待的,實際上沒有做任何事情。 像這種占用磁碟/記憶體IO、網路IO的任 ...


鋼鐵知識庫,一個學習python爬蟲、數據分析的知識庫。人生苦短,快用python。

之前我們使用requests庫爬取某個站點的時候,每發出一個請求,程式必須等待網站返迴響應才能接著運行,而在整個爬蟲過程中,整個爬蟲程式是一直在等待的,實際上沒有做任何事情。

像這種占用磁碟/記憶體IO、網路IO的任務,大部分時間是CPU在等待的操作,就叫IO密集型任務。對於這種情況有沒有優化方案呢,當然有,那就是使用aiohttp庫實現非同步爬蟲。

aiohttp是什麼

我們在使用requests請求時,只能等一個請求先出去再回來,才會發送下一個請求。明顯效率不高阿,這時候如果換成非同步請求的方式,就不會有這個等待。一個請求發出去,不管這個請求什麼時間響應,程式通過await掛起協程對象後直接進行下一個請求。

解決方法就是通過 aiohttp + asyncio,什麼是aiohttp?一個基於 asyncio 的非同步 HTTP 網路模塊,可用於實現非同步爬蟲,速度明顯快於 requests 的同步爬蟲。

requests和aiohttp區別

區別就是一個同步一個是非同步。話不多說直接上代碼看效果。

安裝aiohttp

pip install aiohttp
  • requests同步示例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: 鋼鐵知識庫
import time
import requests

# 同步請求
def main():
    start = time.time()
    for i in range(5):
        res = requests.get('http://httpbin.org/delay/2')
        print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status_code}')
    print(f'requests同步耗時:{time.time() - start}')
    
if __name__ == '__main__':
    main()

'''
當前時間:2022-09-05 15:44:51.991685, status_code = 200
當前時間:2022-09-05 15:44:54.528918, status_code = 200
當前時間:2022-09-05 15:44:57.057373, status_code = 200
當前時間:2022-09-05 15:44:59.643119, status_code = 200
當前時間:2022-09-05 15:45:02.167362, status_code = 200
requests同步耗時:12.785893440246582
'''

可以看到5次請求總共用12.7秒,再來看同樣的請求非同步多少時間。

  • aiohttp非同步示例:
#!/usr/bin/env python
# file: day6-9同步和非同步.py
# author: 鋼鐵知識庫
import asyncio
import time
import aiohttp

async def async_http():
    # 聲明一個支持非同步的上下文管理器
    async with aiohttp.ClientSession() as session:
        res = await session.get('http://httpbin.org/delay/2')
        print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status}')

tasks = [async_http() for _ in range(5)]
start = time.time()
# Python 3.7 及以後,不需要顯式聲明事件迴圈,可以使用 asyncio.run()來代替最後的啟動操作
asyncio.run(asyncio.wait(tasks))
print(f'aiohttp非同步耗時:{time.time() - start}')

'''
當前時間:2022-09-05 15:42:32.363966, status_code = 200
當前時間:2022-09-05 15:42:32.366957, status_code = 200
當前時間:2022-09-05 15:42:32.374973, status_code = 200
當前時間:2022-09-05 15:42:32.384909, status_code = 200
當前時間:2022-09-05 15:42:32.390318, status_code = 200
aiohttp非同步耗時:2.5826876163482666
'''

兩次對比可以看到執行過程,時間一個是順序執行,一個是同時執行。這就是同步和非同步的區別。

aiohttp使用介紹

接下來我們會詳細介紹aiohttp庫的用法和爬取實戰。aiohttp 是一個支持非同步請求的庫,它和 asyncio 配合使用,可以使我們非常方便地實現非同步請求操作。asyncio模塊,其內部實現了對TCP、UDP、SSL協議的非同步操作,但是對於HTTP請求,就需要aiohttp實現了。

aiohttp分為兩部分,一部分是Client,一部分是Server。下麵來說說aiohttp客戶端部分的用法。

基本實例

先寫一個簡單的案例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 鋼鐵知識庫
import asyncio
import aiohttp

async def get_api(session, url):
    # 聲明一個支持非同步的上下文管理器
    async with session.get(url) as response:
        return await response.text(), response.status

async def main():
    async with aiohttp.ClientSession() as session:
        html, status = await get_api(session, 'http://httpbin.org/delay/2')
        print(f'html: {html[:50]}')
        print(f'status : {status}')

if __name__ == '__main__':
    #  Python 3.7 及以後,不需要顯式聲明事件迴圈,可以使用 asyncio.run(main())來代替最後的啟動操作
    asyncio.get_event_loop().run_until_complete(main())
'''
html: {
  "args": {}, 
  "data": "", 
  "files": {}, 
  
status : 200

Process finished with exit code 0
'''

aiohttp請求的方法和之前有明顯區別,主要包括如下幾點:

  1. 除了導入aiohttp庫,還必須引入asyncio庫,因為要實現非同步,需要啟動協程。
  2. 非同步的方法定義不同,前面都要統一加async來修飾。
  3. with as用於聲明上下文管理器,幫我們自動分配和釋放資源,加上async代碼支持非同步。
  4. 對於返回協程對象的操作,前面需要加await來修飾。response.text()返回的是協程對象。
  5. 最後運行啟用迴圈事件

註意:Python3.7及以後的版本中,可以使用asyncio.run(main())代替最後的啟動操作。

URL參數設置

對於URL參數的設置,我們可以藉助params設置,傳入一個字典即可,實例如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 鋼鐵知識庫
import aiohttp
import asyncio

async def main():
    params = {'name': '鋼鐵知識庫', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.httpbin.org/get', params=params) as res:
            print(await res.json())

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
{'args': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=鋼鐵知識庫&age=23'}
'''

可以看到實際請求的URL後面帶了尾碼,這就是params的內容。

請求類型

除了get請求,aiohttp還支持其它請求類型,如POST、PUT、DELETE等,和requests使用方式類似。

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

要使用這些方法,只需要把對應的方法和參數替換一下。用法和get類似就不再舉例。

響應的幾個方法

對於響應來說,我們可以用如下方法分別獲取其中的響應情況。狀態碼、響應頭、響應體、響應體二進位內容、響應體JSON結果,實例如下:

#!/usr/bin/env python
# @Author  : 鋼鐵知識庫
import aiohttp
import asyncio

async def main():
    data = {'name': '鋼鐵知識庫', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.post('https://www.httpbin.org/post', data=data) as response:
            print('status:', response.status)  # 狀態碼
            print('headers:', response.headers)  # 響應頭
            print('body:', await response.text())  # 響應體
            print('bytes:', await response.read())  # 響應體二進位內容
            print('json:', await response.json())  # 響應體json數據

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
status: 200
headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {
  "args": {}, 
  "data": "", 
  "files": {}, 
  "form": {
    "age": "23", 
    "name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"
  }, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Content-Length": "57", 
    "Content-Type": "application/x-www-form-urlencoded", 
    "Host": "www.httpbin.org", 
    "User-Agent": "Python/3.8 aiohttp/3.8.1", 
    "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"
  }, 
  "json": null, 
  "origin": "122.55.11.188", 
  "url": "https://www.httpbin.org/post"
}

bytes: b'{\n  "args": {}, \n  "data": "", \n  "files": {}, \n  "form": {\n    "age": "23", \n    "name": "\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip, deflate", \n    "Content-Length": "57", \n    "Content-Type": "application/x-www-form-urlencoded", \n    "Host": "www.httpbin.org", \n    "User-Agent": "Python/3.8 aiohttp/3.8.1", \n    "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"\n  }, \n  "json": null, \n  "origin": "122.5.132.196", \n  "url": "https://www.httpbin.org/post"\n}\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'}
'''

可以看到有些欄位前面需要加await,因為其返回的是一個協程對象(如async修飾的方法),那麼前面就要加await。

超時設置

我們可以藉助ClientTimeout對象設置超時,例如要設置1秒的超時時間,可以這麼實現:

#!/usr/bin/env python
# @Author  : 鋼鐵知識庫
import aiohttp
import asyncio

async def main():
    # 設置 1 秒的超時 
    timeout = aiohttp.ClientTimeout(total=1)
    data = {'name': '鋼鐵知識庫', 'age': 23}
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get('https://www.httpbin.org/delay/2', data=data) as response:
            print('status:', response.status)  # 狀態碼

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
Traceback (most recent call last):
####中間省略####
    raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
'''

這裡設置了超時1秒請求延時2秒,發現拋出異常asyncio.TimeoutError,如果正常則響應200。

併發限制

aiohttp可以支持非常高的併發量,但面對高併發網站可能會承受不住,隨時有掛掉的危險,這時需要對併發進行一些控制。現在我們藉助asyncio 的Semaphore來控制併發量,實例如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 鋼鐵知識庫
import asyncio
from datetime import datetime
import aiohttp

# 聲明最大併發量
semaphore = asyncio.Semaphore(2)

async def get_api():
    async with semaphore:
        print(f'scrapting...{datetime.now()}')
        async with session.get('https://www.baidu.com') as response:
            await asyncio.sleep(2)
            # print(f'當前時間:{datetime.now()}, {response.status}')

async def main():
    global session
    session = aiohttp.ClientSession()
    tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)]
    await asyncio.gather(*tasks)
    await session.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
scrapting...2022-09-07 08:11:14.190000
scrapting...2022-09-07 08:11:14.292000
scrapting...2022-09-07 08:11:16.482000
scrapting...2022-09-07 08:11:16.504000
scrapting...2022-09-07 08:11:18.520000
scrapting...2022-09-07 08:11:18.521000
'''

在main方法里,我們聲明瞭1000個task,如果沒有通過Semaphore進行併發限制,那這1000放到gather方法後會被同時執行,併發量相當大。有了信號量的控制之後,同時運行的task數量就會被控制,這樣就能給aiohttp限制速度了。

aiohttp非同步爬取實戰

接下來我們通過非同步方式練手一個小說爬蟲,需求如下:

需求頁面:https://dushu.baidu.com/pc/detail?gid=4308080950

目錄介面:https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4308080950"}

詳情介面:https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4295122774","cid":"4295122774|116332"}

關鍵參數:book_id:小說ID、cid:章節id

採集要求:使用協程方式寫入,數據存放進mongo

需求分析:點開需求頁面,通過F12抓包可以發現兩個介面。一個目錄介面,一個詳情介面。
首先第一步先請求目錄介面拿到cid章節id,然後將cid傳遞給詳情介面拿到小說數據,最後存入mongo即可。

話不多說,直接上代碼:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 鋼鐵知識庫
# 不合適就是不合適,真正合適的,你不會有半點猶豫。
import asyncio
import json,re
import logging
import aiohttp
import requests
from utils.conn_db import ConnDb

# 日誌格式
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# 章節目錄api
b_id = '4308080950'
url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}'
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/104.0.0.0 Safari/537.36"
}
# 併發聲明
semaphore = asyncio.Semaphore(5)

async def download(title,b_id, cid):
    data = {
        "book_id": b_id,
        "cid": f'{b_id}|{cid}',
    }
    data = json.dumps(data)
    detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data)
    async with semaphore:
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(detail_url) as response:
                res = await response.json()
                content = {
                    'title': title,
                    'content': res['data']['novel']['content']
                }
                # print(title)
                await save_data(content)

async def save_data(data):
    if data:
        client = ConnDb().conn_motor_mongo()
        db = client.baidu_novel
        collection = db.novel
        logging.info('saving data %s', data)
        await collection.update_one(
            {'title': data.get('title')},
            {'$set': data},
            upsert=True
        )

async def main():
    res = requests.get(url, headers=headers)
    tasks = []
    for re in res.json()['data']['novel']['items']:     # 拿到某小說目錄cid
        title = re['title']
        cid = re['cid']
        tasks.append(download(title, b_id, cid))    # 將請求放到列表裡,再通過gather執行併發
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())

至此,我們就使用aiohttp完成了對小說章節的爬取。

要實現非同步處理,得先要有掛起操作,當一個任務需要等待 IO 結果的時候,可以掛起當前任務,轉而去執行其他任務,這樣才能充分利用好資源,要實現非同步,需要瞭解 await 的用法,使用 await 可以將耗時等待的操作掛起,讓出控制權。當協程執行的時候遇到 await,時間迴圈就會將本協程掛起,轉而去執行別的協程,直到其他的協程掛起或執行完畢。

await 後面的對象必須是如下格式之一:

  • A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine,一個由 types.coroutine 修飾的生成器,這個生成器可以返回 coroutine 對象。
  • An object with an await method returning an iterator,一個包含 await 方法的對象返回的一個迭代器。

---- 20220909 鋼鐵知識庫

總結

以上就是藉助協程async和非同步aiohttp兩個主要模塊完成非同步爬蟲的內容,
aiohttp 以非同步方式爬取網站的耗時遠小於 requests 同步方式,以上列舉的例子希望對你有幫助。

註意,線程和協程是兩個概念,後面找機會我們再聊聊進程和線程、線程和協程的關係。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 2022-09-09 1、閉包的作用: 可以保存外部函數的變數 2、閉包的形成條件 (1)函數嵌套 (2)內部函數使用了外部函數的變數或者參數 (3)外部函數返回內部函數,這個使用了外部函數變數的內部函數稱為閉包 3、典例 1 # 函數嵌套 2 def func_out(): 3 # 外部函數 4 ...
  • 目錄 一.OpenGL 飽和度調節效果演示 1.IOS 飽和度演示效果 2.Windows OpenGL ES 飽和度演示效果 3.Windows OpenGL 飽和度演示效果 二.OpenGL 飽和度調節源碼下載 1.IOS Object-C 版本 2.Windows OpenGL ES 版本 3 ...
  • 一個菜鳥的設計模式之旅,使用 Golang 實現。本節實現解釋器模式。程式可按需載入用戶自定義的.work尾碼文件,將每行的命令解釋為具體行為。喵叫幾次、進程休眠幾秒、輸出範圍內隨機數、運行另外的work文件。 ...
  • #一、format的基本玩法 ##🚀🚀一、什麼是format format是字元串內嵌(字元串內嵌:字元串中再嵌套字元串,加入雙引號或單引號)的一個方法,用於格式化字元串。以大括弧{}來標明被替換的字元串 ##🚀🚀format玩法一:按順序輸出(按照{}的順序依次匹配括弧中的值) >>>s ...
  • IO流01 1.文件基礎知識 什麼是文件? 文件,我們並不陌生。文件是保存數據的地方。比如大家經常使用的word文檔,txt文件,excel文件等,都是文件。它既可以保存一張圖片,也可以保存聲音、視頻…… 文件流 文件在程式中是以流的形式來操作的: 流:數據在數據源(文件)和程式(記憶體)之間經歷的路 ...
  • 說明 意義 1.在Spring中,Bean的作用域可以通過scope屬性來指定。 2.指定作用域的目的是 存儲在此類單例bean的高速緩存中,並且對該命名bean的所有後續請求和引用都返回該高速緩存的對象。(本身的理念就是以空間換時間的思維,創建步驟繁雜,而且頻繁用到,我就存起來,下次用的時候就不用 ...
  • 摘要:本篇文章主要講解基於理論的圖像分割方法,通過K-Means聚類演算法實現圖像分割或顏色分層處理。 本文分享自華為雲社區《[Python圖像處理] 十九.圖像分割之基於K-Means聚類的區域分割》,作者: eastmount。 本篇文章主要講解基於理論的圖像分割方法,通過K-Means聚類演算法實 ...
  • pring Boot Actuator 是 Spring Boot 提供的對應用的自省和監控功能,如健康檢查,審計,指標收集,HTTP 跟蹤等,可以幫助開發和運維人員監控和管理 Spring Boot 應用。該模塊採集應用的內部信息,並暴露給外部的模塊,支持 HTTP 和 JMX,並可以與一些第三方... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...