Flink的快速入門(一)

来源:https://www.cnblogs.com/frankdeng/archive/2018/05/10/9020139.html
-Advertisement-
Play Games

1. Flink的引入 這幾年大數據的飛速發展,出現了很多熱門的開源社區,其中著名的有 Hadoop、Storm,以及後來的 Spark,他們都有著各自專註的應用場景。Spark 掀開了記憶體計算的先河,也以記憶體為賭註,贏得了記憶體計算的飛速發展。Spark 的火熱或多或少的掩蓋了其他分散式計算的系統身 ...


1. Flink的引入

這幾年大數據的飛速發展,出現了很多熱門的開源社區,其中著名的有 HadoopStorm,以及後來的 Spark,他們都有著各自專註的應用場景。Spark 掀開了記憶體計算的先河,也以記憶體為賭註,贏得了記憶體計算的飛速發展。Spark 的火熱或多或少的掩蓋了其他分散式計算的系統身影。就像 Flink,也就在這個時候默默的發展著。

在國外一些社區,有很多人將大數據的計算引擎分成了 4 代,當然,也有很多人不會認同。我們先姑且這麼認為和討論。

首先第一代的計算引擎,無疑就是 Hadoop 承載的 MapReduce。這裡大家應該都不會對 MapReduce 陌生,它將計算分為兩個階段,分別為 Map Reduce。對於上層應用來說,就不得不想方設法去拆分演算法,甚至於不得不在上層應用實現多個 Job 的串聯,以完成一個完整的演算法,例如迭代計算。

由於這樣的弊端,催生了支持 DAG 框架的產生。因此,支持 DAG 的框架被劃分為第二代計算引擎。如 Tez 以及更上層的 Oozie。這裡我們不去細究各種 DAG 實現之間的區別,不過對於當時的 Tez Oozie 來說,大多還是批處理的任務。

接下來就是以 Spark 為代表的第三代的計算引擎。第三代計算引擎的特點主要是 Job 內部的 DAG 支持(不跨越 Job),以及強調的實時計算。在這裡,很多人也會認為第三代計算引擎也能夠很好的運行批處理的 Job

隨著第三代計算引擎的出現,促進了上層應用快速發展,例如各種迭代計算的性能以及對流計算和 SQL 等的支持。Flink 的誕生就被歸在了第四代。這應該主要表現在 Flink 對流計算的支持,以及更一步的實時性上面。當然 Flink 也可以支持 Batch 的任務,以及 DAG 的運算。

首先,我們可以通過下麵的性能測試初步瞭解兩個框架的性能區別,它們都可以基於記憶體計算框架進行實時計算,所以都擁有非常好的計算性能。經過測試,Flink計算性能上略好。 

 

測試環境: 

1.CPU7000個; 

2.記憶體:單機128GB 

3.版本:Hadoop 2.3.0Spark 1.4Flink 0.9 

4.數據:800MB8GB8TB 

5.演算法:K-means:以空間中K個點為中心進行聚類,對最靠近它們的對象歸類。通過迭代的方法,逐次更新各聚類中心的值,直至得到最好的聚類結果。 

6.迭代:K=103組數據 

 

迭代次數(縱坐標是秒,橫坐標是次數)

SparkFlink全部都運行在Hadoop YARN上,性能為Flink > Spark > Hadoop(MR),迭代次數越多越明顯,性能上,Flink優於SparkHadoop最主要的原因是Flink支持增量迭代,具有對迭代自動優化的功能。 

2. Flink簡介

很多人可能都是在 2015 年才聽到 Flink 這個詞,其實早在 2008 年,Flink 的前身已經是柏林理工大學一個研究性項目, 在 2014 Apache 孵化器所接受,然後迅速地成為了 ASFApache Software Foundation)的頂級項目之一。Flink 的最新版本目前已經更新到了 0.10.0 了,在很多人感慨 Spark 的快速發展的同時,或許我們也該為 Flink 的發展速度點個贊。

Flink 是一個針對流數據和批數據的分散式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已。再換句話說,Flink 會把所有任務當成流來處理,這也是其最大的特點。

Flink 可以支持本地的快速迭代,以及一些環形的迭代任務。並且 Flink 可以定製化記憶體管理。在這點,如果要對比 Flink Spark 的話,Flink 並沒有將記憶體完全交給應用層。這也是為什麼 Spark 相對於 Flink,更容易出現 OOM 的原因(out of memory)。就框架本身與應用場景來說,Flink 更相似與 Storm。如果之前瞭解過 Storm 或者 Flume 的讀者,可能會更容易理解 Flink 的架構和很多概念。下麵讓我們先來看下 Flink 的架構圖。

 

我們可以瞭解到 Flink 幾個最基礎的概念,ClientJobManager TaskManagerClient 用來提交任務給 JobManagerJobManager 分發任務給 TaskManager 去執行,然後 TaskManager 會心跳的彙報任務狀態。看到這裡,有的人應該已經有種回到 Hadoop 一代的錯覺。確實,從架構圖去看,JobManager 很像當年的 JobTrackerTaskManager 也很像當年的 TaskTracker。然而有一個最重要的區別就是 TaskManager 之間是是流(Stream)。其次,Hadoop 一代中,只有 Map Reduce 之間的 Shuffle,而對 Flink 而言,可能是很多級,並且在 TaskManager 內部和 TaskManager 之間都會有數據傳遞,而不像 Hadoop,是固定的 Map Reduce

3. 技術的特點(可選)

關於Flink所支持的特性,我這裡只是通過分類的方式簡單做一下梳理,涉及到具體的一些概念及其原理會在後面的部分做詳細說明。

3.1. 流處理特性

支持高吞吐、低延遲、高性能的流處理

支持帶有事件時間的視窗(Window)操作

支持有狀態計算的Exactly-once語義

支持高度靈活的視窗(Window)操作,支持基於timecountsession,以及data-driven的視窗操作

支持具有Backpressure功能的持續流模型

支持基於輕量級分散式快照(Snapshot)實現的容錯

一個運行時同時支持Batch on Streaming處理和Streaming處理

FlinkJVM內部實現了自己的記憶體管理

支持迭代計算

支持程式自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行緩存

 

3.2. API支持

Streaming數據類應用,提供DataStream API

對批處理類應用,提供DataSet API(支持Java/Scala

3.3. Libraries支持

支持機器學習(FlinkML

支持圖分析(Gelly

支持關係數據處理(Table

支持複雜事件處理(CEP

3.4. 整合支持

支持Flink on YARN

支持HDFS

支持來自Kafka的輸入數據

支持Apache HBase

支持Hadoop程式

支持Tachyon

支持ElasticSearch

支持RabbitMQ

支持Apache Storm

支持S3

支持XtreemFS

3.5. Flink生態圈

一個計算框架要有長遠的發展,必須打造一個完整的 Stack。不然就跟紙上談兵一樣,沒有任何意義。只有上層有了具體的應用,並能很好的發揮計算框架本身的優勢,那麼這個計算框架才能吸引更多的資源,才會更快的進步。所以 Flink 也在努力構建自己的 Stack

Flink 首先支持了 Scala Java APIPython 也正在測試中。Flink 通過 Gelly 支持了圖操作,還有機器學習的 FlinkMLTable 是一種介面化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和執行。對於完整的 Stack 我們可以參考下圖。

 

Flink 為了更廣泛的支持大數據的生態圈,其下也實現了很多 Connector 的子項目。最熟悉的,當然就是與 Hadoop HDFS 集成。其次,Flink 也宣佈支持了 TachyonS3 以及 MapRFS。不過對於 Tachyon 以及 S3 的支持,都是通過 Hadoop HDFS 這層包裝實現的,也就是說要使用 Tachyon S3,就必須有 Hadoop,而且要更改 Hadoop 的配置(core-site.xml)。如果瀏覽 Flink 的代碼目錄,我們就會看到更多 Connector 項目,例如 Flume Kafka

4. 安裝

Flink 有三種部署模式,分別是 LocalStandalone Cluster Yarn Cluster

4.1. Local模式

對於 Local 模式來說,JobManager TaskManager 會公用一個 JVM 來完成 Workload。如果要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是將安裝包解壓啟動(./bin/start-local.sh)即可,在這裡不在演示。

4.2. Standalone 模式

4.2.1. 下載

安裝包下載地址:http://flink.apache.org/downloads.html

快速入門教程地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html

 

 

 

4.2.2. 上傳安裝包到linux系統

使用rz命令

4.2.3. 解壓

tar –zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz

4.2.4. 重命名

mv flink-1.3.2 flink

4.2.5. 修改環境變數

切換到root用戶配置
export FLINK_HOME=/home/hadoop/flink
export PATH=$PATH:$FLINK_HOME/bin
配置結束後切換會普通用戶
source /etc/profile

4.2.6. 修改配置文件

修改flink/conf/masters
master1:8081
修改flink/conf/slaves
master1ha
master2
master2ha
修改flink/conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 2
jobmanager.rpc.address: master1

4.2.7. 啟動flink

/home/Hadoop/flink/bin/start-cluster.sh

 

4.2.8. Flink Rest API

Flink 和其他大多開源的框架一樣,提供了很多有用的 Rest API。不過 Flink RestAPI,目前還不是很強大,只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通過其 Rest 來查詢各項的結果數據。在 Flink RestAPI 基礎上,可以比較容易的將 Flink Monitor 功能和其他第三方工具相集成,這也是其設計的初衷。

Flink 的進程中,是由 JobManager 來提供 Rest API 的服務。因此在調用 Rest 之前,要確定 JobManager 是否處於正常的狀態。正常情況下,在發送一個 Rest 請求給 JobManager 之後,Client 就會收到一個 JSON 格式的返回結果。由於目前 Rest 提供的功能還不多,需要增強這塊功能的讀者可以在子項目 flink-runtime-web 中找到對應的代碼。其中最關鍵一個類 WebRuntimeMonitor,就是用來對所有的 Rest 請求做分流的,如果需要添加一個新類型的請求,就需要在這裡增加對應的處理代碼。下麵我例舉幾個常用 Rest API

1.查詢 Flink 集群的基本信息: /overview。示例命令行格式以及返回結果如下:

$ curl http://localhost:8081/overview
{"taskmanagers":1,"slots-total":16,
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

2.查詢當前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回結果如下:

$ curl http://localhost:8081/jobs
{"jobs-running":[],"jobs-finished":
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

3.查詢一個指定的 Job 信息: /jobs/jobid。這個查詢的結果會返回特別多的詳細的內容,這是我在瀏覽器中進行的測試,如下圖:

想要瞭解更多 Rest 請求內容的讀者,可以去 Apache Flink 的頁面中查找。

4.2.9. 運行測試任務

./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout

4.3. Flink HA

首先,我們需要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對於 Standalone 來說,Flink 必須依賴於 Zookeeper 來實現 JobManager HAZookeeper 已經成為了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone Flink 集群會同時有多個活著的 JobManager,其中只有一個處於工作狀態,其他處於 Standby 狀態。當工作中的 JobManager 失去連接後(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集群。

對於 Yarn Cluaster 模式來說,Flink 就要依靠 Yarn 本身來對 JobManager HA 了。其實這裡完全是 Yarn 的機制。對於 Yarn Cluster 模式來說,JobManager TaskManager 都是被 Yarn 啟動在 Yarn Container 中。此時的 JobManager,其實應該稱之為 Flink Application Master。也就說它的故障恢復,就完全依靠著 Yarn 中的 ResourceManager(和 MapReduce AppMaster 一樣)。由於完全依賴了 Yarn,因此不同版本的 Yarn 可能會有細微的差異。這裡不再做深究。

4.3.1. 修改配置文件

修改flink-conf.yaml

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://master1:9000/flink/ha/
high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181
high-availability.zookeeper.client.acl: open 

修改conf

server.1=master1ha:2888:3888
server.2=master2:2888:3888
server.3=master2ha:2888:3888

修改masters

master1:8082
master1ha:8082

修改slaves

master1ha
master2
master2ha

4.3.2. 啟動

/home/Hadoop/flink/bin/start-cluster.sh

 

4.4. Yarn Cluster 模式

4.4.1. 引入

在一個企業中,為了最大化的利用集群資源,一般都會在一個集群中同時運行多種類型的 Workload。因此 Flink 也支持在 Yarn 上面運行。首先,讓我們通過下圖瞭解下 Yarn Flink 的關係。

在圖中可以看出,Flink Yarn 的關係與 MapReduce Yarn 的關係是一樣的。Flink 通過 Yarn 的介面實現了自己的 App Master。當在 Yarn 中部署了 FlinkYarn 就會用自己的 Container 來啟動 Flink JobManager(也就是 App Master)和 TaskManager

4.4.2. 修改環境變數

export HADOOP_CONF_DIR= /home/hadoop/hadoop/etc/hadoop

4.4.3. 部署啟動 

yarn-session.sh -d -s 2 -tm 800 -n 2

上面的命令的意思是,同時向Yarn申請3container,其中 2 Container 啟動 TaskManager-n 2),每個 TaskManager 擁有兩個 Task Slot-s 2),並且向每個 TaskManager Container 申請 800M 的記憶體,以及一個ApplicationMasterJob Manager)。

 

Flink部署到Yarn Cluster後,會顯示Job Manager的連接細節信息。

Flink on Yarn會覆蓋下麵幾個參數,如果不希望改變配置文件中的參數,可以動態的通過-D選項指定,如 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

jobmanager.rpc.address:因為JobManager會經常分配到不同的機器上

taskmanager.tmp.dirs:使用Yarn提供的tmp目錄

parallelism.default:如果有指定slot個數的情況下

yarn-session.sh會掛起進程,所以可以通過在終端使用CTRL+C或輸入stop停止yarn-session

如果不希望Flink Yarn client長期運行,Flink提供了一種detached YARN session,啟動時候加上參數-d—detached

 

在上面的命令成功後,我們就可以在 Yarn Application 頁面看到 Flink 的紀錄。如下圖。

如果在虛擬機中測試,可能會遇到錯誤。這裡需要註意記憶體的大小,Flink Yarn 會申請多個 Container,但是 Yarn 的配置可能限制了 Container 所能申請的記憶體大小,甚至 Yarn 本身所管理的記憶體就很小。這樣很可能無法正常啟動 TaskManager,尤其當指定多個 TaskManager 的時候。因此,在啟動 Flink 之後,需要去 Flink 的頁面中檢查下 Flink 的狀態。這裡可以從 RM 的頁面中,直接跳轉(點擊 Tracking UI)。這時候 Flink 的頁面如圖

 

yarn-session.sh啟動命令參數如下:

Usage:  

   Required  

     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)  

   Optional  

     -D <arg>                        Dynamic properties  

     -d,--detached                   Start detached  

     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]  

     -nm,--name                      Set a custom name for the application on YARN  

     -q,--query                      Display available YARN resources (memory, cores)  

     -qu,--queue <arg>               Specify YARN queue.  

     -s,--slots <arg>                Number of slots per TaskManager  

     -st,--streaming                 Start Flink in streaming mode  

     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]  

4.4.4. 提交任務

之後,我們可以通過這種方式提交我們的任務

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

以上命令在參數前加上y首碼,-yn表示TaskManager個數。

在這個模式下,同樣可以使用-m yarn-cluster提交一個"運行後即焚"detached yarn-yd)作業到yarn cluster

 

4.4.5. 停止yarn cluster

yarn application -kill application_1507603745315_0001

5. 技術的使用

5.1. Flink開發標準流程

  1. 獲取execution environment,

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  1. 載入/創建初始化數據

DataStream<String> text = env.readTextFile("file:///path/to/file");

  1. 指定 transformations 作用在數據上

val mapped = input.map { x => x.toInt }

  1. 存儲結果集

writeAsText(String path)

print()

  1. 觸發程式執行

local模式下執行程式

execute()

將程式達成jar運行線上上

./bin/flink run \

-m master1:8082 \

./examples/batch/WordCount.jar \

--input hdfs://master1:9000/words.txt \

--output hdfs://master1:9000/clinkout \

 

5.2. Wordcount

5.2.1. Scala代碼

object SocketWindowWordCount { 
    def main(args: Array[String]) : Unit = {
        // the port to connect to
        val port: Int = try {
         ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }
        }
        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // get input data by connecting to the socket
        val text = env.socketTextStream("localhost", port, '\n')
        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum("count")
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)
        env.execute("Socket Window WordCount")
    }
    // Data type for words with count
    case class WordWithCount(word: String, count: Long)
}

5.2.2. Java代碼

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
    // Data type for words with count
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
} 

5.2.3. 運行

啟動nc發送消息

$ nc -l 9000

啟動flink程式

$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

 

 

5.2.4. 測試

l 輸入

$ nc -l 9000

lorem ipsum
ipsum ipsum ipsum
bye

l 輸出

$ tail -f log/flink-*-jobmanager-*.out

lorem : 1
bye : 1
ipsum : 4

5.3. 使用IDEA開發離線程式

Datasetflink的常用程式,數據集通過source進行初始化,例如讀取文件或者序列化集合,然後通過transformationfilteringmappingjoininggrouping)將數據集轉成,然後通過sink進行存儲,既可以寫入hdfs這種分散式文件系統,也可以列印控制台,flink可以有很多種運行方式,如localflink集群、yarn

5.3.1. Pom

n Java

<properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.2</scala.version>
        <scala.compat.version>2.10</scala.compat.version>
        <hadoop.version>2.6.2</hadoop.version>
        <flink.version>1.3.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
              

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Redis是一個開源的使用ANSI C語言編寫、支持網路、可基於記憶體亦可持久化的日誌型、Key-Value資料庫,並提供多種語言的API。(百度百科 0.0) 下載:http://www.redis.cn/ 1、將下載的redis-4.0.9.tar.gz放置/usr/local目錄下,並解壓: t ...
  • 博主在工作中,常常需要使用sql語句來進行查詢,總結髮現,靈活使用這幾個要點,就可以應付大部分簡單情況。 一.連接:根據兩個或多個表中的列之間的關係,從這些表中查詢數據。 JOIN或INNER JOIN: 返回左表和右表中相互匹配的行 LEFT JOIN: 返回左表和右表中相互匹配的行,及左表中不相 ...
  • 在MySQL中,my.cnf是參數文件(Option Files),類似於ORACLE資料庫中的spfile、pfile參數文件,照理說,參數文件my.cnf中的都是系統參數(這種稱呼比較符合思維習慣),但是官方又稱呼其為系統變數(system variables),那麼到底這個叫系統參數或系統變數... ...
  • 1. 日期轉化為字元串 (以2016年10月20日為例) select to_char(sysdate,'yyyy-mm-dd hh24:mi:ss') strDateTime from dual; --獲取年-月-日 時:分:秒 --顯示結果為:2016-10-20 12:35:21 select ...
  • 本文由 網易雲 發佈。 作者:郭憶 本篇文章僅限內部分享,如需轉載,請聯繫網易獲取授權。 故障恢復 MySQL基於Check point的機制,周期性的建立redo log與數據頁的一致點。一旦資料庫重啟,從記錄的Check point開始,根據redo log,對相應的數據頁進行更新,對於已經提交 ...
  • 通過數據交換平臺上傳較大的文件時,經常會出現導入失敗情況,換種方式通過新數據開發平臺(stark)也可以輕鬆實現外部數據與hive的數據關聯。 --第一步、導入csv文件到hive --stark數據開發平臺——>資源管理——>搜索欄右邊+號——>上傳資源(資源類型:選擇普通文件) --第二步、建表 ...
  • 本文由 網易雲 發佈。 作者:範欣欣 本篇文章僅限內部分享,如需轉載,請聯繫網易獲取授權。 眾所周知,HBase預設適用於寫多讀少的應用,正是依賴於它相當出色的寫入性能:一個100台RS的集群可以輕鬆地支撐每天10T 的寫入量。當然,為了支持更高吞吐量的寫入,HBase還在不斷地進行優化和修正,這篇 ...
  • 本文由 網易雲 發佈。 作者:唐雕龍 本篇文章僅限內部分享,如需轉載,請聯繫網易獲取授權。 面向新手的hadoop+hive學習環境搭建,加對我走過的坑總結,避免大家踩坑。 對於hive相關docker,並沒有官方的docker,第三方維護經過測試各種不靠譜,所以才想到自己搭建一套,然後後期在整理成 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...