Flink + Kafka Integration Example
1. Using IntelliJ IDEA, create a new Maven project and name it kafka01.
2. My pom.xml is configured as follows.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hrb.lhr</groupId> <artifactId>kafka01</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.1.4</flink.version> <slf4j.version>1.7.7</slf4j.version> <log4j.version>1.2.17</log4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- explicitly add a standard loggin framework, as Flink does not (in the future) have a hard dependency on one specific framework by default --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies> </project>
3. Under the project's /src/main/java directory, create two Java classes named KafkaDemo and CustomWatermarkEmitter; their code is shown below.
import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaDemo {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is disabled by default. To enable it, call enableCheckpointing(n)
        // on the StreamExecutionEnvironment, where n is the checkpoint interval in
        // milliseconds. With a 5000 ms interval, the next checkpoint starts within
        // 5 seconds of the previous one completing.
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        // IP or hostname of the Kafka broker(s); separate multiple entries with commas
        properties.setProperty("bootstrap.servers", "10.192.12.106:9092");
        // IP or hostname of the ZooKeeper node(s); separate multiple entries with commas
        properties.setProperty("zookeeper.connect", "10.192.12.106:2181");
        // group.id of the Flink consumer
        properties.setProperty("group.id", "test-consumer-group");

        // "test0" is the topic created in Kafka
        FlinkKafkaConsumer09<String> myConsumer =
                new FlinkKafkaConsumer09<String>("test0", new SimpleStringSchema(), properties);
        myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

        // Consume the records sent by the Kafka producer; this example applies no further processing
        DataStream<String> keyedStream = env.addSource(myConsumer);
        // Print the records received from the producer directly to the console
        keyedStream.print();

        // Execute the program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

    private static final long serialVersionUID = 1L;

    // Extract the event timestamp from a record of the form "timestamp,payload"
    public long extractTimestamp(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String[] parts = arg0.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0;
    }

    // Emit a watermark for every record that carries a timestamp
    public Watermark checkAndGetNextWatermark(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String[] parts = arg0.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }
}
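CustomWatermarkEmitter assumes each Kafka record has the form "timestamp,payload". For example (an illustrative message, not from the original post), the producer line 1534927200000,hello would be assigned event timestamp 1534927200000 and would also emit a watermark for that time; a line without a comma falls through to timestamp 0 and produces no watermark.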
4. Start an Ubuntu VM on which ZooKeeper and Kafka are already configured, and run the following commands to start ZooKeeper, start the Kafka broker, create the topic, and start a console producer. (For ZooKeeper and Kafka setup, see https://www.cnblogs.com/ALittleMoreLove/p/9396745.html.)
bin/zkServer.sh start
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper 10.192.12.106:2181 --replication-factor 1 --partitions 1 --topic test0
bin/kafka-console-producer.sh --broker-list 10.192.12.106:9092 --topic test0
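Before starting the producer, you can optionally confirm that the topic exists by listing the topics registered in ZooKeeper (an extra verification step, not part of the original walkthrough):

bin/kafka-topics.sh --list --zookeeper 10.192.12.106:2181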
5. Verify that the Flink program can receive data sent by the Kafka producer: run the KafkaDemo class, then type anything into the terminal where the Kafka producer is running; the message appears in the IDEA console. The producer terminal and the console output are shown below.
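For example, typing a line in the "timestamp,payload" format expected by CustomWatermarkEmitter, such as 1534927200000,hello flink (an illustrative message), should make the same text appear in the IDEA console via keyedStream.print(); Flink typically prefixes each printed line with the subtask index, e.g. 1> 1534927200000,hello flink.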
OK, the messages from the Kafka producer were received successfully ^.^