Kafka學習(三)-------- Kafka核心之Consumer

来源:https://www.cnblogs.com/tree1123/archive/2019/07/25/11243668.html
-Advertisement-
Play Games

瞭解了什麼是kafka( https://www.cnblogs.com/tree1123/p/11226880.html)以後 學習Kafka核心之消費者,kafka的消費者經過幾次版本變化,特別容易混亂,所以一定要搞清楚是哪個版本再研究。 一、舊版本consumer 只有舊版本(0.9以前)才有 ...


瞭解了什麼是kafka( https://www.cnblogs.com/tree1123/p/11226880.html)以後

學習Kafka核心之消費者,kafka的消費者經過幾次版本變化,特別容易混亂,所以一定要搞清楚是哪個版本再研究。

一、舊版本consumer

只有舊版本(0.9以前)才有 high-level consumer 和 low-level consumer之分,很多的文章提到的就是這兩個:低階消費者和高階消費者,低階消費者更靈活但是需要自己維護很多東西,高階就死板一點但是不需要維護太多東西。

high-level consumer就是消費者組。

low-level consumer是單獨一個消費者,單個consumer沒有什麼消費者組的概念,與其他consumer相互之間不關聯。

1、low-level consumer

low-level consumer底層實現是

SimpleConsumer 他可以自行管理消費者

Storm的Kafka插件 storm-kafka就是使用了SimpleConsumer

優點是靈活 , 可以從任意位置拿消息 。

如果需要: 重覆讀取數據 只消費部分分區數據 精確消費 就得用這個,

不過必須自己處理位移提交 尋找分區leader broker 處理leader變更。

介面中的方法:
fetch
send  發送請求
getOffsetBefore
commitOffsets
fetchOffsets
earliestOrlatestOffset
close

使用步驟:

參照官網,比較複雜需要好幾步來拉取消息。

Find an active Broker and find out which Broker is the leader for your topic and partition

找到活躍的broker 找到哪個broker是你的topic和partition的leader

Determine who the replica Brokers are for your topic and partition

查出replica 的brokers

Build the request defining what data you are interested in

建立請求

Fetch the data

拿數據

Identify and recover from leader changes

leader變化時恢復

也可以查詢一些offset等metadata信息,具體代碼如下。

//根據指定的分區從主題元數據中找到主副本
SimpleConsumer consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                        "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();

String  leader = metaData.leader().host();

//獲取分區的offset等信息
//比如獲取lastoffset
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); 

Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();  

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); 

kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

OffsetResponse response = consumer.getOffsetsBefore(request);

long[] offsets = response.offsets(topic, partition);
long lastoffset = offsets[0];

這個api現在應用不多,除非你有特殊需求,比如要自己寫監控,你可能需要更多的元數據信息。

2、high-level consumer

主要使用的類:ConsumerConnector

屏蔽了每個topic的每個Partition的offset的管理(自動讀取zookeeper中該Consumer group的last offset)

Broker失敗轉移,增減Partition Consumer時的負載均衡(當Partiotion和Consumer增減時,Kafka自動負載均衡)

這些功能low-level consumer都需要自己實現的。

主要方法如下:
createMessageStreams
createMessageStreamsByFilter
commitOffsets
setconsumerReblanceListener
shutdown

group通過zookeeper完成核心功能,

zookeeper目錄結構如下:

/consumers/groupId/ids/consumre.id

記錄該consumer的訂閱信息,還被用來監聽consumer存活狀態。這是一個臨時節點,會話失效將會自動刪除。

/consumers/groupId/owners/topic/partition

保存consumer各個消費線程的id,執行rebalance時保存。

/consumers/groupId/offsets/topic/partition

保存該group消費指定分區的位移信息。

這個consumer支持多線程設計,只創建一個consumer實例,但如果是多個分區,將會自動創建多個線程消費。

使用步驟:

   Properties properties = new Properties();  
   properties.put("zookeeper.connect", "ip1:2181,ip2:2181,ip3:2181");//聲明zk  
   properties.put("group.id", "group03");
   ConsumerConnector  consumer =  Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); 
   
   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
   topicCountMap.put(topic, 1); // 一次從主題中獲取一個數據  
   Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
   KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 獲取每次接收到的這個數據  如果是多線程在這裡處理多分區的情況
   ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
   while(iterator.hasNext()){  
        String message = new String(iterator.next().message());  
        System.out.println("接收到: " + message);  
   }  

//auto.offset.reset 預設值為largest
//從頭消費 properties.put("auto.offset.reset", "smallest"); 

很簡單,我們0.9版本之前使用的很多都是他,集成spring的方法等等。不過0.9版本以後新的consumer出現了。

二、新版本consumer

先說一下版本的問題:

Kafka 0.10.0.0之後 增加了 Kafka Streams 所以Kafka1.0開始Streams 就穩定了。

kafka security 0.9.0.0以後 0.10.0.1之後穩定

0.10.1.0之後 新版本consumer穩定

storm有兩個連kafka的包:

storm-kafka 使用了舊版本的consumer

storm-kafka-client 使用了新版本consumer

kafka 0.9.0.0廢棄了舊版producer和consumer 舊版時scala版 新版用java開發

版本 推薦producer 推薦consumer 原因
0.8.2.2 舊版 舊版 新producer尚不穩定
0.9.0.x 新版 舊版 新producer穩定
0.10.0.x 新版 舊版 新consumer不穩定
0.10.1.0 新版 新版 新consumer穩定
0.10.2.x 新版 新版 都穩定了

舊版本中offset管理依托zookeeper,新版本中不在依靠zookeeper。

語言 包名 主要使用類
舊版本 scala kafka.consumer.* ZookeeperConsumerConnector SimpleConsumer
新版本 java org.apache.kafka.clients.consumer.* KafkaConsumer

新版本的幾個核心概念:

consumer group

消費者使用一個消費者組名(group.id)來標記自己,topic的每條消息都只會發送到每個訂閱他的消費者組的一個消費者實例上。

1、一個消費者組有若幹個消費者。

2、對於同一個group,topic的每條消息只能被髮送到group下的一個consumer實例上。

3、topic消息可以被髮送到多個group中。

consumer端offset

記錄每一個consumer消費的分區的位置

kafka沒有把這個放在伺服器端,保存在了consumer group中,並定期持久化。

舊版本會把這個offset定期存在zookeeper中:路徑是 /consumers/groupid/offsets/topic/partitionid

新版本將offset放在了一個內部topic中:__consumer_offsets(前面兩個下劃線) 裡面有50個分區

所以新版本的consumer就不需要連zookeeper了。

舊版本設置offsets.storage=kafka設置位移提交到這,不常使用。

__consumer_offsets中的結構: key = group.id+topic+partition value=offset

consumer group reblance

單個consumer是沒有rebalance的。

他規定了一個consumer group下的所有consumer如何去分配所有的分區。

單線程示例代碼:
Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

很簡單,1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);

2、用這些Properties構建consumer對象(KafkaConsumer還有其他構造,可以把序列化傳進去);

3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")

使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個介面來實現 分區變更時的邏輯。如果設置了enable.auto.commit = true 就不用理會這個邏輯。

4、然後迴圈poll消息(這裡的1000是超時設定,如果沒有很多數據,也就等一秒);

5、處理消息(列印了offset key value 這裡寫處理邏輯)。

6、關閉KafkaConsumer(可以傳一個timeout值 等待秒數 預設是30)。

Properties詳解:

bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非自己配置了ip)

deserializer 反序列化consumer從broker端獲取的是位元組數組,還原回對象類型。

預設有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定義:定義serializer格式 創建自定義deserializer類實現Deserializer 介面 重寫邏輯

除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測失敗的時間"

是檢測consumer掛掉的時間 為了可以及時的rebalance 預設是10秒 可以設置更小的值避免消息延遲。

max.poll.interval.ms "consumer處理邏輯最大時間"

處理邏輯比較複雜的時候 可以設置這個值 避免造成不必要的 rebalance ,因為兩次poll時間超過了這個參數,kafka認為這個consumer已經跟不上了,會踢出組,而且不能提交offset,就會重覆消費。預設是5分鐘。

auto.offset.reset "無位移或者位移越界時kafka的應對策略"

所以如果啟動了一個group從頭消費 成功提交位移後 重啟後還是接著消費 這個參數無效

所以3個值的解釋是:

earliset 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最早的位移消費

latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

(註意kafka-0.10.1.X版本之前: auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 、

我們這是說的是新版本:kafka-0.10.1.X版本之後: auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個特殊的topic名為:__consumer_offsets裡面))

enable.auto.commit 是否自動提交位移

true 自動提交 false需要用戶手動提交 有隻處理一次需要的 最近設置為false自己控制。

fetch.max.bytes consumer單次獲取最大位元組數

max.poll.records 單次poll返回的最大消息數

預設500條 如果消費很輕量 可以適當提高這個值 增加消費速度。

hearbeat.interval.ms consumer其他組員感知rabalance的時間

該值必須小於 session.timeout.ms 如果檢測到 consumer掛掉 也就根本無法感知rabalance了

connections.max.idle.ms 定期關閉連接的時間

預設是9分鐘 可以設置為-1 永不關閉

poll方法詳解:

(舊版本:多分區多線程 新版本:一個線程管理多個socket連接)

但新版本KafkaConsumer是雙線程的,主線程負責:消息獲取,rebalance,coordinator,位移提交等等,

另一個是後臺心跳線程。

根據上邊的各種配置,poll方法會找到offset,當獲取了足夠多的可用數據,或者等待時間超過了指定的超時時間,就會返回。

java consumer不是線程安全的,同一個KafkaConsumer用在了多個線程中,將會報Kafka Consumer is not safe for multi-threaded assess異常。可以加一個同步鎖進行保護。

poll的超時參數,已經說過1000的話是超時設定,如果沒有很多數據,也就等一秒,就返回了,比如定時5秒的將消息寫入,就可以將超時參數設置為5000,達到效率最大化。

如果沒有定時任務呢,那就設置為 Long.MAX_VALUE 未獲取足夠多的數據就無限等待。這裡要捕獲一下WakeupException。

consumer offset詳解:

consumer需要定期向kafka提交自己的offset信息。已經學過 新版本將他提交到了一個topic中 __consumer_offsets。

offset有一個更大的作用是實現交付語義:

最多一次 at most once 可能丟失 不會重覆

最少一次 at least once 可能重覆 不會丟失

精確一次 exactly once 不丟失 不重覆 就一次

若consumer在消費之前提交位移 就實現了at most once

若是消費後提交 就實現了 at least once 預設是這個。

consumer的多個位置信息:

​ 上次提交的位置 當前位置 水位 日誌最新位移

0 1 。。 5 。。 10 。。 15

上次提交位置:consumer最近一次提交的offset值;

當前位置:consumer上次poll 到了這個位置 但是還沒提交;

水位:這是分區日誌的管理 consumer無法讀取水位以上的消息;

最新位移: 也是分區日誌的管理 最大的位移值 一定不會比水位小。

新版本的consumer會在broker選一個broker作為consumergroup的coordinator,用於實現組成員管理,消費分配方案,提交位移。如果consumer崩潰,他負責的分區就分配給其他consumer,如果沒有做好位移提交就可能重覆消費。

多次提交的情況,kafka只關註最新一次的提交。

預設consumer自動提交位移 提交間隔為5秒 可以通過 auto.commit.interval.ms 設置這個間隔。

自動提交可以減少開發,但是可能重覆消費,所以需要精準消費時還是要手動提交。設置手動提交 enable.auto.commit = false,然後調用 consumer.commitSync() 或者 consumer.commitAync() Sync為同步方式,阻塞 Aync為非同步方式,不會阻塞。這兩個方法可以傳參,指定為哪個分區提交,這樣更合理一些。

(舊版本的自動提交設置是 auto.commit.enable 預設間隔為60秒)

rebalance詳解:

rebalance是consumer group如何分配topic的所有分區。

正常情況,比如有10個分區,5個consumer 那麼consumer group將為每個consumer 平均分配兩個分區。

每個分區只會分給一個consumer實例。有consumer出現問題,會重新執行這個過程,這個過程就是rebalance。

(舊版本通過zookeeper管理rebalance,新版本會選取某個broker為group coordinator來管理)

rebalance的觸發條件:

1、有新的consumer加入,或者有consumer離開或者掛掉。

2、group訂閱的topic發生變更,比如正則訂閱。

3、group訂閱的分區數發生變化。

第一個經常出現,不一定是掛掉,也可能是處理太慢,為了避免頻繁rebalance,要調整好request.timeout.ms max.poll.records和ma.poll.interval.

rebalance分區策略:

partition.assignment.strategy 設置 自定義分區策略-創建分區器 assignor

range策略(預設),將分區劃分為分區段,一次分配給每個consumer。

round-robin策略,輪詢分配。

sticky策略(0.11.0.0出現,更優秀),range策略在訂閱多個topic時會不均勻。

sticky有兩個原則,當兩者發生衝突時,第一個目標優先於第二個目標。

  1. 分區的分配要儘可能的均勻;
  2. 分區的分配儘可能的與上次分配的保持相同。

rebalance generation分代機制保證rabalance時重覆提交的問題,延遲的offset提交時舊的generation信息會報異常ILLEGAL_GENERATION

rebalance過程:

1、確定coordinator所在的broker,建立socket連接。

確定演算法: Math.abs(groupID.hashCode) % offsets.topic.num.partition 參數值(預設50)

尋找__consumer_offset分區50的leader副本所在的broker,該broker即為這個group的coordinator

2、加入組

所有consumer會向coordinator發送JoinGroup請求,收到所有請求後選一個consumer做leader(這個leader是consumer coordinator是broker),coordinator把成員和訂閱信息發給coordinator。

3、同步分配方案

leader制定分配方案,通過SyncGroup請求發給coordinator,每個consumer也會發請求返回方案。

kafka也支持offset不提交到__consumer_offset,可以自定義,這時候就需要實現一個監聽器ConsumerRebalanceListener,在這裡重新處理Rebalance的邏輯。

多線程示例代碼:
這裡要根據自身需求開發,我這裡只舉一個簡單的例子,就是幾個分區就啟動幾個consumer,一一對應。
三個類:
Main:
public static void main(String[] args) {
        
        String bootstrapServers = "kafka01:9092,kafka02:9092"; 
        String groupId = "test";
        String topic = "testtopic";
        int consumerNum = 3;
        ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);
        cg.execute();
}



import java.util.ArrayList;
import java.util.List;


public class ConsumerGroup {
    
    private List<ConsumerRunnable> consumers;
    
    public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){
        
        consumers = new ArrayList<>(consumerNum);
        
        for(int i=0;i < consumerNum;i++){
            ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);
            consumers.add(ConsumerRunnable);
        }
    }
    
    public void execute(){
        
        for(ConsumerRunnable consumerRunnable:consumers){
            new Thread(consumerRunnable).start();
        }
    }
}



import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerRunnable implements Runnable{
    
    private final KafkaConsumer<String,String> consumer;
    
    public ConsumerRunnable(String bootstrapServers,String groupId,String topic){
        
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset","earliest");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
standalone consumer

有一些需求,需要指定一個消費者消費某一個分區。彼此之間不幹擾,一個standalone consumer崩潰不會影響其他。

類似舊版本的低階消費者。

示例代碼如下:consumer.assign方法訂閱分區

public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> allpartitions = consumer.partitionsFor("testtopic");
        if(allpartitions!=null && !allpartitions.isEmpty()){
            for(PartitionInfo partitionInfo:allpartitions){
                partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
            }
            consumer.assign(partitions);
        }
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
        
    }

以上為kafka消費者的學習,不同的具體細節還需要通過官網文檔仔細學習。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.eDelivery中下載Oracle Database 19C和Oel的安裝介質,並安裝好操作系統2.安裝Oracle環境準備工具[root@localhost ~]# yum -y install oracle-database-preinstall-19cLoaded plugins: la... ...
  • 錯誤 TRIGGER **** 編譯錯誤 錯誤:PLS-00103: 出現符號 "END"在需要下列之一時: ( begin case declare exit for goto if loop mod null pragma raise return select update while wit ...
  • 1.背景 sysbench是一款壓力測試工具,可以測試系統的硬體性能,也可以用來對資料庫進行基準測試。sysbench 支持的測試有CPU運算性能測試、記憶體分配及傳輸速度測試、磁碟IO性能測試、POSIX線程性能測試、互斥性測試測試、資料庫性能測試(OLTP基準測試)。目前支持的資料庫主要是MySQ ...
  • 1.下載mysql5.7的rpm安裝包 rpm的mysql包,安裝起來簡單,解壓版的mysql還需要做許多配置,稍有不慎就會出錯!!! 下載地址:https://dev.mysql.com/downloads/mysql/5.7.html#downloads 下載後的安裝包是這個樣子的 2.上傳my ...
  • 用於Keys命令或match命令得到匹配的key時使用,註意不要與正則表達式混淆 語法:KEYS pattern / scan 0 match pattern count 10 說明:返回與指定模式相匹配的所用的keys。 該命令所支持的匹配模式如下: (1)?:用於匹配單個字元。例如,h?llo可 ...
  • 以sql server為例: 1、表值函數 用戶定義表值函數返回 table 數據類型,表是單個 SELECT 語句的結果集。 示例代碼CREATE FUNCTION Test_GetEmployeeSalary ( @EmployeeID VARCHAR(20) --參數)RETURNS TABL ...
  • 可以將以下代碼保存為backup.bat,添加計劃任務即可。 也可直接在cmd命令中複製單條語句執行,註意修改為自己的電腦路徑。 說明:--skip-lock-tables 如出現Can’t open file when using LOCK TABLES錯誤提示,可能是許可權不足導致,這裡我們在上述 ...
  • 1、創建臨時表的方法 方法一、select * into #臨時表名 from 你的表; 方法二、 create table #臨時表名(欄位1 約束條件,欄位2 約束條件,.....)create table ##臨時表名(欄位1 約束條件,欄位2 約束條件,.....) 註:以上的#代表局部臨時 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...