Python 大數據量文本文件高效解析方案代碼實現

来源:https://www.cnblogs.com/shouke/archive/2022/12/18/16975025.html
-Advertisement-
Play Games

大數據量文本文件高效解析方案代碼實現 測試環境 Python 3.6.2 Win 10 記憶體 8G,CPU I5 1.6 GHz 背景描述 這個作品來源於一個日誌解析工具的開發,這個開發過程中遇到的一個痛點,就是日誌文件多,日誌數據量大,解析耗時長。在這種情況下,尋思一種高效解析數據解析方案。 解決 ...


大數據量文本文件高效解析方案代碼實現

測試環境

Python 3.6.2

Win 10 記憶體 8G,CPU I5 1.6 GHz

背景描述

這個作品來源於一個日誌解析工具的開發,這個開發過程中遇到的一個痛點,就是日誌文件多,日誌數據量大,解析耗時長。在這種情況下,尋思一種高效解析數據解析方案。

解決方案描述

1、採用多線程讀取文件

2、採用按塊讀取文件替代按行讀取文件

由於日誌文件都是文本文件,需要讀取其中每一行進行解析,所以一開始會很自然想到採用按行讀取,後面發現合理配置下,按塊讀取,會比按行讀取更高效。

按塊讀取來的問題就是,可能導致完整的數據行分散在不同數據塊中,那怎麼解決這個問題呢?解答如下:

將數據塊按換行符\n切分得到日誌行列表,列表第一個元素可能是一個完整的日誌行,也可能是上一個數據塊末尾日誌行的組成部分,列表最後一個元素可能是不完整的日誌行(即下一個數據塊開頭日誌行的組成部分),也可能是空字元串(日誌塊中的日誌行數據全部是完整的),根據這個規律,得出以下公式,通過該公式,可以得到一個新的數據塊,對該數據塊二次切分,可以得到數據完整的日誌行

上一個日誌塊首部日誌行 +\n + 尾部日誌行 + 下一個數據塊首部日誌行 + \n + 尾部日誌行 + ...

3、將數據解析操作拆分為可並行解析部分和不可並行解析部分

數據解析往往涉及一些不可並行的操作,比如數據求和,最值統計等,如果不進行拆分,並行解析時勢必需要添加互斥鎖,避免數據覆蓋,這樣就會大大降低執行的效率,特別是不可並行操作占比較大的情況下。

對數據解析操作進行拆分後,可並行解析操作部分不用加鎖。考慮到Python GIL的問題,不可並行解析部分替換為單進程解析。

4、採用多進程解析替代多線程解析

採用多進程解析替代多線程解析,可以避開Python GIL全局解釋鎖帶來的執行效率問題,從而提高解析效率。

5、採用隊列實現“協同”效果

引入隊列機制,實現一邊讀取日誌,一邊進行數據解析:

  1. 日誌讀取線程將日誌塊存儲到隊列,解析進程從隊列獲取已讀取日誌塊,執行可並行解析操作
  2. 並行解析操作進程將解析後的結果存儲到另一個隊列,另一個解析進程從隊列獲取數據,執行不可並行解析操作。

代碼實現

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import time
from datetime import datetime
from joblib import Parallel, delayed, parallel_backend
from collections import deque
from multiprocessing import cpu_count
import threading


class LogParser(object):
    def __init__(self, chunk_size=1024*1024*10, process_num_for_log_parsing=cpu_count()):
        self.log_unparsed_queue = deque() # 用於存儲未解析日誌
        self.log_line_parsed_queue = deque()  # 用於存儲已解析日誌行
        self.is_all_files_read = False  # 標識是否已讀取所有日誌文件
        self.process_num_for_log_parsing = process_num_for_log_parsing # 併發解析日誌文件進程數
        self.chunk_size = chunk_size # 每次讀取日誌的日誌塊大小
        self.files_read_list = [] # 存放已讀取日誌文件
        self.log_parsing_finished = False # 標識是否完成日誌解析


    def read_in_chunks(self, filePath, chunk_size=1024*1024):
        """
        惰性函數(生成器),用於逐塊讀取文件。
        預設區塊大小:1M
        """

        with open(filePath, 'r', encoding='utf-8') as f:            
            while True:
                chunk_data = f.read(chunk_size)
                if not chunk_data:
                    break
                yield chunk_data


    def read_log_file(self, logfile_path):
        '''
        讀取日誌文件
        這裡假設日誌文件都是文本文件,按塊讀取後,可按換行符進行二次切分,以便獲取行日誌
        '''

        temp_list = []  # 二次切分後,頭,尾行日誌可能是不完整的,所以需要將日誌塊頭尾行日誌相連接,進行拼接
        for chunk in self.read_in_chunks(logfile_path, self.chunk_size):
            log_chunk = chunk.split('\n')
            temp_list.extend([log_chunk[0], '\n'])
            temp_list.append(log_chunk[-1])
            self.log_unparsed_queue.append(log_chunk[1:-1])
        self.log_unparsed_queue.append(''.join(temp_list).split('\n'))
        self.files_read_list.remove(logfile_path)


    def start_processes_for_log_parsing(self):
        '''啟動日誌解析進程'''

        with parallel_backend("multiprocessing", n_jobs=self.process_num_for_log_parsing):
            Parallel(require='sharedmem')(delayed(self.parse_logs)() for i in range(self.process_num_for_log_parsing))

        self.log_parsing_finished = True

    def parse_logs(self):
        '''解析日誌'''

        method_url_re_pattern = re.compile('(HEAD|POST|GET)\s+([^\s]+?)\s+',re.DOTALL)
        url_time_taken_extractor = re.compile('HTTP/1\.1.+\|(.+)\|\d+\|', re.DOTALL)

        while self.log_unparsed_queue or self.files_read_list:
            if not self.log_unparsed_queue:
                continue
            log_line_list = self.log_unparsed_queue.popleft()
            for log_line in log_line_list:
                #### do something with log_line
                if not log_line.strip():
                    continue

                res = method_url_re_pattern.findall(log_line)
                if not res:
                    print('日誌未匹配到請求URL,已忽略:\n%s' % log_line)
                    continue
                method = res[0][0]
                url = res[0][1].split('?')[0]  # 去掉了 ?及後面的url參數

                # 提取耗時
                res = url_time_taken_extractor.findall(log_line)
                if res:
                    time_taken = float(res[0])
                else:
                    print('未從日誌提取到請求耗時,已忽略日誌:\n%s' % log_line)
                    continue

                # 存儲解析後的日誌信息
                self.log_line_parsed_queue.append({'method': method,
                                                   'url': url,
                                                   'time_taken': time_taken,
                                                   })


    def collect_statistics(self):
        '''收集統計數據'''

        def _collect_statistics():
            while self.log_line_parsed_queue or not self.log_parsing_finished:
                if not self.log_line_parsed_queue:
                    continue
                log_info = self.log_line_parsed_queue.popleft()
                # do something with log_info
       
        with parallel_backend("multiprocessing", n_jobs=1):
            Parallel()(delayed(_collect_statistics)() for i in range(1))

    def run(self, file_path_list):
        # 多線程讀取日誌文件
        for file_path in file_path_list:
            thread = threading.Thread(target=self.read_log_file,
                                      name="read_log_file",
                                      args=(file_path,))
            thread.start()
            self.files_read_list.append(file_path)

        # 啟動日誌解析進程
        thread = threading.Thread(target=self.start_processes_for_log_parsing, name="start_processes_for_log_parsing")
        thread.start()

        # 啟動日誌統計數據收集進程
        thread = threading.Thread(target=self.collect_statistics, name="collect_statistics")
        thread.start()

        start = datetime.now()
        while threading.active_count() > 1:
            print('程式正在努力解析日誌...')
            time.sleep(0.5)

        end = datetime.now()
        print('解析完成', 'start', start, 'end', end, '耗時', end - start)



if __name__ == "__main__":
    log_parser = LogParser()
    log_parser.run(['access.log', 'access2.log'])

註意:

需要合理的配置單次讀取文件數據塊的大小,不能過大,或者過小,否則都可能會導致數據讀取速度變慢。筆者實踐環境下,發現10M~15M每次是一個比較高效的配置。

作者:授客
微信/QQ:1033553122
全國軟體測試QQ交流群:7156436

Git地址:https://gitee.com/ishouke
友情提示:限於時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞後如有任何疑問,請聯繫我!!!
           微信打賞                        支付寶打賞                  全國軟體測試交流QQ群  
              


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • win11特有的快捷鍵 win鍵就是圖案是windows圖標的那個按鍵 | 作用 | 快捷鍵 | | | | | 打開快速設置,win11是展開音量,wifi,藍牙的設置項,win10也可以用 | win + a | | 打開通知中心和日曆,win10無 | win + n | | 打開投屏,win ...
  • RDP,Remote Desktop Protocol,遠程桌面協議,是一個多通道(mutil-channel)的協議,讓用戶(客戶端或稱“本地電腦”)連上提供微軟終端機服務的電腦(伺服器端或稱“遠程電腦”)。大部分的Windows、Linux、FreeBSD、Mac OS X都有相應的客戶端。服務... ...
  • 1. 判斷本地是否已經安裝MySQL ① 在運行界面輸入services.msc進入服務界面,查看是否有MySQL服務 ② 進入任務管理器,點擊服務看是否有MySQL服務 2. 安裝MySQL(壓縮包版) 1. 下載MySQL社區伺服器(ZIP): MySQL zip下載 點擊No thanks,j ...
  • 本文是 CSS Houdini 之 CSS Painting API 系列第四篇。 現代 CSS 之高階圖片漸隱消失術 現代 CSS 高階技巧,像 Canvas 一樣自由繪圖構建樣式! 現代 CSS 高階技巧,完美的波浪進度條效果! 在上三篇中,我們詳細介紹了 CSS Painting API 是如 ...
  • 因為團隊內部開啟了一個持續的前端代碼質量改進計劃,其中一個專項就是TS類型覆蓋率,期間用到了type-coverage這個倉庫,所以借這篇文章分享一下這個工具,並順便從源碼閱讀的角度來分析一下該工具的源碼,我自己fork了一個倉庫,完成了中文版本的ReadMe文件並對核心代碼添加了關鍵註釋,需要的同 ...
  • 如果你被問到:什麼是反射?為什麼需要反射、以及反射的應用?你會如何回答呢? 本篇會帶大家初識反射,瞭解反射概念和基本應用。反射的原理以及深入源碼的探究將會在後面幾篇介紹。 ...
  • 本文已收錄至Github,推薦閱讀 👉 Java隨想錄 微信公眾號:Java隨想錄 CSDN: 碼農BookSea 轉載請在文首註明出處,如發現惡意抄襲/搬運,會動用法律武器維護自己的權益。讓我們一起維護一個良好的技術創作環境! 三色標記演算法 可達性分析演算法理論上要求全過程都基於一個能保障一致性的 ...
  • 本文已收錄至Github,推薦閱讀 👉 Java隨想錄 微信公眾號:Java隨想錄 CSDN: 碼農BookSea 轉載請在文首註明出處,如發現惡意抄襲/搬運,會動用法律武器維護自己的權益。讓我們一起維護一個良好的技術創作環境! 記憶集與卡表 跨區引用問題 跨代引用是指新生代中存在對老年代對象的引 ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...