Flume+Kafka+Storm整合

来源:https://www.cnblogs.com/hongten/archive/2018/12/18/hongten_flume_kafka_storm.html
-Advertisement-
Play Games

Flume+Kafka+Storm整合 1. 需求: 有一個客戶端Client可以產生日誌信息,我們需要通過Flume獲取日誌信息,再把該日誌信息放入到Kafka的一個Topic:flume-to-kafka ...


Flume+Kafka+Storm整合

1. 需求:

有一個客戶端Client可以產生日誌信息,我們需要通過Flume獲取日誌信息,再把該日誌信息放入到Kafka的一個Topic:flume-to-kafka

再由Storm讀取該topic:flume-to-kafka,進行日誌分析處理(這裡我們做的邏輯處理為filter,即過濾日誌信息),處理完日誌信息後,再由Storm把處理好的日誌信息放入到Kafka的另一個topic:storm-to-kafka

2.組件分佈情況

我總共搭建了3個節點node1,node2,node3

Zookeeper安裝在node1,node2,nod3

Flume安裝在node2

Kafka安裝在node1,node2,node3

Storm安裝在node1,node2,node3

 

3.JDK安裝

--在node1, node2, node3上面安裝jdk
--install JDK   -- http://blog.51cto.com/vvxyz/1642258(LInux安裝jdk的三種方法)
--解壓安裝
rpm -ivh your-package.rpm

--修改環境變數
vi /etc/profile

JAVA_HOME=/usr/java/jdk1.7.0_67
JRE_HOME=/usr/java/jdk1.7.0_67/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH

:wq

--使配置有效
source /etc/profile

 

4.Zookeeper的安裝

---============================
--解壓zookeeper壓縮包並安裝
tar -zxvf zookeeper-3.4.6.tar.gz 

--創建zookeeper的軟鏈
ln -sf /root/zookeeper-3.4.6 /home/zk

--配置zookeeper
cd /home/zk/conf/

--把下麵的zoo_sample.cfg文件重新命名
cp zoo_sample.cfg zoo.cfg

--修改zoo.cfg配置文件
vi zoo.cfg

--設置zookeeper的文件存放目錄
--找到dataDir=/tmp/zookeeper,並設置為下麵值
dataDir=/opt/zookeeper

--設置zookeeper集群
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888

:wq


--創建/opt/zookeeper目錄
mkdir /opt/zookeeper

--進入/opt/zookeeper目錄
cd /opt/zookeeper

--創建一個文件myid
vi myid

--輸入1
1
:wq
--以此類推,在node2,node3,值分別是2, 3

--拷貝zookeeper目錄到node2, node3的/opt/目錄下麵
cd ..
scp -r zookeeper/ root@node2:/opt/
scp -r zookeeper/ root@node3:/opt/

--分別進入到node2, node3裡面,修改/opt/zookeeper/myid,值分別是2, 3


--作為以上配置,把node1裡面的zookeeper拷貝到node2, node3上面。
scp -r zookeeper-3.4.6 root@node2:~/
scp -r zookeeper-3.4.6 root@node3:~/

--分別進入到node2, node3裡面,創建軟鏈
ln -sf /root/zookeeper-3.4.6/ /home/zk


--配置zookeeper環境變數
cd /home/zk/bin

--修改/etc/profile文件,把zookeeper的bin目錄路徑添加進去
vi /etc/profile
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:/home/zk/bin

--讓配置文件生效
source /etc/profile

--分別進入到node2, node3裡面,修改/etc/profile文件,把zookeeper的bin目錄路徑添加進去


--作為環境變數配置,就可以啟動zookeeper了。
--分別在node1, node2, node3上面啟動zookeeper
zkServer.sh start


--測試是否啟動成功
jps
--觀察是否有QuorumPeerMain進程

 

5.Flume的安裝

---------------------------------------------------
--安裝Flume
--把安裝包上傳到node2上面
cd
tar -zxvf apache-flume-1.6.0-bin.tar.gz 

--創建軟鏈
ln -s /root/apache-flume-1.6.0-bin /home/flume

--配置flume
cd /root/apache-flume-1.6.0-bin/conf

cp flume-env.sh.template flume-env.sh

vi flume-env.sh

--配置JDK
export JAVA_HOME=/usr/java/jdk1.7.0_67

:wq


--加入系統變數
vi /etc/profile

export FLUME_HOME=/root/apache-flume-1.6.0-bin
export PATH=$PATH:$FLUME_HOME/bin

:wq

source /etc/profile

--驗證是否安裝成功
flume-ng version

flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f

 

6.Kafka的安裝

---------------------------------------------------
--kafka安裝
--在node1, node2, node3上面搭建kafka
--先進入node1
mkdir /root/kafka

--解壓
tar -zxvf kafka_2.10-0.8.2.2.tgz

--創建軟鏈
ln -s /root/kafka/kafka_2.10-0.8.2.2 /home/kafka-2.10

--配置
cd /root/kafka/kafka_2.10-0.8.2.2/config

vi server.properties

--node1=0, node2=1,node2=2
broker.id=0
log.dirs=/opt/kafka-log
zookeeper.connect=node1:2181,node2:2181,node3:2181

:wq


--為了啟動方便
cd /root/kafka/kafka_2.10-0.8.2.2
vi start-kafka.sh

nohup bin/kafka-server-start.sh  config/server.properties > kafka.log 2>&1 &

:wq

chmod 755 start-kafka.sh



--配置好以後
--分發到node2,node3
cd /root/kafka/
scp -r kafka_2.10-0.8.2.2/ root@node2:/root/kafka
scp -r kafka_2.10-0.8.2.2/ root@node3:/root/kafka

--進入到node2
cd /root/kafka/kafka_2.10-0.8.2.2/config

vi server.properties

--node1=0, node2=1,node2=2
broker.id=1

:wq


--進入到node3
cd /root/kafka/kafka_2.10-0.8.2.2/config

vi server.properties

--node1=0, node2=1,node2=2
broker.id=2

:wq

--啟動kafka
./zkServer.sh start

--分別進入node1,node2,node3
cd /root/kafka/kafka_2.10-0.8.2.2
./start-kafka.sh


--檢查是否啟動
jps
查看是否有Kafka進程

 

7.Storm的安裝

------------
--Storm分散式安裝
--部署到node1,node2,node3節點上
--進入node1
cd /root/apache-storm-0.10.0/conf

vi storm.yaml


--配置如下
# storm.zookeeper.servers:
     - "node1"
     - "node2"
     - "node3"
#  
nimbus.host: "node1"
storm.local.dir: "/opt/storm"
supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703

:wq

--從node1分發到node2,node3
scp -r apache-storm-0.10.0 root@node2:/
scp -r apache-storm-0.10.0 root@node3:/


--分別進入node2,node3創建軟鏈
ln -r /root/apache-storm-0.10.0 /home/storm-0.10


--分別進入node1,node2,node3快捷啟動
cd /root/apache-storm-0.10.0

vi start-storm.sh

nohup bin/storm nimbus >> logs/numbus.out 2>&1 &
nohup bin/storm supervisor >> logs/supervisor.out 2>&1 &


--node1上面配置,node2,node3上面不需要UI
nohup bin/storm ui >> logs/ui.out 2>&1 &
nohup bin/storm drpc >> logs/drpc.out 2>&1 &


:wq

--分別進入node1,node2,node3快捷stop-storm

vi stop-storm.sh

--node1上面配置,node2,node3上面不需要UI
kill -9 `jps | grep core | awk '{print $1}'`


kill -9 `jps | grep supervisor | awk '{print $1}'`
kill -9 `jps | grep nimbus | awk '{print $1}'`
kill -9 `jps | grep worker | awk '{print $1}'`
kill -9 `jps | grep LogWriter | awk '{print $1}'`

:wq


chmod 755 start-storm.sh
chmod 755 stop-storm.sh



--啟動Zookeeper服務
--在node1,node2,node3上面啟動
zkServer.sh start

--在node1,node2,node3上面啟動Storm
cd /root/apache-storm-0.10.0
./start-storm.sh


--上傳storm_wc.jar 文件
./storm jar /root/storm_wc.jar storm.wordcount.Test wordcount


------------
Storm DRPC 配置
--進入node1
cd /root/apache-storm-0.10.0/conf

vi storm.yaml

drpc.servers:     
     - "node1"

:wq

--從node1,分發到node2,node3
cd /root/apache-storm-0.10.0/conf/
scp -r root@node2:/root/apache-storm-0.10.0/conf
scp -r root@node3:/root/apache-storm-0.10.0/conf


--配置完,進入node1,node2,node3
cd /root/apache-storm-0.10.0
./start-storm.sh &

 

8.Flume+Kafka+Storm整合

8.1.配置Flume

--從node2
cd flumedir

vi flume_to_kafka

--node2配置如下
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume-to-kafka
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

:wq

 

8.2.啟動Zookeeper

--啟動Zookeeper,在node1,node2,node3
--關閉防火牆
service iptables stop

--啟動Zookeeper
zkServer.sh start

 

8.3.啟動Kfaka

--啟動kafka
--分別進入node1,node2,node3
cd /root/kafka/kafka_2.10-0.8.2.2
./start-kafka.sh

 

8.4.啟動Flume

--進入node2,啟動
cd /root/flumedir

flume-ng agent -n a1 -c conf -f flume_to_kafka -Dflume.root.logger=DEBUG,console

 

8.4.啟動客戶端Client

啟動客戶端產生日誌信息。

大家可以參考RPC clients - Avro and Thrift的代碼

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {
    public static void main(String[] args) {
        MyRpcClientFacade1 client = new MyRpcClientFacade1();
        // Initialize client with the remote Flume agent's host and port
        client.init("node2", 41414);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello ERROR ! ------ Test";
        for (int i = 500; i < 505; i++) {
            client.sendDataToFlume(sampleData + " " + i);
            System.out.println(sampleData + " " + i);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade1 {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the
        // above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of
            // the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}

 

在eclipse控制台輸出的結果是:

[ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:505) ] Invalid value for batchSize: 0; Using default value.
[ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634) ] Using default maxIOWorkers
Hello ERROR ! ------ Test 500
Hello ERROR ! ------ Test 501
Hello ERROR ! ------ Test 502
Hello ERROR ! ------ Test 503
Hello ERROR ! ------ Test 504

 

8.5.查看Kafka的Topic

--進入node3,查看kafka的topic
cd /home/kafka-2.10/bin
./kafka-topics.sh --zookeeper node1,node2,node3 --list

 

可以看到,由於客戶端代碼的執行,Kafka裡面的topic:flume-to-kafka被自動創建

 

8.6.啟動Kafka Consumer:flume-to-kafka

我們在這裡是查看topic: flume-to-kafka的消費信息

--進入node3,啟動kafka消費者
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic flume-to-kafka

 

控制台輸出:

Hello ERROR ! ------ Test 500
Hello ERROR ! ------ Test 501
Hello ERROR ! ------ Test 502
Hello ERROR ! ------ Test 503
Hello ERROR ! ------ Test 504

 

8.7.創建Topic:storm-to-kafka

在Kafka裡面創建另一個topic:

--進入node1,創建一個topic:storm-to-kafka
--設置3個partitions
--replication-factor=3
./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic storm-to-kafka --partitions 3 --replication-factor 3

 

8.8.運行Storm代碼

package storm.logfilter;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LogFilterTopology {

    public static class FilterBolt extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getString(0);
            // 包含ERROR的行留下
            if (line.contains("ERROR")) {
                System.err.println("Filter:  " + line + " ~ filtered.");
                collector.emit(new Values(line + " ~ filtered."));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // 定義message提供給後面FieldNameBasedTupleToKafkaMapper使用
            declarer.declare(new Fields("message"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // https://github.com/apache/storm/tree/master/external/storm-kafka
        // config kafka spout,話題
        String topic = "flume-to-kafka";
        ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
        // /MyKafka,偏移量offset的根目錄,記錄隊列取到了哪裡
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");
        List<String> zkServers = new ArrayList<String>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        // 是否從頭開始消費
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // StringScheme將位元組流轉解碼成某種編碼的字元串
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // set kafka spout
        builder.setSpout("kafkaSpout", kafkaSpout, 3);

        // set bolt
        builder.setBolt("filterBolt", new FilterBolt(), 8).shuffleGrouping("kafkaSpout");

        // 數據寫出
        // set kafka bolt
        // withTopicSelector使用預設的選擇器指定寫入的topic: storm-to-kafka
        // withTupleToKafkaMapper tuple==>kafka的key和message
        KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("storm-to-kafka"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

        builder.setBolt("kafkaBolt", kafka_bolt, 2).shuffleGrouping("filterBolt");

        Config conf = new Config();
        // set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
        /**
         * Kafka生產者ACK機制 0 : 生產者不等待Kafka broker完成確認,繼續發送下一條數據 1 :
         * 生產者等待消息在leader接收成功確認之後,繼續發送下一條數據 -1 :
         * 生產者等待消息在follower副本接收到數據確認之後,繼續發送下一條數據
         */
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);

        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
        
        if (args == null || args.length == 0) {
            // 本地方式運行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
        } else {
            // 集群方式運行
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        
    }
}

 

8.9.啟動Kafka Consumer:storm-to-kafka

我們在這裡是查看topic: storm-to-kafka的消費信息

--進入node1,啟動kafka消費者
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic storm-to-kafka

 

控制台輸出:

Hello ERROR ! ------ Test 504 ~ filtered.
Hello ERROR ! ------ Test 500 ~ filtered.
Hello ERROR ! ------ Test 501 ~ filtered.
Hello ERROR ! ------ Test 503 ~ filtered.
Hello ERROR ! ------ Test 502 ~ filtered.

 

========================================================

More reading,and english is important.

I'm Hongten

 

大哥哥大姐姐,覺得有用打賞點哦!你的支持是我最大的動力。謝謝。
Hongten博客排名在100名以內。粉絲過千。
Hongten出品,必是精品。

E | [email protected]  B | http://www.cnblogs.com/hongten

========================================================

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 0 */6 * * * /home/kdb/php/bin/php /home/kdb/apache/htdocs/lklkdbplatform/kdb_release/Crontab/index.php /TakeOut/update_token release 10.18.25.30 //每隔6 ...
  • 1.基本命令行模式 註意:非root用戶 systemctl restart crond 失效,請使用最底層的驅動重啟 2.crontab日誌 cron日誌保存在系統目錄/var/log/cron 命令:tail -n 2 /var/log/cron 註意:預設root許可權才可以 ...
  • 前言: 環境:centos7.5 64 位 正文: Docker 軟體包已經包括在預設的 CentOS Extras 軟體源里。因此想要安裝 docker,只需要運行下麵的 yum 命令: 啟動 docker 加速訪問 Docker Hub 搞定 ...
  • 由於公司停電,導致幾十臺vmWare虛擬機器啟動報錯。 錯誤:Failed to power on virtual machine XXX. Failed to lock the file Click here for more details. 有些場景也會出現下麵的錯誤:Failed to po ...
  • 這邊記錄一下如何在windows與linux之間進行文件的傳輸,下麵是具體的網址。 原文地址::http://blog.csdn.net/shufac/article/details/51966276 相關文章 1、Xshell實現Windows上傳文件到Linux主機 http://www.lin ...
  • 不瘦原來對redis也是有個大概的瞭解(就你知道的多), 但是最近和大神聊天的過程中才明白自己知道的簡直就是雞毛蒜皮(讓你得瑟),所以不瘦打算從頭在捋一遍,順便把過程也記錄下來,如果能給大家在學習redis的道路上提供一條清晰的線索,不瘦胖也瞑目了. 我們知道redis沒有直接使用C語言中的字元串, ...
  • 原創作品,轉自請註明出處:https://www.cnblogs.com/sunshine5683/p/10140716.html 繼上篇總結,繼續進行總結: 以下下數字函數: 1、abs(n):返回數字n的絕對值 2、acos(n):返回數字的反餘弦值 3、asin(n):返回數字的反正弦值 4、 ...
  • 四. 檢測下線狀態 對於Redis的Sentinel中關於下線有兩個不同的概念:(1)主觀下線(Subjectively Down, 簡稱 Sdown) 指的是單個 Sentinel 實例對伺服器做出的下線判斷,此時不會進行故障轉移。(2) 客觀下線(Objectively Down, 簡稱 Odo ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...