python爬蟲等獲取實時數據+Flume+Kafka+Spark Streaming+mysql+Echarts實現數據動態實時採集、分析、展示

来源:https://www.cnblogs.com/rainbow-1/archive/2022/03/18/16023419.html
-Advertisement-
Play Games

使用爬蟲等獲取實時數據+Flume+Kafka+Spark Streaming+mysql+Echarts實現數據動態實時採集、分析、展示 主要工作流程如下所示: 其中爬蟲獲取實時數據,並把數據實時傳輸到Linux本地文件夾中。 使用Flume實時監控該文件夾,如果發現文件內容變動則進行處理,將數據 ...


使用爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集、分析、展示

主要工作流程如下所示:

其中爬虫获取实时数据,并把数据实时传输到Linux本地文件夹中。

使用Flume实时监控该文件夹,如果发现文件内容变动则进行处理,将数据抓取并传递到Kafka消息队列中。

之后使用Spark Streaming 实时处理Kafka通道中的数据,并写入本地mysql数据库中,之后读取mysql数据库中的数据并基于Echart图表对数据进行实时动态展示。


一、实时数据的模拟

案例简化了第一步的流程,使用模拟数据进行测试,代码如下:

import datetime
import random
import time

import paramiko

hostname = "hadoop102"
port = 22
username = "root"
password = "000429"
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, port, username, password, compress=True)
sftp_client = client.open_sftp()
# try:
#     for line in remote_file:
#         print(line)
# finally:
#     remote_file.close()
#获取系统时间
num1=3000
for i in range(1000):
    remote_file = sftp_client.open("/opt/module/data/test1.csv", 'a')  # 文件路径
    time1 = datetime.datetime.now()
    time1_str = datetime.datetime.strftime(time1, '%Y-%m-%d %H:%M:%S')
    print("当前时间:  " + time1_str)
    time.sleep(random.randint(1,3))
    num1_str=str(num1+random.randint(-1300,1700))
    print("当前随机数:  "+num1_str)
    remote_file.write(time1_str+","+num1_str+"\n")
    remote_file.close()
  • 主要过程
  1. 在/opt/module/data/路径下建立test1.csv文件

  2. 代码实现远程连接虚拟机hadoop102并以root用户身份登录,打开需要上传的文件目录。

  3. 使用一个for循环间隔随机1到3秒向文件中写入一些数据。


二、Flume实时监控文件

  1. 进入/opt/module/flume/job路径编辑配置文件信息(myflume.conf)

    内容如下:其中指定了被监控文件的路径,Kafka服务主机地址,Kafka主题和序列化等信息

#给agent中的三个组件source、sink和channel各起一个别名,a1代表为agent起的别名
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source属性配置信息
a1.sources.r1.type = exec
#a1.sources.r1.bind = localhost
#a1.sources.r1.port = 44444
a1.sources.r1.command=tail -F /opt/module/data/test1.csv

# sink属性配置信息 
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers:hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic=first
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoer

#channel属性配置信息
  #内存模式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
  #传输参数设置
a1.channels.c1.transactionCapacity=100

#绑定source和sink到channel上
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
  1. 在/opt/module/flume 路径下开启Flume,此时Flume开始监控目标文件(job/myflume.conf)
bin/flume-ng agent -c conf/ -n a1 -f job/myflume.conf -Dflume.root.logger=INFO,console

三、开启Kafka并使用Spark Streaming完成数据的接收

  1. 首先需要开启集群的zookeeper服务

  2. 之后开启Kafka服务

  3. 开启Kafka后,新建一个名为first的主题(topic)

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
    
  4. 新建Maven项目,编写代码,Kafka的topic主题的消费者

    pom.xml配置如下:注意此处各个资源的版本号一定要与本机(IDEA编译器)的Scala版本一致,博主为Scala 2.12.11

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.reliable.ycw</groupId>
        <artifactId>spark-test</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-network-common -->
            <!--<dependency>-->
                <!--<groupId>org.apache.spark</groupId>-->
                <!--<artifactId>spark-network-common_2.12</artifactId>-->
                <!--<version>3.0.0</version>-->
            <!--</dependency>-->
            <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.18</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.12.11</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-compiler</artifactId>
                <version>2.12.11</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-reflect</artifactId>
                <version>2.12.11</version>
            </dependency>
        </dependencies>
        <build>
        <plugins>
        <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
        <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
        <execution>
            <!-- 声明绑定到 maven 的 compile 阶段 -->
            <goals>
                <goal>testCompile</goal>
            </goals>
        </execution>
        </executions>
        </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
        </build>
    </project>
    

    消费者类代码如下:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import java.sql.DriverManager
    import java.text.SimpleDateFormat
    import java.util.Date
    
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    /** Utility functions for Spark Streaming examples.*/
    object StreamingExamples extends  App{
      /** Set reasonable logging levels for streaming if the user has not configured log4j.*/
    //  def setStreamingLogLevels() {
    //    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    //    if (!log4jInitialized) {
    //      // We first log Appsomething to initialize Spark's default logging, then we override the
    //      // logging level.
    //      logInfo("Setting log level to [WARN] for streaming example." +
    //        " To override add a custom log4j.properties to the classpath.")
    //      Logger.getRootLogger.setLevel(Level.WARN)
    //    }
    //  }
      val conf=new SparkConf().setMaster("local").setAppName("jm")
        .set("spark.streaming.kafka.MaxRatePerPartition","3")
        .set("spark.local.dir","./tmp")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //创建上下文,2s为批处理间隔
      val ssc = new StreamingContext(conf,Seconds(5))
    
      //配置kafka参数,根据broker和topic创建连接Kafka 直接连接 direct kafka
      val KafkaParams = Map[String,Object](
        //brokers地址
        "bootstrap.servers"->"hadoop102:9092,hadoop103:9092,hadoop104:9092",
        //序列化类型
        "key.deserializer"->classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "MyGroupId",
        //设置手动提交消费者offset
        "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true
      )
    
      //获取KafkaDStream
      val kafkaDirectStream = KafkaUtils.createDirectStream[String,String](ssc,
        //
        PreferConsistent,Subscribe[String,String](List("first"),KafkaParams))
      kafkaDirectStream.print()
      var num=kafkaDirectStream.count()
    
      var num_1=""
      num foreachRDD (x => {
    //      var res=x.map(line=>line.split(","))
    
         val connection = getCon()
         var time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date).toString
         var sql = "insert into content_num values('" + time + "'," + x.collect()(0) + ")"
         connection.createStatement().execute(sql)
         connection.close()
       })
    //  print("sdfasdf")
    //  print(num_1)
      //根据得到的kafak信息,切分得到用户电话DStream
    //  val nameAddrStream = kafkaDirectStream.map(_.value()).filter(record=>{
    //    val tokens: Array[String] = record.split(",")
    //    tokens(1).toInt==0
    //  })
    //
    //  nameAddrStream.print()
    //  .map(record=>{
    //    val tokens = record.split("\t")
    //    (tokens(0),tokens(1))
    //  })
    //
    //
    //  val namePhoneStream = kafkaDirectStream.map(_.value()).filter(
    //    record=>{
    //      val tokens = record.split("\t")
    //      tokens(2).toInt == 1
    //    }
    //  ).map(record=>{
    //    val tokens = record.split("\t")
    //    (tokens(0),tokens(1))
    //  })
    //
    //  //以用户名为key,将地址电话配对在一起,并产生固定格式的地址电话信息
    //  val nameAddrPhoneStream = nameAddrStream.join(namePhoneStream).map(
    //    record=>{
    //      s"姓名:${record._1},地址:${record._2._1},邮编:${record._2._2}"
    //    }
    //  )
    //  //打印输出
    //  nameAddrPhoneStream.print()
      //开始计算
      ssc.start()
      ssc.awaitTermination()
      def getCon()={
        Class.forName("com.mysql.cj.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8","root","000429")
      }
    }
    
    

    这段代码指定了虚拟机中Kafka的主题信息,并从中定时获取(博主设置的为5秒)期间变化的信息量,完成计算后把本机的时间和信息变化量存储到本地Mysql数据库中

    • 注意指定时区和编码

      jdbc:mysql://localhost:3306/spark?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8
      

四、可视化

使用Echarts平滑折线图完成数据的展示(python flask框架)

  1. 后台读取mysql的数据
import pymysql
def get_conn():
    """
    获取连接和游标
    :return:
    """
    conn=pymysql.connect(host="127.0.0.1",
                         user="root",
                         password="000429",
                         db="spark",
                         charset="utf8")
    cursor=conn.cursor()
    return conn,cursor

def close_conn(conn, cursor):
    """
    关闭连接和游标
    :param conn:
    :param cursor:
    :return:
    """
    if cursor:
        cursor.close()
    if conn:
        conn.close()

#query
def query(sql,*args):
    """
    通用封装查询
    :param sql:
    :param args:
    :return:返回查询结果 ((),())
    """
    conn , cursor= get_conn()
    print(sql)
    cursor.execute(sql)
    res = cursor.fetchall()
    close_conn(conn , cursor)
    return res

def dynamic_bar():
    # 获取数据库连接
    conn, cursor = get_conn()
    if (conn != None):
        print("数据库连接成功!")
    typenumsql = "select * from content_num order by time desc limit 11;"
    detail_sql = ""
    res_title = query(typenumsql)
    type_num = []  # 存储类别+数量
    for item1 in res_title:
        type_num.append(item1)
    return type_num
  1. 路由获取后台数据
#获取 动态 柱状图数据
@app.route('/dynamic_bar')
def dynamic_bar():
    res_list=spark_sql.dynamic_bar()
    my_list=[]
    list_0=[]
    list_1=[]
    for item in res_list:
        list_0.append(item[0])
        list_1.append(item[1])
    my_list.append(list_0)
    my_list.append(list_1)
    return {"data":my_list}
  1. 前台绘制折线图
<!DOCTYPE html>
<html style="height: 100%">
    <head>
        <meta charset="utf-8">
    </head>
    <body style="height: 100%; margin: 0">
        <div id="container" style="height: 100%"></div>
        <script type="text/javascript" src="https://cdn.jsdelivr.net/npm/[email protected]/dist/echarts.min.js"></script>
        <script src="../static/js/jquery-3.3.1.min.js"></script>
    </body>
</html>
<script>
    var dom = document.getElementById("container");
    var myChart = echarts.init(dom);
    var app = {};
    var option;
</script>

<script type="text/javascript">

    option = {
      tooltip: {
        trigger: 'axis',
        axisPointer: {
          type: 'shadow'
        }
      },
      grid: {
        left: '3%',
        right: '4%',
        bottom: '3%',
        containLabel: true
      },
      xAxis: [
        {
          type: 'category',
          data: [],
          axisTick: {
            alignWithLabel: true
          }
        }
      ],
      yAxis: [
        {
          type: 'value'
        }
      ],
      series: [
        {
          name: 'Direct',
          type: 'bar',
          barWidth: '60%',
          data: []
        }
      ]
    };

    if (option && typeof option === 'object') {
        myChart.setOption(option);
    }
        function update(){
        $.ajax({
            url:"/dynamic_bar",
            async:true,
            success:function (data) {
                option.xAxis[0].data=data.data[0]
                option.series[0].data=data.data[1]
                myChart.setOption(option);
            },
            error:function (xhr,type,errorThrown) {
                alert("出现错误!")
            }
        })
    }
    setInterval("update()",100)
</script>

可视化这里需要注意的点:

  • 注意先引入echarts.min.js再引入jquery-3.3.1.min.js
  • 注意指定放置图像的div块的大小
  • 把赋值方法放在图像初始化配置代码的后面
  • 注意设置方法循环执行:setInterval("update()",100)

小结:整个流程的关键在于对实时数据的监控和展示,首先要保证数据传输的动态性,其次要保证Flume实时监控数据的变化。其中使用Kafka的目的在于当数据量足够大的时候,往往会出现数据的监控和采集速度跟不上数据的变化,所以采用Kafka消息队列机制,让其缓冲数据以实现大数据量的处理,后续需要编写Spark Streaming代码完成对消息的收集处理(存入本地mysql数据库),最后读取数据库数据并用折线图完成动态展示效果,数据库的数据是实时变动的,这就需要在读取的时候要读到最新进来的数据,这样才能看到图线的动态效果。(下图的图线会随着数据的变化动态改变!)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • MySQL8.0 註意事項以及解決方案 1. MySQL8.0 修改大小寫敏感配置 天坑MySQL8.0! 在安裝後, 便無法通過修改配置文件,重啟服務,或者執行sql來更改資料庫配置, 要想配置的話, 必須在MySQL安裝完成後, 進行修改配置文件, 否則需要刪除/var/lib/mysql, 如 ...
  • SQL學習日記-語法篇 1. 常見的資料庫對象 對象名 關鍵字 描述 表 table 存儲數據的邏輯單元,以行和列存在,行是數據記錄,列是(屬性)欄位 系統表(數據字典) 存放資料庫相關信息的表 程式員只可查看,不可修改 約束 constraint 執行數據校驗和保證數據完整性的規則 視圖 view ...
  • 推薦用 MySQL 8.0 (2018/4/19 發佈, 開發者說同比 5.7 快 2 倍) 或同類型以上版本. CREATE TABLE TEMPLATE CREATE TABLE [table_name] ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT ...
  • 背景 位元組跳動開發套件數據集成團隊(DTS ,Data Transmission Service)在位元組跳動內基於 Flink 實現了流批一體的數據集成服務。其中一個典型場景是 Kafka/ByteMQ/RocketMQ → HDFS/Hive 。Kafka/ByteMQ/RocketMQ → HD ...
  • 2022.3.20 如何本地連接虛擬機安裝的linux 的mysql 1防火牆開啟開啟 1.1、開啟埠3306 firewall-cmd --zone=public --add-port=3306/tcp --permanent 1.2、重啟防火牆 firewall-cmd --reload 1. ...
  • 變數、流程式控制制和游標 變數 在MySQL資料庫的存儲過程和函數中,可以使用變數來存儲查詢或計算的中間結果數據,或者輸出最終的結果的數據 系統變數 變數由系統定義,屬於伺服器層面 系統變數的分類 每一個MySQL客戶機成功連接伺服器後,都會產生與之對應的會話(建立一次連接相當於一次會話)。MySQL服 ...
  • MySQL 索引(入門): 一、介紹 1.什麼是索引? 一般的應用系統,讀寫比例在10:1左右,而且插入操作和一般的更新操作很少出現性能問題,在生產環境中,我們遇到最多的,也是最容易出問題的,還是一些複雜的查詢操作,因此對查詢語句的優化顯然是重中之重。說起加速查詢,就不得不提到索引了。 2.為什麼要 ...
  • 博客推行版本更新,成果積累制度,已經寫過的博客還會再次更新,不斷地琢磨,高質量高數量都是要追求的,工匠精神是學習必不可少的精神。因此,大家有何建議歡迎在評論區踴躍發言,你們的支持是我最大的動力,你們敢投,我就敢肝 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...