爬蟲之線程&協程&非同步

来源:https://www.cnblogs.com/Golanguage/archive/2020/03/20/12535623.html
-Advertisement-
Play Games

線程池 導包: 回調函數非同步將可迭代對象中的元素進行某種操作 註意事項:callback必須有一個參數,且只能有一個參數 非同步主要是被應用在耗時的操作 測試:同步&非同步效率 搭建一個flask,自己啟動服務,測試執行時間 新建一個 新建一個 文件夾,在該文件夾下創建一個HTML文件,我寫的是 ,隨便 ...


線程池

  • 導包:from multiprocessing.dummy import Pool
  • 回調函數非同步將可迭代對象中的元素進行某種操作
    • 註意事項:callback必須有一個參數,且只能有一個參數
  • 非同步主要是被應用在耗時的操作
from multiprocessing.dummy import Pool

pool = Pool(3)  # 實例化線程池對象,3是線程池的最大線程數
# 參數1:回調函數(只是函數名,不加括弧);參數2:列表
# 參數1會接收參數2列表中的某一個元素,回調函數可以對該列表元素進行某種操作
pool.map(callback,list)

測試:同步&非同步效率

搭建一個flask,自己啟動服務,測試執行時間

  • 新建一個server.py
from flask import Flask, render_template
import time

app = Flask(__name__)


@app.route('/xx')
def index_1():
    time.sleep(2)
    return render_template('test.html')


@app.route('/yy')
def index_2():
    time.sleep(2)
    return render_template('test.html')


@app.route('/oo')
def index_3():
    time.sleep(2)
    return render_template('test.html')


if __name__ == '__main__':
    app.run(debug=True)
  • 新建一個templates文件夾,在該文件夾下創建一個HTML文件,我寫的是test.html,隨便寫點數據
<html lang="en">
<head>
    <meta charset="UTF-8"/>
    <title>測試</title>
</head>
<body>
<div>
    <p>百裡守約</p>
</div>
<div class="song">
    <p>李清照</p>
    <p>王安石</p>
    <p>蘇軾</p>
    <p>柳宗元</p>
    <a href="http://www.song.com/" title="趙匡胤" target="_self">
        <span>this is span</span>
        宋朝是最強大的王朝,不是軍隊的強大,而是經濟很強大,國民都很有錢</a>
    <a href="" class="du">總為浮雲能蔽日,長安不見使人愁</a>
    <img src="http://www.baidu.com/meinv.jpg" alt=""/>
</div>
<div class="tang">
    <ul>
        <li><a href="http://www.baidu.com" title="qing">清明時節雨紛紛,路上行人欲斷魂,借問酒家何處有,牧童遙指杏花村</a></li>
        <li><a href="http://www.163.com" title="qin">秦時明月漢時關,萬里長征人未還,但使龍城飛將在,不教胡馬度陰山</a></li>
        <li><a href="http://www.126.com" id="qi">岐王宅里尋常見,崔九堂前幾度聞,正是江南好風景,落花時節又逢君</a></li>
        <li><a href="http://www.sina.com" class="du">杜甫</a></li>
        <li><a href="http://www.dudu.com" class="du">杜牧</a></li>
        <li><b>杜小月</b></li>
        <li><i>度蜜月</i></li>
        <li><a href="http://www.haha.com" id="feng">鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘</a></li>
    </ul>
</div>
</body>
</html>

同步&非同步執行時間

import requests
from bs4 import BeautifulSoup
import time
# 線程池模塊
from multiprocessing.dummy import Pool

urls = [
    'http://127.0.0.1:5000/xx',
    'http://127.0.0.1:5000/yy',
    'http://127.0.0.1:5000/oo',
]

# 數據的爬取,返回爬取到的頁面源碼數據
def get_request(url):
    page_text = requests.get(url=url).text
    return page_text

# 數據的解析,返回標簽的文本
def parse(page_text):
    soup = BeautifulSoup(page_text, 'lxml')
    return soup.select('#feng')[0].text

# 同步代碼
if __name__ == '__main__':
    start = time.time()
    for url in urls:
        page_text = get_request(url)
        text_data = parse(page_text)
        print(text_data)
    print(time.time() - start)
"""
執行結果:
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
6.056272029876709
"""

# 非同步代碼
if __name__ == '__main__':
    start = time.time()
    pool = Pool(3)  # 實例化線程池對象
    # 參數1:回調函數(只是函數名,不加括弧);參數2:列表
    # 參數1會接收參數2列表中的某一個元素,回調函數可以對該列表元素進行某種操作
    page_text_list = pool.map(get_request,urls)
    text_data = pool.map(parse,page_text_list)
    for i in text_data:
        print(i)
    print(time.time() - start)
"""
執行結果:
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
鳳凰臺上鳳凰游,鳳去台空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
2.0537397861480713

不用for迴圈速度能提升0.01秒左右
"""

綜上所述:非同步代碼執行效率顯著提高

案例:基於線程池爬取梨視頻

  • 思路分析
    • 爬取到視頻詳情頁對應的url,存儲到一個可迭代對象中
    • 再次發送請求獲取視頻詳情頁真正的視頻地址
      • 註意:視頻詳情頁的video是js代碼動態生成的,需要用到正則解析
    • 寫一個callback,獲取視頻的二進位文件,持久化存儲
import requests
from lxml import etree
from multiprocessing.dummy import Pool
import re
import os

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}

# 梨視頻財富板塊的地址
main_url = 'https://www.pearvideo.com/category_3'
# 解析出該板塊下視頻詳情頁的src
main_page_text = requests.get(url=main_url, headers=headers).text
tree = etree.HTML(main_page_text)
li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
# 線程池
video_urls = []
for li in li_list:
    # 視頻詳情頁的具體地址和視頻標題
    detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
    name = li.xpath('./div/a/div[2]/text()')[0]
    # 對詳情頁發起請求
    page_text = requests.get(url=detail_url, headers=headers).text
    # 視頻詳情頁的video是js代碼動態生成的,使用正則解析
    ex = 'srcUrl="(.*?)",vdoUrl='
    video_url = re.findall(ex, page_text, re.S)[0]  # 返回的是列表類型
    dic = {
        'url': video_url,
        'name': name,
    }
    video_urls.append(dic)

# 回調函數
def get_video(url):
    # 對視頻地址發請求,將二進位文件持久化存儲
    video_data = requests.get(url=url['url'], headers=headers).content
    file_name = "./video/" + url['name'] + ".mp4"
    with open(file_name, 'wb') as f:
        f.write(video_data)
        print(url['name'], "下載完畢!")

# 創建存儲視頻的文件夾
dir_name = 'video'
if not os.path.exists(dir_name):
    os.mkdir(dir_name)
# 實例化線程池
pool = Pool(4)
pool.map(get_video, video_urls)

單線程+多任務的非同步協程

asyncio(重點)

特殊函數

  • 如果一個函數的定義被async關鍵字修飾後,則該函數是一個特殊函數。
  • 特殊之處:
    • 該函數被調用後,函數內部的實現語句不會被立即執行。
    • 該函數會返回一個協程對象

協程

  • 協程就是一個對象。當特殊函數被調用後,該函數就會返回一個協程對象。

  • 協程對象 == 特殊函數

    import asyncio
    from time import sleep
    
    async def get_request(url):
        print('正在請求:', url)
        sleep(2)
        print('請求成功:', url)
        return '666'
    # 返回一個協程對象
    g = get_request("https://www,qq.com")

任務對象

  • 就是對協程對象的進一步封裝(就是一個高級的協程對象)

  • 任務對象 == 協程對象 == 特殊函數(表示某個固定形式的任務)

    asyncio.ensure_future(協程對象)
    
    task = asyncio.ensure_future(g)
    
    # g:協程對象
  • 綁定回調:

    # 定義一個task的回調函數
    def callback(task):
        task.result() # 表示的是當前任務對象對應的特殊函數的返回值
        print("I'm callback:", task)
    
    task.add_done_callback(funcName)
    
    # task:任務對象
    # funcName:回調函數的名稱
    • funcName這個回調函數必須要帶一個參數,這個參數表示的就是當前的任務對象
      • 參數.result():表示的就是當前任務對象對應的特殊函數的返回值

事件迴圈對象

  • 創建事件迴圈對象

  • 需要將任務對象註冊到該事件迴圈對象中

    # 創建事件迴圈對象
    loop = asyncio.get_event_loop()
    # 將任務對象註冊/裝載到事件迴圈對象中,然後需要啟動迴圈對象
    loop.run_until_complete(task)  # 用於裝載且啟動事件迴圈
    
    # task:任務對象

等待

await:當阻塞操作結束後讓loop回頭執行阻塞之後的代碼。

掛起

asyncio.wait():將當前的任務對象交出cpu的使用權。

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

asyncio.wait   # 掛起操作
tasks   # 任務對象列表

重點註意事項

  • 在特殊函數實現內部不可以出現不支持非同步的模塊代碼,否則會中斷非同步效果

aiohttp(重點)

  • requests:不支持非同步,不可以出現在特殊函數內部。

  • aiohttp:支持非同步的網路請求模塊,和asyncio一起使用

    • pip install aiohttp
  • 代碼的編寫

    • 寫出基本架構
    import asyncio
    import aiohttp
    
    # 基於aiohttp實現非同步的網路請求
    async def get_requests(url):
        # 實例化了一個請求對象
        with aiohttp.ClientSession() as aio:
            # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
            with aio.get(url=url) as response:
                # text() 獲取字元串形式的響應數據
                # read() 獲取bytes類型的響應數據
                page_text = await response.text()
                return page_text
    • 細節補充(代碼參照完整代碼)
      • 在每一個with前加上async關鍵字
      • 在每一個阻塞操作前加上await關鍵字
  • 完整代碼

    import asyncio
    import aiohttp
    
    # 基於aiohttp實現非同步的網路請求
    async def get_requests(url):
        # 實例化了一個請求對象
        async with aiohttp.ClientSession() as aio:
            # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
            async with await aio.get(url=url) as response:
                # text() 獲取字元串形式的響應數據
                # read() 獲取bytes類型的響應數據
                page_text = await response.text()
                return page_text

單任務協程操作

import asyncio
from time import sleep

async def get_request(url):
    print('正在請求:', url)
    sleep(2)
    print('請求成功:', url)
    return '666'

# 定義一個task的回調函數
def callback(task):
    print("I'm callback:", task)

# 返回一個協程對象
g = get_request("https://www,qq.com")

# 創建一個任務對象
task = asyncio.ensure_future(g)
"""

# 給任務對象綁定回調函數
task.add_done_callback(callback)

# 創建事件迴圈對象
loop = asyncio.get_event_loop()
# 將任務對象註冊/裝載到事件迴圈對象中,然後需要啟動迴圈對象
loop.run_until_complete(task)  # 用於裝載且啟動事件迴圈
"""
執行結果:
正在請求: www,qq.com
正在請求: www,qq.com
"""

多任務協程操作

import asyncio
import time

start = time.time()
async def get_request(url):
    print('正在請求:', url)
    # await 當阻塞操作結束後讓loop回頭執行阻塞之後的代碼
    await asyncio.sleep(2)
    print('請求成功:', url)
    return '666'

urls = [
    'http://127.0.0.1:5000/xx',
    'http://127.0.0.1:5000/yy',
    'http://127.0.0.1:5000/oo',
]
tasks = []
for url in urls:
    c = get_request(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
# 將任務列表註冊到事件迴圈的時候一定要將任務列表進行掛起操作
# asyncio.wait()  掛起操作,將當前的任務對象交出cpu的使用權
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:', time.time() - start)

單線程&多任務非同步爬蟲

基於Flask自測

  • 測試代碼在上述測試:同步&非同步效率,按照上述步驟啟動項目;然後運行下方代碼。
import asyncio
import time
import aiohttp
from lxml import etree

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}

urls = [
    'http://127.0.0.1:5000/xx',
    'http://127.0.0.1:5000/yy',
    'http://127.0.0.1:5000/oo',
]

start = time.time()

"""
# 發起請求,獲取響應數據(不可以實現非同步)
async def get_requests(url):
    # requests是不支持非同步的模塊
    page_text = requests.get(url).text
    return page_text
"""

async def get_requests(url):
    """
    基於aiohttp實現非同步的網路請求
    :param url: 
    :return: 
    """
    # 實例化了一個請求對象
    async with aiohttp.ClientSession() as aio:
        # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
        async with await aio.get(url=url) as response:
            # text() 獲取字元串形式的響應數據
            # read() 獲取bytes類型的響應數據
            page_text = await response.text()
            return page_text

def parse(task):
    """
    定義回調函數
    :param task:
    :return:
    """
    page_text = task.result()  # 獲取特殊函數的返回值(請求到的頁面源碼數據)
    tree = etree.HTML(page_text)
    content = tree.xpath('//*[@id="feng"]/text()')[0]
    print(content)

tasks = []
for url in urls:
    c = get_requests(url)
    task = asyncio.ensure_future(c)
    task.add_done_callback(parse)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:', time.time() - start)

案例:基於單線程多任務非同步爬取梨視頻

  • 思路上述案例:基於線程池爬取梨視頻
import asyncio
import time
import aiohttp
from lxml import etree
import re
import os
import requests

# time模塊是為了測試爬取視頻的耗時
start = time.time()
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}
# 梨視頻財富板塊的地址
main_url = 'https://www.pearvideo.com/category_3'
main_page_text = requests.get(url=main_url, headers=headers).text
tree = etree.HTML(main_page_text)
li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
urls = []  # [{'url': video_url,'name': name},{}...]
for li in li_list:
    detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
    name = li.xpath('./div/a/div[2]/text()')[0]
    page_text = requests.get(url=detail_url, headers=headers).text
    # 視頻詳情頁的video是js代碼動態生成的
    ex = 'srcUrl="(.*?)",vdoUrl='
    video_url = re.findall(ex, page_text, re.S)[0]  # 返回的是列表類型
    dic = {
        'url': video_url,
        'name': name,
    }
    urls.append(dic)

# 基於aiohttp實現非同步的網路請求
async def get_requests(url):
    # 實例化了一個請求對象
    async with aiohttp.ClientSession() as aio:
        # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
        async with await aio.get(url=url['url'], headers=headers) as response:
            # text() 獲取字元串形式的響應數據
            # read() 獲取bytes類型的響應數據
            page_read = await response.read()
            dic = {
                "page_read": page_read,
                "name": url['name']
            }
            return dic


def parse(task):
    """
    定義回調函數
    :param task:
    :return:
    """
    dic_info = task.result()  # 獲取特殊函數的返回值(請求到的頁面源碼數據)
    file_name = "./video/" + dic_info["name"] + ".mp4"
    with open(file_name, 'wb') as f:
        f.write(dic_info['page_read'])
        print(dic_info["name"], "下載完畢!")

tasks = []
for url in urls:
    c = get_requests(url)
    task = asyncio.ensure_future(c)
    task.add_done_callback(parse)
    tasks.append(task)

dir_name = 'video'
if not os.path.exists(dir_name):
    os.mkdir(dir_name)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:', time.time() - start)

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我一直有用微信讀書閱讀的習慣,最近發現微信閱讀還有網頁版。登陸微信閱讀的網頁版,需要我們用App掃碼進行登陸。如下麵的界面所示: 使用你的微信閱讀App掃描完上面的二維碼並點擊確認登陸後,網頁版就能自動登陸。登陸後會展示你的閱讀記錄,書架信息等。 我突然很好奇,這個掃碼登陸到底是怎麼實現的,所以就去 ...
  • 首先下載好軟體,鏈接在這裡 鏈接:https://pan.baidu.com/s/1op-W-ZX1tqefHffs3m-r0A 提取碼:0jwm 這裡麵包含了Rational Rose 2007版的可視化建模軟體,也包含了破解文件,直接下載就可以了。 我在網上按照其他人的按照過程中出現了安裝之後打 ...
  • [toc] 領域驅動設計 運用領域模型 綁定模型和實現 聰明的項目組成員花費了幾個月的時間進行仔細的研究並且開發出了詳盡的領域模型(類圖)。然而對類圖研究不能讓我深入地瞭解該應用程式的代碼和設計,這讓我備感困擾。當開發人員開始實現應用程式時,他們很快就發現,儘管分析人員說得頭頭是道,他們依然無法將這 ...
  • [toc] 運用領域模型 交流與語言的使用 非原創,感謝《領域驅動設計》這本書 領域模型可成為軟體項目通用語言的核心。該模型是一組得自於項目人員頭腦中的概念,以及反映了領域深層含義的術語和關係。這些術語和相互關係提供了模型語言的語義,雖然語言是為領域量身定製的,但就技術開發而言,其依然足夠精確。正是 ...
  • [toc] 運用領域模型 消化知識 非原創,感謝《領域驅動設計》這本書 有效建模的要素 (1) 模型和實現的綁定。最初的原型雖然簡陋,但它在模型與實現之間建立了早期鏈接,而且在所有後續的迭代中我們一直在維護該鏈接。 (2) 建立了一種基於模型的語言。隨著項目的進展,雙方都能夠直接使用模型中的術語,並 ...
  • 前言 談到JAVA,就不得不提JVM JAVA程式員繞不開的話題.也許有童鞋會說,我不懂JVM,但是我一樣可以寫出JAVA代碼,我相信說這種話的童鞋,往往是只有1 3年的初級開發人員,對JAVA理解還不深,不明白JVM的重要性,那接下來我們來說說,為什麼要學習JVM? 1.理解JVM,才能幫助我們寫 ...
  • 一、BufferedWriter 1.使用帶有緩衝區的字元讀和寫進行試驗 package com.bjpowernode.java_learning; import java.io.*; ​ public class D100_1_BufferedWriter { public static voi ...
  • 隨機回去10位幸運者的名單,依次遞減,最後一位就是幸運者 1 package com.lottery.controller; 2 3 import java.io.BufferedReader; 4 import java.io.FileReader; 5 import java.io.IOExce ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...