Flink+Kafka整合的實例

来源:https://www.cnblogs.com/ALittleMoreLove/archive/2018/08/15/9481545.html
-Advertisement-
Play Games

Flink+Kafka整合實例 1.使用工具Intellig IDEA新建一個maven項目,為項目命名為kafka01。 2.我的pom.xml文件配置如下。 3.在項目的目錄/src/main/java在創建兩個Java類,分別命名為KafkaDemo和CustomWatermarkEmitte ...


Flink+Kafka整合實例

1.使用工具Intellig IDEA新建一個maven項目,為項目命名為kafka01。

2.我的pom.xml文件配置如下。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hrb.lhr</groupId>
    <artifactId>kafka01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.1.4</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- explicitly add a standard loggin framework, as Flink does not (in the future) have
           a hard dependency on one specific framework by default -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

</project>

3.在項目的目錄/src/main/java在創建兩個Java類,分別命名為KafkaDemo和CustomWatermarkEmitter,代碼如下所示。

import java.util.Properties;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;


public class KafkaDeme {

        public static void main(String[] args) throws Exception {

                // set up the streaming execution environment
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                //預設情況下,檢查點被禁用。要啟用檢查點,請在StreamExecutionEnvironment上調用enableCheckpointing(n)方法,
                // 其中n是以毫秒為單位的檢查點間隔。每隔5000 ms進行啟動一個檢查點,則下一個檢查點將在上一個檢查點完成後5秒鐘內啟動
                env.enableCheckpointing(5000);
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "10.192.12.106:9092");//kafka的節點的IP或者hostName,多個使用逗號分隔
                properties.setProperty("zookeeper.connect", "10.192.12.106:2181");//zookeeper的節點的IP或者hostName,多個使用逗號進行分隔
                properties.setProperty("group.id", "test-consumer-group");//flink consumer flink的消費者的group.id
                FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<String>("test0", new SimpleStringSchema(),
                        properties);//test0是kafka中開啟的topic
                myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
                DataStream<String> keyedStream = env.addSource(myConsumer);//將kafka生產者發來的數據進行處理,本例子我進任何處理
                keyedStream.print();//直接將從生產者接收到的數據在控制臺上進行列印
                // execute program
                env.execute("Flink Streaming Java API Skeleton");

        }
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

    private static final long serialVersionUID = 1L;

    public long extractTimestamp(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String parts[] = arg0.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0;
    }

    public Watermark checkAndGetNextWatermark(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String parts[] = arg0.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }
}

4.開啟一臺配置好zookeeper和kafka的Ubuntu虛擬機,輸入以下命令分別開啟zookeeper、kafka、topic、producer。(zookeeper和kafka的配置可參考https://www.cnblogs.com/ALittleMoreLove/p/9396745.html)

bin/zkServer.sh start
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper 10.192.12.106:2181 --replication-factor 1 --partitions 1 --topic test0
bin/kafka-console-producer.sh --broker-list 10.192.12.106:9092 --topic test0

5.檢測Flink程式是否可以接收到來自Kafka生產者發來的數據,運行Java類KafkaDemo,在開啟kafka生產者的終端下隨便輸入一段話,在IDEA控制台可以收到該信息,如下為kafka生產者終端和控制台。

OK,成功的接收到了來自Kafka生產者的消息^.^。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在MySQL當中,有可能遇到表名大小寫敏感的問題。其實這個跟平臺(操作系統)有關,也跟系統變數lower_case_table_names有關係。下麵總結一下,有興趣可以查看官方文檔“Identifier Case Sensitivity” In MySQL, databases correspon... ...
  • mysql的主從配置沿用上一篇博客的配置:https://www.cnblogs.com/MasterSword/p/9434169.html mycat下載地址:http://www.mycat.io/ 試驗版本:Mycat-server-1.6-release <!-- mycat的預設用戶配置 ...
  • 通過GO 命令 來達到語句迴圈效果 也叫批迴圈 也可用來新增 語句 ...
  • 幾天前收到某個業務項目,MySQL資料庫邏輯備份mysqldump備份失敗的郵件,本是在休假,但本著工作認真負責,7*24小時不間斷運維的高尚職業情操,開始了DBA的排錯之路(一開始資料庫的備份都是成功的,巧的是我休假就出問題,懷疑是數據量又有增長) 首先我們瞭解下mysqldump備份,數據流向的 ...
  • 1. 使用Symbolic Links分佈I/O mysql的資料庫名和表名是與文件系統的目錄名和文件名對應的,預設情況下,創建的資料庫和表都存放在參數datadir定義的目錄下。如果不使用RAID或邏輯捲,所有的表都放在一個磁碟設置上,無法發揮多磁碟並行讀寫的優勢。這種情況,我們可以利用操作系統的 ...
  • 摘要: 下文通過舉例的方式講述sqlserver中位運算的相關知識,如下所示: 實驗環境:sqlserver 2008 R2 在sqlserver的許可權設置,我們通常使用1、2、4、8、16、32、64、128等數值分別表示相關信息的某一狀態供業務狀態使用,通過欄位值之間的組合形成一個狀態值存儲到數 ...
  • /*轉自:https://www.cnblogs.com/shockerli/p/1000-plus-line-mysql-notes.html*/ /* 啟動MySQL */ net start mysql /* 連接與斷開伺服器 */ mysql -h 地址 -P 埠 -u 用戶名 -p 密碼... ...
  • 本節介紹PL SQL的基本內容 本節所舉示例數據來源oracle用戶scott下的emp表和dept表,數據如下: 一、plsql簡介: 1、概念:procedural language,過程化sql語言,是面向過程的語言,在普通sql的基礎上增加了編程語言的特點。PL/SQL的基本單元是塊。 2、 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...