Flink+Kafka整合的實例

来源:https://www.cnblogs.com/ALittleMoreLove/archive/2018/08/15/9481545.html
-Advertisement-
Play Games

Flink+Kafka整合實例 1.使用工具Intellig IDEA新建一個maven項目,為項目命名為kafka01。 2.我的pom.xml文件配置如下。 3.在項目的目錄/src/main/java在創建兩個Java類,分別命名為KafkaDemo和CustomWatermarkEmitte ...


Flink+Kafka整合實例

1.使用工具Intellig IDEA新建一個maven項目,為項目命名為kafka01。

2.我的pom.xml文件配置如下。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hrb.lhr</groupId>
    <artifactId>kafka01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.1.4</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- explicitly add a standard loggin framework, as Flink does not (in the future) have
           a hard dependency on one specific framework by default -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

</project>

3.在項目的目錄/src/main/java在創建兩個Java類,分別命名為KafkaDemo和CustomWatermarkEmitter,代碼如下所示。

import java.util.Properties;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;


public class KafkaDeme {

        public static void main(String[] args) throws Exception {

                // set up the streaming execution environment
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                //預設情況下,檢查點被禁用。要啟用檢查點,請在StreamExecutionEnvironment上調用enableCheckpointing(n)方法,
                // 其中n是以毫秒為單位的檢查點間隔。每隔5000 ms進行啟動一個檢查點,則下一個檢查點將在上一個檢查點完成後5秒鐘內啟動
                env.enableCheckpointing(5000);
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "10.192.12.106:9092");//kafka的節點的IP或者hostName,多個使用逗號分隔
                properties.setProperty("zookeeper.connect", "10.192.12.106:2181");//zookeeper的節點的IP或者hostName,多個使用逗號進行分隔
                properties.setProperty("group.id", "test-consumer-group");//flink consumer flink的消費者的group.id
                FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<String>("test0", new SimpleStringSchema(),
                        properties);//test0是kafka中開啟的topic
                myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
                DataStream<String> keyedStream = env.addSource(myConsumer);//將kafka生產者發來的數據進行處理,本例子我進任何處理
                keyedStream.print();//直接將從生產者接收到的數據在控制臺上進行列印
                // execute program
                env.execute("Flink Streaming Java API Skeleton");

        }
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

    private static final long serialVersionUID = 1L;

    public long extractTimestamp(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String parts[] = arg0.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0;
    }

    public Watermark checkAndGetNextWatermark(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String parts[] = arg0.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }
}

4.開啟一臺配置好zookeeper和kafka的Ubuntu虛擬機,輸入以下命令分別開啟zookeeper、kafka、topic、producer。(zookeeper和kafka的配置可參考https://www.cnblogs.com/ALittleMoreLove/p/9396745.html)

bin/zkServer.sh start
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper 10.192.12.106:2181 --replication-factor 1 --partitions 1 --topic test0
bin/kafka-console-producer.sh --broker-list 10.192.12.106:9092 --topic test0

5.檢測Flink程式是否可以接收到來自Kafka生產者發來的數據,運行Java類KafkaDemo,在開啟kafka生產者的終端下隨便輸入一段話,在IDEA控制台可以收到該信息,如下為kafka生產者終端和控制台。

OK,成功的接收到了來自Kafka生產者的消息^.^。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在MySQL當中,有可能遇到表名大小寫敏感的問題。其實這個跟平臺(操作系統)有關,也跟系統變數lower_case_table_names有關係。下麵總結一下,有興趣可以查看官方文檔“Identifier Case Sensitivity” In MySQL, databases correspon... ...
  • mysql的主從配置沿用上一篇博客的配置:https://www.cnblogs.com/MasterSword/p/9434169.html mycat下載地址:http://www.mycat.io/ 試驗版本:Mycat-server-1.6-release <!-- mycat的預設用戶配置 ...
  • 通過GO 命令 來達到語句迴圈效果 也叫批迴圈 也可用來新增 語句 ...
  • 幾天前收到某個業務項目,MySQL資料庫邏輯備份mysqldump備份失敗的郵件,本是在休假,但本著工作認真負責,7*24小時不間斷運維的高尚職業情操,開始了DBA的排錯之路(一開始資料庫的備份都是成功的,巧的是我休假就出問題,懷疑是數據量又有增長) 首先我們瞭解下mysqldump備份,數據流向的 ...
  • 1. 使用Symbolic Links分佈I/O mysql的資料庫名和表名是與文件系統的目錄名和文件名對應的,預設情況下,創建的資料庫和表都存放在參數datadir定義的目錄下。如果不使用RAID或邏輯捲,所有的表都放在一個磁碟設置上,無法發揮多磁碟並行讀寫的優勢。這種情況,我們可以利用操作系統的 ...
  • 摘要: 下文通過舉例的方式講述sqlserver中位運算的相關知識,如下所示: 實驗環境:sqlserver 2008 R2 在sqlserver的許可權設置,我們通常使用1、2、4、8、16、32、64、128等數值分別表示相關信息的某一狀態供業務狀態使用,通過欄位值之間的組合形成一個狀態值存儲到數 ...
  • /*轉自:https://www.cnblogs.com/shockerli/p/1000-plus-line-mysql-notes.html*/ /* 啟動MySQL */ net start mysql /* 連接與斷開伺服器 */ mysql -h 地址 -P 埠 -u 用戶名 -p 密碼... ...
  • 本節介紹PL SQL的基本內容 本節所舉示例數據來源oracle用戶scott下的emp表和dept表,數據如下: 一、plsql簡介: 1、概念:procedural language,過程化sql語言,是面向過程的語言,在普通sql的基礎上增加了編程語言的特點。PL/SQL的基本單元是塊。 2、 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...