Spark學習之spark-shell

来源:https://www.cnblogs.com/schoolbag/archive/2018/09/13/9635615.html
-Advertisement-
Play Games

前言:要學習spark程式開發,建議先學習spark-shell互動式學習,加深對spark程式開發的理解。spark-shell提供了一種學習API的簡單方式,以及一個能夠進行互動式分析數據的強大工具,可以使用scala編寫(scala運行與Java虛擬機可以使用現有的Java庫)或使用Pytho ...


前言:要學習spark程式開發,建議先學習spark-shell互動式學習,加深對spark程式開發的理解。spark-shell提供了一種學習API的簡單方式,以及一個能夠進行互動式分析數據的強大工具,可以使用scala編寫(scala運行與Java虛擬機可以使用現有的Java庫)或使用Python編寫。

1.啟動spark-shell

    spark-shell的本質是在後臺調用了spark-submit腳本來啟動應用程式的,在spark-shell中已經創建了一個名為sc的SparkContext對象,在4個CPU核運行spark-shell命令如下:

spark-shell --master local[4]

    如果指定Jar包路徑,則命令如下:

spark-shell --master local[4] --jars xxx.jar,yyy,jar

    --master用來設置context將要連接並使用的資源主節點,master的值是standalone模式中spark的集群地址、yarn或mesos集群的URL,或是一個local地址

    --jars可以添加需要用到的jar包,通過逗號分隔來添加多個包。

2.載入text文件

    spark創建sc後,可以載入本地文件創建RDD,這裡測試是載入spark自帶的本地文件README.md,返回一個MapPartitionsRDD文件。

scala> val textFile = sc.textFile("file:///opt/cloud/spark-2.1.1-bin-hadoop2.7/README.md");
textFile: org.apache.spark.rdd.RDD[String] = file:///opt/cloud/spark-2.1.1-bin-hadoop2.7/README.md MapPartitionsRDD[9] at textFile at <console>:24

    載入HDFS文件和本地文件都是使用textFile,區別是添加首碼(hdfs://和file://)進行標識,從本地讀取文件直接返回MapPartitionsRDD,而從HDFS讀取的文件是先轉成HadoopRDD,然後隱試轉換成MapPartitionsRDD。想瞭解MapPartitions可以看這篇MapPartition和Map的區別

3.簡單RDD操作

    對於RDD可以執行Transformation返回新的RDD,也可以執行Action得到返回結果。first命令返迴文件第一行,count命令返迴文件所有行數。

scala> textFile.first();
res6: String = # Apache Spark

scala> textFile.count();
res7: Long = 104

 接下來進行transformation操作,使用filter命令從README.md文件中抽取出一個子集,返回一個新的FilteredRDD。

scala> val textFilter = textFile.filter(line=>line.contains("Spark"));
textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at filter at <console>:26

 鏈接多個Transformation和Action,計算包括"Spark"字元串的行數。

scala> textFile.filter(line=>line.contains("Spark")).count();
res10: Long = 20

4.RDD應用的簡單操作

 (1)計算文本中單詞最多的一行的單詞數

scala> textFile.map(line =>line.split(" ").size).reduce((a,b) => if (a > b) a else b);
res11: Int = 22

 先將每一行的單詞使用空格進行拆分,並統計每一行的單詞數,創建一個基於單詞數的新RDD,然後對該RDD進行Reduce操作返回最大值。

 (2)統計單詞

 詞頻統計WordCount是大數據處理最流行的入門程式之一,Spark可以很容易實現WordCount操作。

//這個過程返回的是一個(string,int)類型的鍵值對ShuffledRDD(y執行reduceByKey的時候需要進行Shuffle操作,返回的是一個Shuffle形式的RDD),最後用Collect聚合統計結果
scala> val wordCount = textFile.flatMap(line =>line.split(" ")).map(x => (x,1)).reduceByKey((a,b) => a+b); wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:26 scala> wordCount.collect [Stage 7:> (0 + 0)
[Stage 7:> (0 + 2)
res12: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (YARN,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (["Parallel,1), (ar...
//這裡使用了占位符_,使表達式更為簡潔,是Scala語音的特色,每個_代表一個參數。
scala> val wordCount2 = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_); wordCount2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at <console>:26 scala> wordCount2.collect res14: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (YARN,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (["Parallel,1), (ar...
//Spark預設不進行排序,如有需要排序輸出,排序的時候將key和value互換,使用sortByKey方法指定升序(true)和降序(false)
scala> val wordCount3 = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)); wordCount3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at map at <console>:26 scala> wordCount3.collect res15: Array[(String, Int)] = Array(("",71), (the,24), (to,17), (Spark,16), (for,12), (##,9), (and,9), (a,8), (can,7), (run,7), (on,7), (is,6), (in,6), (using,5), (of,5), (build,4), (Please,4), (with,4), (also,4), (if,4), (including,4), (an,4), (You,4), (you,4), (general,3), (documentation,3), (example,3), (how,3), (one,3), (For,3), (use,3), (or,3), (see,3), (Hadoop,3), (Python,2), (locally,2), (This,2), (Hive,2), (SparkPi,2), (refer,2), (Interactive,2), (Scala,2), (detailed,2), (return,2), (Shell,2), (class,2), (Python,,2), (set,2), (building,2), (SQL,2), (guidance,2), (cluster,2), (shell:,2), (supports,2), (particular,2), (following,2), (which,2), (should,2), (To,2), (be,2), (do,2), (./bin/run-example,2), (It,2), (1000:,2), (tests,2), (examples,2), (at,2), (`examples`,2), (that,2), (H...

5.RDD緩存使用RDD的cache()方法

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Sentry 是一款基於 Django實現的錯誤日誌收集和聚合的平臺,它是 Python 實現的,但是其日誌監控功能卻不局限於python,對諸如 Node.js, php,ruby, C#,java 等語言的項目都可以做到無縫集成,甚至可以用來對iOS, Android 移動客戶端以及 Web前端 ...
  • Ubuntu關閉網路服務命令: service network-manager stop 重啟網路服務命令: service network-manager restart ...
  • elk 為 elasticsearch(查詢搜索引擎),logstash(對日誌進行分析和過濾,然後轉發給elasticsearch),kibana(一個web圖形界面用於可視化elasticsearch數據)縮寫 1.安裝docker環境 2. 準備鏡像 拉取 elk鏡像,我們用sebp/elk ...
  • import matplotlib.pyplot as pltx = [4, 9, 21, 55, 30, 18]labels = ['math', 'history', 'chemistry', 'physics', 'biology','Enrlish']explode = [0, 0.01, ...
  • 背景 隨著數據的積累,MongoDB中的數據量越來越大,數據分析團隊從資料庫中抽取變化數據(假如依據欄位createdatetime,transdatetime),越來越困難。我們知道MongoDB的副本集有一個數據結構Oplog,裡面存儲了Primary節點的所有寫操作(此處的寫操作是指查詢以外的 ...
  • 1.下載RPM包 選擇【Red Hat & Oracle Enterprise Linux】平臺,下載客戶端和伺服器端的RPM包 2.解壓 3.切換root用戶 4.安裝server 5.5 5.更改密碼 6.查看舊版本MySQL的命令 7.卸載舊版本 8.啟動服務 9.安裝客戶端 10.安裝成功後 ...
  • 一.概述 慢查詢日誌記錄了所有的超過sql語句( 超時參數long_query_time單位 秒),獲得表鎖定的時間不算作執行時間。慢日誌預設寫入到參數datadir(數據目錄)指定的路徑下。預設文件名是[hostname]_slow.log,預設超時是10秒,預設不開啟慢查詢日誌。下麵查看慢日誌的 ...
  • 1、檢查是否正確的啟動了resourcemanager服務 若是沒有啟動,請檢查yarn-site-xml配置 2、若是啟動了 1、檢查客戶機和虛擬機之間是否能夠相互ping通 2、檢查虛擬機防火牆是否關閉 3、排查 1、首先,各個虛擬機和客戶端之前都能ping通,虛擬機都能上外網,說明網路沒有問題 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...