Strimzi Kafka Bridge(橋接)實戰之二:生產和發送消息

来源:https://www.cnblogs.com/bolingcavalry/archive/2023/09/28/17724700.html
-Advertisement-
Play Games

歡迎訪問我的GitHub 這裡分類和彙總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos 本篇概覽 本文是《Strimzi Kafka Bridge(橋接)實戰之》系列的第二篇,咱們直奔bridge的重點:常用介面,用實際操作體驗如何用brid ...


歡迎訪問我的GitHub

這裡分類和彙總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos

本篇概覽

  • 本文是《Strimzi Kafka Bridge(橋接)實戰之》系列的第二篇,咱們直奔bridge的重點:常用介面,用實際操作體驗如何用bridge完成常用的消息收發業務

  • 官方的openapi介面文檔地址 : https://strimzi.io/docs/bridge/in-development/#_openapi

  • 整篇文章由以下內容構成:

  1. 準備工作:創建topic
  2. 生產消息
  3. 消費消息,strimzi bridge消費消息的邏輯略有些特殊,就是要提前創建strimzi bridge consumer,再通過consumer來調用拉取消息的介面
  • 完成本篇實戰後,相信您已經可以數量的通過http來使用kafka的服務了

準備工作:創建topic

  • 遺憾的是,bridge未提供創建topic的API,所以咱們還是用命令來創建吧
  • ssh登錄kubernetes的宿主機
  • 執行創建名為bridge-quickstart-topic的topic,共四個分區
kubectl -n aabbcc \
run kafka-producer \
-ti \
--image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
--rm=true \
--restart=Never \
-- bin/kafka-topics.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--create \
--topic bridge-quickstart-topic \
--partitions 4 \
--replication-factor 1
  • 檢查topic創建是否成功
kubectl -n aabbcc \
run kafka-producer \
-ti \
--image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
--rm=true \
--restart=Never \
-- bin/kafka-topics.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--describe \
--topic bridge-quickstart-topic
  • 如下圖,可見topic的創建符合預期
    在這裡插入圖片描述
  • 接下來的操作都是向bridge發送http請求完成的,我這邊宿主機的IP地址是192.168.0.1,bridge的NodePort埠號31331

查看指定topic的詳情

  • 如下請求,可以取得topicbridge-quickstart-topic的詳情
curl -X GET \
  http://192.168.0.1:31331/topics/bridge-quickstart-topic
  • 收到響應如下,是這個topic的詳細信息
{
	"name": "bridge-quickstart-topic",
	"configs": {
		"compression.type": "producer",
		"leader.replication.throttled.replicas": "",
		"message.downconversion.enable": "true",
		"min.insync.replicas": "1",
		"segment.jitter.ms": "0",
		"cleanup.policy": "delete",
		"flush.ms": "9223372036854775807",
		"follower.replication.throttled.replicas": "",
		"segment.bytes": "1073741824",
		"retention.ms": "604800000",
		"flush.messages": "9223372036854775807",
		"message.format.version": "3.0-IV1",
		"max.compaction.lag.ms": "9223372036854775807",
		"file.delete.delay.ms": "60000",
		"max.message.bytes": "1048588",
		"min.compaction.lag.ms": "0",
		"message.timestamp.type": "CreateTime",
		"preallocate": "false",
		"min.cleanable.dirty.ratio": "0.5",
		"index.interval.bytes": "4096",
		"unclean.leader.election.enable": "false",
		"retention.bytes": "-1",
		"delete.retention.ms": "86400000",
		"segment.ms": "604800000",
		"message.timestamp.difference.max.ms": "9223372036854775807",
		"segment.index.bytes": "10485760"
	},
	"partitions": [
		{
			"partition": 0,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 1,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 2,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 3,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		}
	]
}

批量生產消息(同步)

  • 試試bridge提供的批量生產消息的API,以下命令會生產了三條消息,第一條通過key的hash值確定分區,第二條用partition參數明確指定了分區是2,第三條的分區是按照輪詢策略更新的
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
        },
        {
            "value": "sales-lead-0002",
            "partition": 2
        },
        {
            "value": "sales-lead-0003"
        }
    ]
}'
  • bridge響應如下,會返回每一條消息的partition和offset,這就是同步消息的特點,等到meta信息更新完畢後才會返回
{
	"offsets": [{
		"partition": 0,
		"offset": 0
	}, {
		"partition": 2,
		"offset": 0
	}, {
		"partition": 3,
		"offset": 0
	}]
}

批量生產消息(非同步)

  • 有的場景下,例如追求高QPS並且對返回的meta信息不關註,可以考慮非同步的方式發送消息,也就是說bridge收到響應後立即返回200,這種非同步模式和前面的同步模式只有一個參數的差別:在請求url中增加async=true即可
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic?async=true \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
        },
        {
            "value": "sales-lead-0002",
            "partition": 2
        },
        {
            "value": "sales-lead-0003"
        }
    ]
}'
  • 沒有響應body,請您自行請求感受一下,響應明顯比同步模式快

查看partition

  • 查看tipic的parition情況
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions
  • 響應
[{
	"partition": 0,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 1,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 2,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 3,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}]
  • 查看指定partition
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions/0
  • 響應
{
	"partition": 0,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}
  • 查看指定partition的offset情況
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions/0/offsets
  • 響應
{
	"beginning_offset": 0,
	"end_offset": 5
}

創建bridge consumer

  • 通過bridge消費消息,有個特別且重要的前提:創建bridge consumer,只有先創建了bridge consumer,才能順利從kafka的broker取到消息
  • 以下命令創建了一個bridge consumer,各參數的含義稍後會說明
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "bridge-quickstart-consumer",
    "auto.offset.reset": "earliest",
    "format": "json",
    "enable.auto.commit": false,
    "fetch.min.bytes": 16,
    "consumer.request.timeout.ms": 300000
  }'
  • 上述請求的參數解釋:
  1. 對應kafka的group為bridge-quickstart-consumer-group
  2. 此bridge consumer的name等於bridge-quickstart-consumer
  3. 參數enable.auto.commit表示是否自動提交offset,這裡設置成false,表示無需自動提交,後面的操作中會調用API請求來更新offset
  4. 參數fetch.min.bytes要特別註意,其值等於16,表示唯有消息內容攢夠了16位元組,拉取消息的請求才能獲取到消息,如果消息內容長度不到16位元組,收到的響應body就是空
  5. 參數consumer.request.timeout.ms也要註意,這裡我設置了300秒,如果超過300秒沒有去拉取消息,這個消費者就會被kafka移除(被移除後如果再去拉取消息,kafka會報錯:Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the grou)
  • 收到響應如下,instance_id表示這個bridge consumer的身份id,base_uri則是訂閱消息時必須使用的請求地址
{
	"instance_id": "bridge-quickstart-consumer",
	"base_uri": "http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
}

如何刪除bridge consumer

  • 以下命令可以刪除consumer,重點是將身份id放入path中
curl -X DELETE http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer

訂閱指定topic的消息

  • 創建bridge consumer成功後,接下來就能以這個consumer的身份去訂閱kafka消息了
  • 執行以下命令可以訂閱topic為bridge-quickstart-topic的kafka消息,註意請求地址就是前面創建bridge consumer時返回的base_uri欄位
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
        "bridge-quickstart-topic"
    ]
}'
  • 從上述請求body可以看出,此請求可以一次訂閱多個topic,而且還可以使用topic_pattern(正則表達式)的形式來一次訂閱多個topic
  • 訂閱完成後,接下來就能主動拉取消息了

拉取消息

  • 在拉取消息之前,請確保已經提前生產了消息
  • 執行以下命令拉取一條消息
curl -X GET http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'
  • 然而,當您執行了上述命令後,會發現返回body為空,別擔心,這是正常的現象,按照官方的說法,拉取到的第一條消息就是空的,這是因為拉取操作出觸發了rebalancing邏輯(rebalancing是kafka的概覽,是處理多個partition消費的操作),再次執行上述命令去拉取消息,這下正常了,body如下
[
	{
		"topic": "bridge-quickstart-topic",
		"key": "my-key",
		"value": "sales-lead-0001",
		"partition": 0,
		"offset": 0
	}, {
		"topic": "bridge-quickstart-topic",
		"key": "my-key",
		"value": "sales-lead-0001",
		"partition": 0,
		"offset": 1
	}
]

提交offset

  • 前面在創建bridge consumer的時候,參數enable.auto.commit的值等於fasle,表示由調用方主動提交offset到kafka,因此在拉取到消息之後,需要手動更新kafka consumer的offset
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
  • 該請求無返回body,只要返回碼是204就表示成功

設定offset

  • 試想這樣的場景:共生產了100條消息,消費者也已經將這100條全部消費完畢,現在由於某種原因,需要從91條開始,重新消費91-100這10條消息(例如需要重新計算),此時可以主動設定offset
  • 先執行以下命令,生產一條消息
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "value": "sales-lead-a002-01234567890123456789",
            "partition": 2
        }
    ]
}'
  • 如下圖紅色箭頭,可見當前partition已經生產了75條消息了
    在這裡插入圖片描述
  • 咱們先拉取消息,將消息都消費掉
    在這裡插入圖片描述
  • 由於沒有新生產消息,此時再拉去應該拉取不到了
  • 現在執行以下請求,就可以將offset設置到74
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "bridge-quickstart-topic",
            "partition": 2,
            "offset": 74
        }
    ]
}'
  • 再次拉取消息,發現74和之後的所有消息都可以拉去到了(註意,包含了74)
    在這裡插入圖片描述
  • 至此,咱們對生產和發送消息的常用介面都已經操作了一遍,對於常規的業務場景已經夠用,接下來的文章,咱們以此為基礎,玩出更多花樣來

歡迎關註博客園:程式員欣宸

學習路上,你不孤單,欣宸原創一路相伴...


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 文件目錄結構 tree . ├── bin ├── include │ └── calc.h ├── lib │ ├── add.c │ ├── div.c │ ├── mul.c │ └── sub.c └── src └── main.c 4 directories, 6 files 靜態庫生成 ...
  • 第一題 下列程式輸出啥? public class StringDemo{ private static final String MESSAGE="taobao"; public static void main(String [] args) { String a ="tao"+"bao"; S ...
  • 基本介紹 MyBatis-Plus (opens new window)(簡稱 MP)是一個 MyBatis (opens new window)的增強工具,在 MyBatis 的基礎上只做增強不做改變,為簡化開發、提高效率而生。 MyBatis-Plus特性 無侵入:只做增強不做改變,引入它不會對 ...
  • 在Java中,Serializable是一個標記介面(marker interface),用於指示一個類的對象可以被序列化。序列化是將對象轉換為位元組流的過程,可以將對象保存到文件、在網路上傳輸或在記憶體中傳遞。 當一個類實現了Serializable介面時,它表示該類的對象可以被序列化和反序列化。 序 ...
  • 在Java 21中,引入了虛擬線程(Virtual Threads)來簡化和增強併發性,這使得在Java中編程併發程式更容易、更高效。 虛擬線程,也稱為“用戶模式線程(user-mode threads)”或“纖程(fibers)”。該功能旨在簡化併發編程並提供更好的可擴展性。虛擬線程是輕量級的,這 ...
  • 折線圖是一種用於可視化數據變化趨勢的圖表,它可以用於表示任何數值隨著時間或類別的變化。 折線圖由折線段和折線交點組成,折線段表示數值隨時間或類別的變化趨勢,折線交點表示數據的轉折點。 折線圖的方向表示數據的變化方向,即正變化還是負變化,折線的斜率表示數據的變化程度。 1. 主要元素 折線圖主要由以下 ...
  • 1、概述 GEBCO(General Bathymetric Chart of the Oceans)全球 DEM數據集(Geo-Engineering Digital Savage)是基於“全球地球系統計劃”(Global Earth System Project)的最新數據集。 GEBCO 數據 ...
  • Question Description 使用JAVA語言的若依框架的時候,發現只要使用了startPage()函數, 並不需要前端傳遞分頁的數據,也不需要註解,就能完成分頁功能。預判他應該是使用類似攔截器的機制,但還是感覺很神奇,感覺知道個大概不過癮,還是要更細緻的瞭解才能滿足,就想研究一下並記錄 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...