Flink實例-Wordcount詳細步驟

来源:https://www.cnblogs.com/ALittleMoreLove/archive/2018/08/09/9449992.html
-Advertisement-
Play Games

link實例之Wordcount詳細步驟 1.我的IDE是IntelliJ IDEA.在官網上https://www.jetbrains.com/idea/下載最新版2018.2的IDEA,如下圖。破解可以再http://idea.lanyus.com/上獲取破解碼進行破解,如下圖。 2.當IDE準 ...


link實例之Wordcount詳細步驟

1.我的IDE是IntelliJ IDEA.在官網上https://www.jetbrains.com/idea/下載最新版2018.2的IDEA,如下圖。破解可以再http://idea.lanyus.com/上獲取破解碼進行破解,如下圖。

 

2.當IDE準備就緒後,開始創建一個項目名為bbb的maven項目,如下圖。

 

3.在新視窗打開bbb項目時,IDEA會提示我們是否自動導包。選擇自動導包,如下圖。

 

 4.對pom.xml配置文件進行修改,如下代碼。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiao</groupId>
    <artifactId>bbb</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>

    </dependencies>

</project>

5.在src/main/java/目錄下新建一個類,我的類名為WordCount,如下代碼。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class WordCount {

    public static void main(String[] args) throws Exception {
        //定義socket的埠號
        int port;
        try{
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("沒有指定port參數,使用預設值9000");
            port = 9000;
        }

        //獲取運行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //連接socket獲取輸入的數據
        DataStreamSource<String> text = env.socketTextStream("10.192.12.106", port, "\n");

        //計算數據
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word:splits) {
                    out.collect(new WordWithCount(word,1L));
                }
            }
        })//打平操作,把每行的單詞轉為<word,count>類型的數據
                .keyBy("word")//針對相同的word數據進行分組
                .timeWindow(Time.seconds(2),Time.seconds(1))//指定計算數據的視窗大小和滑動視窗大小
                .sum("count");
               
        //把數據列印到控制台
        windowCount.print()
                .setParallelism(1);//使用一個並行度
        //註意:因為flink是懶載入的,所以必須調用execute方法,上面的代碼才會執行
        env.execute("streaming word count");

    }

    /**
     * 主要為了存儲單詞以及單詞出現的次數
     */
    public static class WordWithCount{
        public String word;
        public long count;
        public WordWithCount(){}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }


}

6.開啟IP為10.192.12.106的虛擬機,並開啟該虛擬機的終端,在終端輸入如下命令,該命令可以打開一個埠號為9000的監聽,輸入命令後游標會停留在如下圖的地方。

nc -l 9000

7.切換回IDEA,在菜單欄Build->Build Project,然後運行該類,當控制台console輸出如下圖所示的信息時表示Wordcount成功的與9000的監聽埠建立了連接。

 

8.在虛擬機終端開的游標停留出,輸入hello hello world world world world,然後 回車。在IDEA的控制台會顯示如下單詞和詞頻的信息,表示成功。

 9.接下來把項目bbb打jar包,上傳Flink後臺運行,進行如下圖操作。

首先要保證Java Compiler版本為1.8。

 

然後選擇File->Project Structure,進行修改。

 

10.在配置好Flink的虛擬機下,進入目錄/opt/data/flink-1.3.2/bin中,輸入如下命令,開啟Flink的本地模式。(不會配置flink的小伙伴可以打開鏈接https://www.cnblogs.com/ALittleMoreLove/p/9396118.html

./start-local.sh

11.在瀏覽器里輸入開啟Flink守護進程的虛擬機的IP和8081埠,進入如下Flink前端頁面。

 12.上傳bbb.jar文件到Flink後端運行。

備註:在學習大數據的漫長道路上,我們會遇到各種各樣奇怪的問題,在嘗試了多種方法仍然無法解決後 如果再沒有高人指點,經常一個問題就卡好幾天。這種無奈與絕望的感覺我想各位自學大數據的小伙伴們應該深有體會。我個人解決問題通常有兩種方法:一種是直接找大牛幫忙,另外一種是在網上找各種相關的博客和帖子,再從中總結出一套可以解決自己問題的方法。自己探索新知識時,往往是很艱辛的,遇到好多天也解決不了的問題也是很正常的,但是千萬不要放棄,堅持下來就一定會有收穫的!Wordcount實例令我躺了兩天的坑,最後終於找到瞭解決的方法,希望這篇隨筆可以對自學大數據的小伙伴提供一定的幫助。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一.鏡像下載: 根據需求下載自己需要的版本 從官網下載kali 2018.2 的安裝包:https://www.kali.org/downloads/ 二.燒錄: 這裡推薦用 win32 disk imager 嘗試了好幾款燒錄工具,這個是相對最穩定的 打開win32後 第一步 選中鏡像位置 ,第二 ...
  • 1.實驗目的 mysql伺服器作為生產環境中使用最廣泛的資料庫軟體,以其開源性,穩定性而廣泛使用,但同時由於數據存儲,讀寫頻率高,極易造成資料庫出錯,從而給企業造成不可輓回的損失,我們除了做好資料庫的備份工作外,同時,還應該解決資料庫伺服器的單點故障問題。 2.搭建環境 兩台 mysql 伺服器 1 ...
  • Windows 10包含各種通用應用程式,並且沒有簡單的方法可以將它們隱藏在新的“開始”菜單中的“所有應用程式”視圖中。您可以卸載它們,但Microsoft不允許您以通常的方式輕鬆卸載它們。 Windows 10包含各種通用應用程式,並且沒有簡單的方法可以將它們隱藏在新的“開始”菜單中的“所有應用程 ...
  • 悲觀鎖(Pessimistic Lock) 顧名思義,就是很悲觀,每次去拿數據的時候都認為別人會修改,所以每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會block直到它拿到鎖。傳統的關係型資料庫裡邊就用到了很多這種鎖機制,比如行鎖,表鎖等,讀鎖,寫鎖等,都是在做操作之前先上鎖。 樂觀鎖(Opt ...
  • 占座 ...
  • 一、聯結表 數據仍使用前文中的數據。 1、子查詢 作為子查詢的SELECT語句只能查詢單個列。企圖檢索多個列將返回錯誤。 2、創建鏈接 SELECT <select_list> FROM A LEFT JOIN B ON A.key = B.key UNION SELECT <select_list ...
  • MySQL的主從複製 部署環境: MySQL master 192.168.40.21 MySQL slave 192.168.40.22 思路: 當主MySQL上進行數據上的操作或者變化時,主MySQL上的二進位日誌文件(binary log)會隨之產生變化,這時從MySQL上開啟I/O線程和sq ...
  • 從這篇開始,講innodb存儲引擎中,對於幾個重要的伺服器參數配置。這些參數以innodb_xx 開頭。 1. innodb_buffer_pool_size的設置 這個參數定義了innodb存儲引擎的表數據和索引數據的最大記憶體緩衝區大小,和myisam不同,myisam的key_buffer_si ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...