Kafka與.net core(三)kafka操作

来源:https://www.cnblogs.com/chenyishi/archive/2019/01/11/10250768.html
-Advertisement-
Play Games

1.Kafka相關知識 Broker:即Kafka的伺服器,用戶存儲消息,Kafa集群中的一臺或多台伺服器統稱為broker。 Message消息:是通信的基本單位,每個 producer 可以向一個 topic(主題)發佈一些消息。 Kafka中的Message是以topic為基本單位組織的,不同 ...


1.Kafka相關知識

  • Broker:即Kafka的伺服器,用戶存儲消息,Kafa集群中的一臺或多台伺服器統稱為broker。
  • Message消息:是通信的基本單位,每個 producer 可以向一個 topic(主題)發佈一些消息。
    • Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨立的。每個topic又可以分成幾個不同的partition(每個topic有幾個partition是在創建topic時指定的),每個partition存儲一部分Message。
    • partition中的每條Message包含了以下三個屬性:Kafka基於文件存儲.通過分區,可以將日誌內容分散到多個server上,來避免文件尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka實例)保存可以將一個topic切分多任意多個partitions,來消息保存/消費的效率。
      • offset:消息唯一標識:對應類型:long
      • MessageSize 對應類型:int32
      • data 是message的具體內容。
    • 越多的partitions意味著可以容納更多的consumer,有效提升併發消費的能力。
  • Message:在Broker中通Log追加的方式進行持久化存儲。併進行分區(patitions)。
    • 一個Topic可以認為是一類消息,每個topic將被分成多partition(區),每個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),partition是以文件的形式存儲在文件系統中。
    • Logs文件根據broker中的配置要求,保留一定時間後刪除來釋放磁碟空間。

      

    • Topic物理上的分組,一個 topic可以分為多個 partition,每個 partition 是一個有序的隊列。partition中的每條消息都會被分配一個有序的 id(offset)。
    • 為實現稀疏存儲,我們通過給文件建索引,每隔一定位元組的數據建立一條索引

       

  • 為了減少磁碟寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO調用的次數。
  • Broker沒有副本機制,一旦broker宕機,該broker的消息將都不可用。Message消息是有多份的。
  • consumer:消息和數據消費者,訂閱topics並處理其發佈的消息的過程叫做consumers。
    • 在 kafka中,我們可以認為一個group是一個訂閱者,一個Topic中的每個partions,只會被一個訂閱者中的一個consumer消費,不過一個 consumer可以消費多個partitions中的消息(消費者數據小於Partions  的數量時)。註意:kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到消息。
    • 一個partition中的消息只會被group中的一個consumer消息。每個group中consumer消息消費互相獨立。
  • 無狀態導致消息的刪除成為難題(可能刪除的消息正在被訂閱),kafka採用基於時間的SLA(服務水平保證),消息保存一定時間(通常為7天)後會被刪除。
  • 消息訂閱者可以rewind back到任意位置重新進行消費,當訂閱者故障時,可以選擇最小的offset(id)進行重新讀取消費消息。

2.kafka操作

2.1.查看有哪些主題:

kafka-topics.sh --list --zookeeper 192.168.0.201:12181

2.2.查看topic的詳細信息

kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1

2.3.為topic增加副本

kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute

2.4.創建topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1

2.5為topic增加partition

bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1

2.6kafka生產者客戶端命令

kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1

2.7kafka消費者客戶端命令

kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1

2.8kafka服務啟動

kafka-server-start.sh -daemon ../config/server.properties

3..net core操作

producer端,引入Confluent.Kafka

Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace KafkaTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Test().Wait();
        }
        static async Task Test()
        {
           var conf = new ProducerConfig { BootstrapServers = "39.**.**.**:9092" };

            Action<DeliveryReportResult<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var p = new Producer<Null, string>(conf))
            {
                for (int i = 0; i < 100000; ++i)
                {
                    p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
                }

                // wait for up to 10 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
}

consumer端,引入Confluent.Kafka

Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;
using System;
using System.Linq;
using System.Text;

namespace KafkaClient
{
    class Program
    {
        static void Main(string[] args)
        {
            

            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group4",
                BootstrapServers = "39.**.**.**:9092",
                // Note: The AutoOffsetReset property determines the start offset in the event
                // there are not yet any committed offsets for the consumer group for the
                // topic/partitions of interest. By default, offsets are committed
                // automatically, so in this example, consumption will only start from the
                // earliest message in the topic 'my-topic' the first time you run the program.
                AutoOffsetReset = AutoOffsetResetType.Earliest
            };

            using (var c = new Consumer<Ignore, string>(conf))
            {
                c.Subscribe("my-topic");

                bool consuming = true;
                // The client will automatically recover from non-fatal errors. You typically
                // don't need to take any action unless an error is marked as fatal.
                c.OnError += (_, e) => consuming = !e.IsFatal;

                while (consuming)
                {
                    try
                    {
                        var cr = c.Consume();
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }

                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 上一篇博客講了進程、線程、協程和GIL的基本概念,這篇我們來說說在以下三點: 1> python中使用threading庫來創建線程的兩種方式 2> 使用Event對消來判斷線程是否已啟動 3> 使用Semaphore和BoundedSemaphore兩個類分別來控制線程的併發數以及二者之間的區別。 ...
  • 一、什麼是函數 當我們在日常工作中編寫代碼時,有沒有發現這種情況,寫了一套代碼,卻發現裡面有很多段代碼出現了有規律的重覆,這樣就不符合一個合格程式員的標準了,一個合格的程式員編寫的代碼最重要的應該是簡潔,精煉。那麼,有什麼方法能減少代碼中出現的這樣有規律的重覆的情況嗎?當然有,那就是函數。例如我們平 ...
  • Django自帶後臺管理的配置 創建項目和應用 修改配置文件 資料庫配置 應用配置 時間和語言相關配置 當前應用的models下創建類 生成遷移文件 python manage.py makemigrations 執行遷移 pythin manage.py migrate 啟動項目 啟動項目 訪問 ...
  • 模塊:本質就是.py結尾的文件。從邏輯上組織python代碼。 包: 本質就是一個目錄,帶有__init__.py文件,從邏輯上組織模塊。 模塊的分類: 1.標準庫(內置的模塊) 2.開源庫(第三方庫) 3.自定義模塊 模塊導入方法: 1.import + 模塊名 2.from......impor ...
  • python之網路編程 本地的進程間通信(IPC)有很多種方式,但可以總結為下麵4類: 消息傳遞(管道、FIFO、消息隊列) 同步(互斥量、條件變數、讀寫鎖、文件和寫記錄鎖、信號量) 共用記憶體(匿名的和具名的) 遠程過程調用(Solaris門和Sun RPC) 但這些都不是本文的主題!我們要討論的是 ...
  • 前面介紹了類的基本定義,包括成員屬性、成員方法、構造方法幾個組成要素,可謂是具備了類的完整封裝形態。不過在進行下一階段的學習之前,有必要梳理一下前述的類定義代碼,看看是否存在哪些需要優化的地方。首先觀察以下的代碼片段,主要是重量屬性的定義及其設置方法: 註意到setWeight方法的輸入參數名叫in ...
  • 題意 "題目鏈接" Sol $n \leqslant 16$可以想到狀壓 我們可以預處理出任意兩行之間每列的最小值以及相鄰兩列的最小值 然後枚舉一個起點,$f[sta][i]$表示走過了$sta$這個集合內的元素,當前在$i$點的$k$的最大值 轉移的時候枚舉接下來走哪個位置即可 時間複雜度$n^3 ...
  • 1. 問題 假設我在Windows10的環境新建一個4.6的WPF項目,添加一個ComboBox,並用Blend在這個ComboBox上右鍵“編輯模板” “編輯副本”,Blend不僅幫我創建了模板,還會自動引用PresentationFramework.Aero2這個DLL,即使用Aero2這個主題 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...