kafka restful api功能介紹與使用

来源:https://www.cnblogs.com/xnchll/archive/2018/09/10/9618432.html
-Advertisement-
Play Games

前述 採用confluent kafka-rest proxy實現kafka restful service時候(具體參考上一篇筆記),通過http協議數據傳輸,需要註意的是採用了base64編碼(或者稱之為加密),如果消息再post之前不採用base64處理將會出現:服務端消息亂碼、程式報錯等,因 ...


  • 前述

採用confluent kafka-rest proxy實現kafka restful service時候(具體參考上一篇筆記),通過http協議數據傳輸,需要註意的是採用了base64編碼(或者稱之為加密),如果消息再post之前不採用base64處理將會出現:服務端消息亂碼、程式報錯等,因此正常的處理流程是:
1.先對待post的消息做UTF-8統一處理
2.採用base64編碼包處理消息

s='Kafka,hi'
ad="hi,kafka,i'm xnchall"
aa=ad.encode()#UTF-8統一處理
print(aa)
b64=base64.b64encode(ad.encode())#base64編碼包統一處理
  • 利用kafka-rest生產消息
POST /topics/(string:topic_name)

data={"records":[
{
"key":"a2V5",
"value":"Y29uZmx1ZW50"
},
{
"value":"a2Fma2E=",
"partition":1
},
{
"value":"bG9ncw=="
}
]}
data1={"records":[{"value":"5bCK5pWs55qE5a6i5oi35oKo5aW977yMaGkga2Fma2EsIGknbSB4bmNoYWxs"}]}
header={"Content-Type":"application/vnd.kafka.v1+json"}
r=requests.post(url=url,json=data,headers=header)
r=requests.post(url=url,json=data1,headers=header)
View Code
  • 向指定分區生產消息:Produce messages to one partition of the topic
POST /topics/(string:topic_name)/partitions/(int:partition_id)
ad="hi kafka,i'm xnchall"
url11="http://192.168.160.101:8082/topics/test_kfk_lk/partitions/1"

data2={"records":[{"value":(base64.b64encode(ad.encode())).decode()}]}
print(data2)
r2=requests.post(url=url11,json=data2,headers=header)
print(r2)
print(r2.content)
View Code
  • 創建或者註冊消費實例:Create a new consumer instance in the consumer group
POST /consumers/(string:group_name)

url3="http://192.168.160.101:8082/consumers/my_group"
data3={
"id":"my_consumer1",
"format":"binary",
"auto.offset.reset":"smallest",
"auto.commit.enable":"false"
}

r3=requests.post(url=url3,json=data3,headers=header)
View Code
  • 提交偏移  Commit offsets for the consumer
POST /consumers/(string:group_name)/instances/(string:instance)/offsets

url4="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/offsets"
r4=requests.post(url=url4,headers=header)
View Code
  • 消費消息
GET /consumers/(string:group_name)/instances/(string:instance)/topics/(string:topic_name)

url_get2="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/topics/test_kfk_lk"
rr2=requests.get(url=url_get2,headers=header)#,params={"timeout":3000000}
print(rr2)
print(rr2.content)
print(rr2.text)
View Code
  • 刪除消費者實例 Destroy the consumer instance
DELETE /consumers/(string:group_name)/instances/(string:instance)
#url_del="http://192.168.160.101:8082/consumers/test_kfk_lk/instances/my_consumer"
#d1=requests.delete(url_del)#刪除消費者實例
#print(d1)
View Code
  • 獲取指定分區、偏移消息: Consume messages from one partition of the topic.(api V2)
GET /topics/(string:topic_name)/partitions/(int:partition_id)/messages?offset=(int)[&count=(int)]

Fetch Response v1 only contains message format v0.
Fetch Response v2 might either contain message format v0 or message format v1.
Possible Error Codes
* OFFSET_OUT_OF_RANGE (1)
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* REPLICA_NOT_AVAILABLE (9)
* UNKNOWN (-1)
url_p="http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages"
rst=requests.get(url_p,headers=header,params={"offset":3,"count":2})#,"count":2})
print(rst)
print(len(rst.json()))
if(rst.status_code!=500):
For itr in rst.json():
    print(base64.b64decode(itr['value']).decode())
print(rst.url)#http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages?offset=3&count=2
View Code
  • 獲取當前訂閱的topic列表.(api V2)
POST /consumers/(string:group_name)/instances/(string:instance)/subscription
  • 獲取手工指定的消費者的分區(api V2)
GET /consumers/(string:group_name)/instances/(string:instance)/assignments
GET /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json
{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }
]
}
  • 覆蓋消費者即將消費的消息的偏移量(api V2)
POST /consumers/(string:group_name)/instances/(string:instance)/positions
POST /consumers/testgroup/instances/my_consumer/positions HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 20
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 30
    }
  ]
}
  • 獲取給定topic的分區的最後偏移
POST /consumers/(string:group_name)/instances/(string:instance)/positions/end
POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }
]
}
  • 使用分配和訂閱api消費topic或者分區數據
GET /consumers/(string:group_name)/instances/(string:instance)/records
GET /consumers/testgroup/instances/my_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.binary.v2+json
Example binary response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v2+json
[
  {
    "topic": "test",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100,
  },
  {
    "topic": "test",
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 2,
    "offset": 101,
  }
]

  

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • jQuery 一.jQuery介紹 1.jQuery是一個輕量級.相容多瀏覽器的js庫. 2.jQuery使用戶能夠更方便地處理HTML Document,Events,實現動畫效果,方便的進行Ajax交互,能夠極大的簡化js編程,它的宗旨就是"Write less,do more." 二.為什麼要 ...
  • 起名方式與CSS 一.起名方式(起名方式也叫選擇器) 起名的目的是為了給標簽添加屬性 常見的3種選擇器有 標簽選擇器 id選擇器(使用的時候加#) class選擇器(使用的時候加.) 樣式的要求是由選擇器的權重來決定的 標簽的權重為1 class的權重是10 id的權重是100 權重是可以疊加的。 ...
  • 分析1.必須在包的頂層目錄下2.二進位文件應該在bin目錄下3.javascipt在lib目錄下4.文檔在doc目錄下 package.json欄位分析 name:包的名稱,必須是唯一的,由小寫英文字母、數字和下劃線組成,不能包含空格 description:包的簡要說明 version:符合語義化 ...
  • ...
  • 效果:設置浮動後父元素高度塌陷 插入下麵兩段代碼:. 效果: ...
  • 不啰嗦,直接上代碼: 1、在瀏覽器輸入網址:http://api.asilu.com/weather/?callback=getname&city=深圳 你會看到如下結果:他返回的是json數據 整理之後是這樣的: 2、怎樣才能獲取上述json的數據呢?很簡單: 3、效果圖是這樣的: 4、完畢。 ...
  • 根據我的面試經歷,一般小公司的面試環節,比較關心框架的熟練程度,以及獨立開發組件的能力 但大廠通常有五輪以上的面試,而且對 js 基礎語法很是看重 於是我總結了一些關於 js 基礎的面試對話,有的當時沒答上來,就在總結的時候就加了點料 忽然覺得又該讀一遍犀牛書了... 一、面試對話 問:你知道 js ...
  • 系統介紹: 1.系統採用主流的 SSM 框架 jsp JSTL bootstrap html5 (PC瀏覽器使用) 2.springmvc +spring4.3.7+ mybaits3.3 SSM 普通java web(非maven, 附贈pom.xml文件) 資料庫:mysql 3.開發工具:my ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...