利用mapWithState實現按照首字母統計的有狀態的wordCount

来源:https://www.cnblogs.com/icecola/archive/2019/07/07/11145957.html
-Advertisement-
Play Games

利用mapWithState運算元實現有狀態的wordCount,且按照word的第一個字母為key,但是要求輸出的格式為(word,1)這樣形式的結果 ...


最近在做sparkstreaming整合kafka的時候遇到了一個問題:

可以抽象成這樣一個問題:有狀態的wordCount,且按照word的第一個字母為key,但是要求輸出的格式為(word,1)這樣的形式

舉例來說:

例如第一批數據為: hello how when hello

則要求輸出為:(hello,1) (how,2) (when,1) (hello,3)

第二批數據為: hello how when what hi

則要求輸出為: (hello,4) (how,5) (when,2) (what,3) (hi,6)

首先瞭解一下mapWithState的常規用法:

ref: https://www.jianshu.com/p/a54b142067e5

http://sharkdtu.com/posts/spark-streaming-state.html

稍微總結一下mapWithState的幾個tips:

  1. mapWithState是1.6版本之後推出的
  2. 必須設置checkpoint來儲存歷史數據
  3. mapWithState和updateStateByKey的區別 : 他們類似,都是有狀態DStream操作, 區別在於,updateStateByKey是輸出增量數據,隨著時間的增加, 輸出的數據越來越多,這樣會影響計算的效率, 對CPU和記憶體壓力較大.而mapWithState則輸出本批次數據,但是也含有狀態更新.
  4. checkpoint的數據會分散存儲在不同的分區中, 在進行狀態更新時, 首先會對當前 key 做 hash , 再到對應的分區中去更新狀態 , 這種方式大大提高了效率.

解決問題的思路:

State中保存狀態為(String,Int) 元組類型, 其中String為word的全量, 而Int為word的計數.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.MapWithStateDStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("MapWithStateApp")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("C:\\Users\\hylz\\Desktop\\checkpoint")
    val lines = ssc.socketTextStream("192.168.100.11",8888)
    val words = lines.flatMap(_.split(" "))

    def mappingFunc(key: String, value: Option[(String, Int)], state: State[(String, Int)]): (String, Int) = {
      val cnt: Int = value.getOrElse((null, 0))._2 + state.getOption.getOrElse((null, 0))._2
      val allField: String = value.getOrElse((null, 0))._1
      state.update((allField, cnt))
      (allField, cnt)
    }

    val cnt: MapWithStateDStream[String, (String, Int), (String, Int), (String, Int)] = words.map(x => (x.substring(0, 1), (x, 1))).mapWithState(StateSpec.function(mappingFunc _))

    cnt.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

測試結果如下

input: hello how when hello

input: hello how when what hi


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • mysql 表的完整性約束 [TOC] 約束概念 unsigned 設置某一個數字無符號 (整數類型 ,浮點類型不能是unsigned的) not null 某一個欄位不能為空(嚴格模式會影響非空設置的效果) default 給某個欄位設置預設值(設置預設值) unique 設置某一個欄位不能重覆 ...
  • mysql支持的數據類型 [TOC] 數據類型官方文檔 數值類型 數值類型表 | 類型 | 大小 | 範圍(有符號) | 範圍(無符號)unsigned約束 | 用途 | | | | | | | | TINYINT | 1位元組 | ( 128,127) | (0,255) | 小整數值 | | SM ...
  • 1.前言 記得從上大學的時候就已經聽說過這個詞語 以前的理解可能就是數據量超大很多嘛 到這幾年大數據這個詞語被人們談論得也越來越頻繁 也越來越想瞭解它的所以自己才會去學習 我覺得做任何事之前肯定有某種驅使你去瞭解它的過程 以下僅是自己個人的理解 2.什麼是大數據? 大數據不僅是數據量大 (G,TB, ...
  • 1.Redis伺服器 can not get resource from pool. 1000個線程併發還能跑,5000個線程的時候出現這種問題,查後臺debug日誌,發現redis 線程池不夠。剛開始設置的是: 順便也改了一下jdbc 的連接池參數,最大空閑和最大連接數都改成1000.在測一下。可 ...
  • SQL中的連接 關係型資料庫的核心之一就是連接, 而在不同的標準中, 連接的寫法上可能有區別, 最為主要的兩個SQL標準就是SQL92和SQL99了, 後面的數字表示的是標準提出的時間. SQL92中的連接 案例使用的表是球員表, 球隊表和身高級別表, 下載: 笛卡爾積 笛卡爾積是一個數學運算, 假 ...
  • 子查詢 子查詢就是嵌套在查詢中的查詢, 目的是為了進行更複雜的查詢, 同時可以理解查詢的過程. 子查詢也分為兩種, 一種是關聯子查詢, 一種是非關聯子查詢. 關聯子查詢與非關聯子查詢 子查詢的劃分是依據了子查詢是否執行多次來進行劃分的. 子查詢從數據表中查詢數據結果, 如果這個數據結果只執行一次, ...
  • 1.MySQL資料庫安裝與配置 1.1 資料庫安裝和配置 安裝需要註意的地方: 典型安裝:安裝最常用的特性組件,會預設安裝至C盤目錄下,適合大部分開發者。 自定義安裝:可以自定義安裝目錄,自定義選擇安裝所需要的組件,安裝過程可控。 完全安裝:會安裝MySQL所有服務及特性,占用磁碟空間大。 配置需要 ...
  • 《SQL Server溫故系列》之增刪改查,CRUD,查詢語句,SELECT。顧名思義,SELECT 語句的作用就是從表中查詢數據。查詢語句一次可以從一個或多個表中檢索一個或多個欄位的一行或多行。SELECT 是 SQL 中最常用的一個語句,完整的 SELECT 語法是非常複雜的,本文將先對簡單查詢... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...