雲伺服器(Linux)安裝部署Kafka

来源:https://www.cnblogs.com/ndchao/archive/2022/11/14/chaos1.html
-Advertisement-
Play Games

前提 Tomcat 10.1.x Tomcat線程池介紹 Tomcat線程池,源於JAVA JDK自帶線程池。由於JAVA JDK線程池策略,比較適合處理 CPU 密集型任務,但是對於 I/O 密集型任務,如資料庫查詢,rpc 請求調用等,不是很友好,所以Tomcat在其基礎上進行了擴展。 任務處理 ...


雲伺服器(Linux)安裝部署Kafka

前期準備

kafka的安裝需要依賴於jdk,需要在伺服器上提前安裝好該環境,這裡使用用jdk1.8。

下載安裝包

官網地址:

較新的版本已自帶Zookeeper,無需額外下載。這裡使用3.2.0做演示。

註意要下載Binary downloads標簽下的tgz包,Source download標簽下的包為源碼。無法直接運行,需要編譯。

上傳安裝包到雲伺服器

使用ssh連接工具將kafka_2.12-3.2.0.tgz這個包上傳到雲伺服器上的一個目錄。

打開命令行,進入到放有壓縮包的目錄,執行

tar -zxvf kafka_2.12-3.2.0.tgz

配置kafka

然後使用cd命令進入到/kafka_2.12-3.2.0/config/下,使用

vi server.properties

編輯配置文件。

刪除listeners和advertised前方的#號,改成如下配置:

listeners=PLAINTEXT://雲伺服器內網ip:9092(本地訪問用本地ip)
# 如果要提供外網訪問則必須配置此項
advertised.listeners=PLAINTEXT://雲伺服器公網ip:9092(若要遠程訪問需配置此項為雲伺服器的公網ip)
# zookeeper連接地址,集群配置格式為ip:port,ip:port,ip:port
zookeeper.connect=雲伺服器公網ip:2181

開放雲伺服器埠

在雲伺服器控制台內進入安全組頁面,添加兩條新的入站規則,tcp/9092和tcp/2181

開放linux防火牆埠

先查看使用的防火牆類型iptables/firewalld

iptables操作命令

1.打開/關閉/重啟防火牆

開啟防火牆(重啟後永久生效):chkconfig iptables on

關閉防火牆(重啟後永久生效):chkconfig iptables off

開啟防火牆(即時生效,重啟後失效):service iptables start

關閉防火牆(即時生效,重啟後失效):service iptables stop

重啟防火牆:service iptables restartd

2.查看打開的埠

/etc/init.d/iptables status
3.開啟埠

iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
4.保存並重啟防火牆
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart

Centos7預設安裝了firewalld,如果沒有安裝的話,可以使用 yum install firewalld firewalld-config進行安裝。

操作指令如下:

1.啟動防火牆

systemctl start firewalld
2.禁用防火牆

systemctl stop firewalld
3.設置開機啟動

systemctl enable firewalld
4.停止並禁用開機啟動

sytemctl disable firewalld
5.重啟防火牆

firewall-cmd --reload

6.查看狀態

systemctl status firewalld或者 firewall-cmd --state
7.在指定區域打開埠(記得重啟防火牆)

firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

打開tcp/9092和tcp/2181這兩個埠後,重啟防火牆,並查看開放的埠確實生效。

啟動kafka服務

cd命令進入kafka_2.12-3.2.0目錄下,執行

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動zookeeper,不加-daemon方便排除啟動錯誤,新建一個shell視窗,進入該目錄再執行

bin/kafka-server-start.sh config/server.properties

啟動kafka,若列印日誌未報錯,若未出現error日誌,說明啟動成功。

測試單機連通性

查詢kafka下所有的topic
bin/kafka-topics.sh --list --zookeeper ip:port
因為kafka使用zookeeper作為配置中心,一些topic信息需要查詢該kafka對應的zookeeper
創建topic
bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
開啟生產者
bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
開啟消費者
bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test


Springboot連接kafak

在pom.xml文件中引入kafka依賴

		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.0</version>
        </dependency>

在application.yml配置文件中配置kafka

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 雲伺服器外網ip地址:9092
    producer: # 生產者
      retries: 3 # 設置大於0的值,則客戶端會將發送失敗的記錄重新發送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
      # RECORD
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交
      # BATCH
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
      # TIME
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
      # COUNT
      # TIME | COUNT 有一個條件滿足時提交
      # COUNT_TIME
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後, 手動調用Acknowledgment.acknowledge()後提交
      # MANUAL
      # 手動調用Acknowledgment.acknowledge()後立即提交,一般使用這種
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生產者

@RestController
public class KafkaController {
    private final static String TOPIC_NAME = "test-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        kafkaTemplate.send(TOPIC_NAME, "key", msg);
        return String.format("消息 %s 發送成功!", msg);
    }
}

消費者

@Component
public class DemoConsumer {
    /**
     * @param record record
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     * @TopicPartition(topic = "topic2", partitions = "0",
     * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     * },concurrency = "6")
     * //concurrency就是同組下的消費者個數,就是併發消費數,必須小於等於分區總數
     */
    @KafkaListener(topics = "test-topic", groupId = "testGroup1")
    public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup1 message: " + value);
        System.out.println("testGroup1 record: " + record);
        //手動提交offset,一般是提交一個banch,冪等性防止重覆消息
        // === 每條消費完確認性能不好!
        ack.acknowledge();
    }

    //配置多個消費組
    @KafkaListener(topics = "test--topic", groupId = "testGroup2")
    public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup2 message: " + value);
        System.out.println("testGroup2 record: " + record);
        //手動提交offset
        ack.acknowledge();
    }
}

使用swagger測試發送消息

控制台列印消息


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • vue腳手架的使用 搭建vue腳手架環境 1.傻瓜式安裝node: 官網下載:https://nodejs.org/zh-cn/ 2.安裝cnpm: >: npm install -g cnpm --registry=https://registry.npm.taobao.org 3.安裝vue最新 ...
  • 在瀏覽器訪問網站,想在瀏覽器最新化的情況下,也能收到右下角的消息通知 這個時候就會用到H5 Notifications 具體效果可以參照演示頁面 演示頁面-唯一線上客服系統 實現代碼js function notify(title, options, callback) { // 先檢查瀏覽器是否支 ...
  • TypeScript 是對 JavaScript 的補充,將 JavaScript 由動態類型、弱類型語言轉為靜態類型、強類型的語言 簡介 TypeScript 由三個部分組成: 類型:為 JavaScript 代碼添加類型與類型檢查來確保健壯性,進入學習 語法:提前使用新語法或新特性來簡化代碼,進 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 vm.$forceUpdate (1)作用 迫使Vue.js實例重新渲染。註意它僅僅影響實例本身以及插入插槽內容的子組件,而不是所有子組件。 (2)實現 只需要執行watcher的update方法,就可以讓實例重新渲染。 Vue.js的每 ...
  • 該系列已更新文章: 分享一個實用的 vite + vue3 組件庫腳手架工具,提升開發效率 開箱即用 yyg-cli 腳手架:快速創建 vue3 組件庫和vue3 全家桶項目 Vue3 企業級優雅實戰 - 組件庫框架 - 1 搭建 pnpm monorepo Vue3 企業級優雅實戰 - 組件庫框架 ...
  • css裡面有個背景色漸變色的效果,我們能拿來做什麼呢 現在就演示下,我在開發此頁面時所實際實現的樣子 演示頁面-唯一線上客服系統 實現代碼很簡單,效果還是很不錯: background: linear-gradient(90deg, #EE884C 0%, #FFBA8E 100%); 首頁里也有個 ...
  • 摘要:Ajax是非同步JavaScript和XML可用於前後端交互。 本文分享自華為雲社區《Flask框架:運用Ajax輪詢動態繪圖》,作者:LyShark。 Ajax是非同步JavaScript和XML可用於前後端交互,在之前《Flask 框架:運用Ajax實現數據交互》簡單實現了前後端交互,本章將通 ...
  • 5. 文件服務開發 全套代碼及資料全部完整提供,點此處下載 5.1 環境搭建 5.1.1 資料庫環境搭建 第一步:創建pd_files資料庫 create database pd_files character set utf8mb4; 第二步:在pd_files資料庫中創建pd_attachmen ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...