Integrating Kafka with NGINX Lua (OpenResty)

Source: https://www.cnblogs.com/-xiaoyu-/archive/2019/08/03/11294905.html

Step 1: Go to the OpenResty directory

[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x  2 root root   4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000   4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000   4096 Nov 13  2017 bundle
-rwxrwxr-x  1 1000 1000  45908 Nov 13  2017 configure
-rw-rw-r--  1 1000 1000  22924 Nov 13  2017 COPYRIGHT
drwxr-xr-x  6 root root   4096 Jul 26 11:33 luajit
drwxr-xr-x  6 root root   4096 Aug  1 08:14 lualib
-rw-r--r--  1 root root   5413 Jul 26 11:32 Makefile
drwxr-xr-x 11 root root   4096 Jul 26 11:35 nginx
drwxrwxr-x  2 1000 1000   4096 Nov 13  2017 patches
drwxr-xr-x 44 root root   4096 Jul 26 11:33 pod
-rw-rw-r--  1 1000 1000   3689 Nov 13  2017 README.markdown
-rw-rw-r--  1 1000 1000   8690 Nov 13  2017 README-win32.txt
-rw-r--r--  1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x  5 root root   4096 Jul 26 11:33 site
drwxr-xr-x  2 root root   4096 Aug  1 10:54 testlua
drwxrwxr-x  2 1000 1000   4096 Nov 13  2017 util
[root@node03 openresty]# 

Note: two directories matter from here on, lualib and nginx.

1. lualib: holds the library packages bundled with OpenResty

2. nginx: the nginx server directory

Next, let's look inside lualib:

[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root   4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root   4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root   4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root   4096 Aug  1 10:34 resty

The redis and ngx packages are already here, so the nginx and redis APIs can be used directly without importing any extra dependency.

Now let's see what resty contains:

[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root  6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 core
-rw-r--r-- 1 root root   596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root  4096 Aug  1 10:42 kafka   # added manually, not bundled
drwxr-xr-x 2 root root  4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root  4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root  4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root  1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root   616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root  9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root  1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root  1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root  1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root  1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root  1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root   236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root   698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root  5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root  406 Jul 26 11:33 websocket

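Here we find the familiar mysql.lua and redis.lua; ignore the rest for now.

Note: a stock OpenResty installation has no kafka directory here, because OpenResty does not bundle a Kafka client. I imported the Kafka package in advance.

Let's see which files the kafka package contains: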

[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root  1369 Aug  1 10:42 broker.lua
-rw-r--r-- 1 root root  5537 Aug  1 10:42 client.lua
-rw-r--r-- 1 root root   710 Aug  1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug  1 10:42 producer.lua
-rw-r--r-- 1 root root  4072 Aug  1 10:42 request.lua
-rw-r--r-- 1 root root  2118 Aug  1 10:42 response.lua
-rw-r--r-- 1 root root  1494 Aug  1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root  4845 Aug  1 10:42 sendbuffer.lua

The Kafka package can be downloaded here:

Link: https://pan.baidu.com/s/1pFLhz3E_txb3ZWIRWxfQYg
Extraction code: 0umg

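Step 2: Create the Kafka test Lua file

1. Go back to the openresty directory

[root@node03 kafka]# cd /export/servers/openresty/

2. Create a directory for the test file

[root@node03 openresty]# mkdir -p testlua

The directory name and location are up to you, but remember the path: nginx.conf refers to it in step 3.

3. Enter the new directory and create the kafkalua.lua script

Create the file with vim kafkalua.lua or touch kafkalua.lua: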

[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug  1 10:54 kafkalua.lua

kafkalua.lua:

-- Test output; optional
ngx.say('hello kafka file configuration successful!!!!!!')

-- Collection threshold: skip collection when active connections exceed this value
local DEFAULT_THRESHOLD = 100000
-- Number of Kafka partitions; must match the topic's partition count (3 in step 6)
local PARTITION_NUM = 3
-- Kafka topic name
local TOPIC = 'B2CDATA_COLLECTION1'
-- Key of the round-robin counter in the shared dictionary
local POLLING_KEY = "POLLING_KEY"
-- Partitioner: the message key already holds the target partition number
local function partitioner(key, num, correlation_id)
    return tonumber(key)
end
-- Kafka broker list (the IPs must match each broker's host.name setting)
local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
-- Kafka producer parameters
local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
-- Shared-memory counter used for round-robin partition selection
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
    pollingVal = 1
    shared_data:set(POLLING_KEY, pollingVal)
end
-- Take the per-message counter modulo PARTITION_NUM to spread messages evenly
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)

-- Concurrency control
local isGone = true
-- Overload protection: stop collecting when ngx.var.connections_active exceeds the threshold
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
    isGone = false
end
-- Data collection
if isGone then

    local time_local = ngx.var.time_local
    if time_local == nil then
        time_local = ""
    end

    local request = ngx.var.request
    if request == nil then
        request = ""
    end

    local request_method = ngx.var.request_method
    if request_method == nil then
        request_method = ""
    end

    local content_type = ngx.var.content_type
    if content_type == nil then
        content_type = ""
    end
    ngx.req.read_body()
    local request_body = ngx.var.request_body
    if request_body == nil then
        request_body = ""
    end

    local http_referer = ngx.var.http_referer
    if http_referer == nil then
        http_referer = ""
    end

    local remote_addr = ngx.var.remote_addr
    if remote_addr == nil then
        remote_addr = ""
    end

    local http_user_agent = ngx.var.http_user_agent
    if http_user_agent == nil then
        http_user_agent = ""
    end

    local time_iso8601 = ngx.var.time_iso8601
    if time_iso8601 == nil then
        time_iso8601 = ""
    end

    local server_addr = ngx.var.server_addr
    if server_addr == nil then
        server_addr = ""
    end

    local http_cookie = ngx.var.http_cookie
    if http_cookie == nil then
        http_cookie = ""
    end
    -- Assemble the message, joining the fields with the "#CS#" separator
    local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;
    -- Load the Kafka producer module
    local producer = require "resty.kafka.producer"
    -- Create the producer
    local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
    -- Send the message; the key carries the partition number
    local ok, err = bp:send(TOPIC, partitions, message)
    -- Log the error if the send failed
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
end
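
The round-robin step in the script can be looked at in isolation. The following is a minimal Java sketch, not part of the original setup: the class and method names are hypothetical, an AtomicLong stands in for the ngx.shared counter, and three partitions are used only as an example. It illustrates how a counter taken modulo the partition count produces keys that cycle evenly over the partitions.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of the round-robin key computed in kafkalua.lua.
final class RoundRobinKeySketch {
    // Stands in for the shared-dictionary counter; starts at 1 like the script.
    private final AtomicLong counter = new AtomicLong(1);
    private final int partitionCount;

    RoundRobinKeySketch(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    // Returns the next key as a string, mirroring '' .. (pollingVal % PARTITION_NUM).
    String nextKey() {
        long current = counter.getAndIncrement();
        return Long.toString(current % partitionCount);
    }

    public static void main(String[] args) {
        RoundRobinKeySketch sketch = new RoundRobinKeySketch(3);
        StringBuilder keys = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            keys.append(sketch.nextKey()).append(' ');
        }
        // Six consecutive keys visit each of the three partitions twice.
        System.out.println(keys.toString().trim());
    }
}
```

Because the partitioner in the script returns the key unchanged, every key produced this way has to be a valid partition number of the topic, which is why the modulus should equal the topic's partition count.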

Step 3: Edit the nginx configuration file nginx.conf

1. Go to the nginx/conf directory

[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug  1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf

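2. Edit nginx.conf

[root@node03 conf]# vim nginx.conf

        # 1. Find the first server block
        # 2. Add the two directives below just above it
        # 3. Add the Kafka-related location inside it, as shown below


#------------------ added code ---------------------------------------
 # Declare a 10 MB shared dictionary, visible to all nginx worker processes
 lua_shared_dict shared_data 10m;
 # Use the local DNS resolver for name resolution
 resolver 127.0.0.1;
#------------------ added code ---------------------------------------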

 server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;
        location / {
            root   html;
            index  index.html index.htm;
        }

        #------------------ added code ---------------------------------------
        location /kafkalua {  # kafkalua is the "project name" (URL path); if omitted, the path is just /
            # Enable nginx status monitoring
            stub_status on;
            # Default response content type
            default_type text/html;
            # Path to the Kafka Lua script, the kafkalua.lua file created in step 2
            content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
        }
        #------------------ added code ---------------------------------------
}

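Note: in location /kafkalua {...}, kafkalua is the project name, that is, the URL path under which the script is served. You can choose any name, but remember it because the test URLs use it.

The configuration now has two locations: location / {...} and location /kafkalua {...}. For plain prefix locations nginx picks the longest prefix that matches the request path, so requests under /kafkalua run the Lua script and everything else is served from the html directory.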
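
To make the difference between the two locations concrete, here is a small Java sketch of longest-prefix matching. It is only an illustration under simplifying assumptions: the class and method names are hypothetical, and it models plain prefix locations only, not the exact-match, `^~`, or regular-expression locations that nginx also supports.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of how plain prefix locations are chosen.
final class LocationMatchSketch {
    // Returns the longest configured prefix that the URI path starts with,
    // or null when no prefix matches.
    static String match(List<String> prefixes, String uriPath) {
        String best = null;
        for (String prefix : prefixes) {
            if (uriPath.startsWith(prefix)
                    && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> locations = Arrays.asList("/", "/kafkalua");
        // A crawler URL under /kafkalua is handled by the Lua location.
        System.out.println(match(locations, "/kafkalua/B2C40/dist/main/css/flight.css"));
        // Any other path falls back to the catch-all location.
        System.out.println(match(locations, "/index.html"));
    }
}
```

This is why the crawler in step 5 can append arbitrary paths to the /kafkalua base URL and still reach the collection script.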

Step 4: Start nginx

1. Go to nginx/sbin

[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx

2. Check that the configuration file is valid

[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
# The configuration test passed

3. Start nginx

[root@node03 sbin]# nginx
# No output usually means nginx started successfully

4. Check that nginx is running

[root@node03 sbin]# ps -ef | grep nginx
root       3730      1  0 09:24 ?        00:00:00 nginx: master process nginx
nobody     3731   3730  0 09:24 ?        00:00:20 nginx: worker process is shutting down
nobody     5766   3730  0 12:17 ?        00:00:00 nginx: worker process
root       5824   3708  0 12:24 pts/1    00:00:00 grep nginx
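# A master process and a worker process are listed, so nginx is running

5. Access nginx from a browser

Enter node03/kafkalua in the browser.

Note: if no hosts entry is configured, use the address of the machine running OpenResty instead, for example 192.168.52.120/kafkalua.

Then enter node03/ or 192.168.52.120/.

Also try node03:80/kafkalua and node03:80/.

In node03:80/kafkalua, node03 is the server's host alias (you can also write the server's IP address directly), 80 is the port set by "listen 80;", and kafkalua is the project name. Port 80 is the HTTP default and can be omitted; if the directive were "listen 8088;", the browser would need node03:8088/kafkalua, and 8088 could not be omitted.

Here is the relevant part of nginx.conf again: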

 server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;
        location / {
            root   html;
            index  index.html index.htm;
        }

        #------------------ added code ---------------------------------------
        location /kafkalua {  # kafkalua is the "project name" (URL path); if omitted, the path is just /
            # Enable nginx status monitoring
            stub_status on;
            # Default response content type
            default_type text/html;
            # Path to the Kafka Lua script, the kafkalua.lua file created in step 2
            content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
        }
}
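
The relationship between the listen port, the host name, and the location can be written down as a tiny helper. This is a hedged Java sketch with hypothetical names; it assumes plain HTTP and models only the one rule discussed here, that the default port 80 may be left out of the URL.

```java
// Hypothetical helper that builds the test URL from the nginx settings.
final class EndpointUrlSketch {
    // Builds http://host[:port]location, omitting the port when it is 80.
    static String build(String host, int port, String location) {
        String authority = (port == 80) ? host : host + ":" + port;
        return "http://" + authority + location;
    }

    public static void main(String[] args) {
        // With "listen 80;" the port can be left out.
        System.out.println(build("node03", 80, "/kafkalua"));
        // With "listen 8088;" the port has to appear in the URL.
        System.out.println(build("node03", 8088, "/kafkalua"));
    }
}
```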

Step 5: Create a test crawler program

1. Create a Maven project and add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.11.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.4</version>
        </dependency>
    </dependencies>

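2. Mock crawler program

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;

import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;

public class SpiderGoAirCN {
    private static String basePath = "http://node03/kafkalua";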
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 50000; i++) {
            // Request the flight query endpoint
            spiderQueryao();
            // Request an HTML page
            spiderHtml();
            // Request a JS file
            spiderJs();
            // Request a CSS file
            spiderCss();
            // Request a PNG image
            spiderPng();
            // Request a JPG image
            spiderJpg();
            Thread.sleep(100);
        }
    }

    /**
     * Sends a simulated flight-query POST request through nginx.
     *
     * @throws Exception if the request cannot be sent
     */
    public static void spiderQueryao() throws Exception {
        // 1. Target URL; matches ^.*/B2C40/query/jaxb/direct/query.ao.*$
        String url = basePath + "/B2C40/query/jaxb/direct/query.ao";
        // 2. Create the POST request
        HttpPost httpPost = new HttpPost(url);
        // 3. Set the request headers
        httpPost.setHeader("Time-Local", getLocalDateTime());
        httpPost.setHeader("Requst",
                    "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
        httpPost.setHeader("Request Method", "POST");
        httpPost.setHeader("Content-Type",
                "application/x-www-form-urlencoded; charset=UTF-8");
        httpPost.setHeader(
                "Referer",
                "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1="
                        + getGoTime() + "&at=1&ct=0&it=0");
        httpPost.setHeader("Remote Address", "192.168.56.80");
        httpPost.setHeader(
                "User-Agent",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
        httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
        httpPost.setHeader("Server Address", "243.45.78.132");
        httpPost.setHeader(
                "Cookie",
                "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
                        + getGoTime()
                        + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1("
                        + getGoTime() + ")");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));
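        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
                CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // 6. Report whether a response came back
            System.out.println(response != null);
        }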
    }

    public static void spiderHtml() throws Exception {
        // 1. Target URL; matches ^.*html.*$
        String url = basePath + "/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0";
        // 2. Create the POST request
        HttpPost httpPost = new HttpPost(url);
        // 3. Set the request headers
        httpPost.setHeader("Time-Local", getLocalDateTime());
        httpPost.setHeader("Requst",
                "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
        httpPost.setHeader("Request Method", "POST");
        httpPost.setHeader("Content-Type",
                "application/x-www-form-urlencoded; charset=UTF-8");
        httpPost.setHeader(
                "Referer",
                "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
        httpPost.setHeader("Remote Address", "192.168.56.1");
        httpPost.setHeader(
                "User-Agent",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
        httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
        httpPost.setHeader("Server Address", "192.168.56.80");
        httpPost.setHeader(
                "Cookie",
                "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        // httpPost.setEntity(new StringEntity(
        // "depcity=CAN&arrcity=WUH&flightdate=20180220&adultnum=1&childnum=0&infantnum=0&cabinorder=0&airline=1&flytype=0&international=0&action=0&segtype=1&cache=0&preUrl=&isMember="));
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));
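        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
                CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // 6. Report whether a response came back
            System.out.println(response != null);
        }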
    }

    public static void spiderJs() throws Exception {

        // 1. Target URL
        String url = basePath +"/B2C40/dist/main/modules/common/requireConfig.js";
        // 2. Create the POST request
        HttpPost httpPost = new HttpPost(url);
        // 3. Set the request headers
        httpPost.setHeader("Time-Local", getLocalDateTime());
        httpPost.setHeader("Requst",
                "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
        httpPost.setHeader("Request Method", "POST");
        httpPost.setHeader("Content-Type",
                "application/x-www-form-urlencoded; charset=UTF-8");
        httpPost.setHeader(
                "Referer",
                "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
        httpPost.setHeader("Remote Address", "192.168.56.1");
        httpPost.setHeader(
                "User-Agent",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
        httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
        httpPost.setHeader("Server Address", "192.168.56.80");
        httpPost.setHeader(
                "Cookie",
                "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));
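        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
                CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // 6. Report whether a response came back
            System.out.println(response != null);
        }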
    }

    public static void spiderCss() throws Exception {

        // 1. Target URL
        String url = basePath +"/B2C40/dist/main/css/flight.css";
        // 2. Create the POST request
        HttpPost httpPost = new HttpPost(url);
        // 3. Set the request headers
        httpPost.setHeader("Time-Local", getLocalDateTime());
        httpPost.setHeader("Requst",
                "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
        httpPost.setHeader("Request Method", "POST");
        httpPost.setHeader("Content-Type",
                "application/x-www-form-urlencoded; charset=UTF-8");
        httpPost.setHeader("Referer",
                "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html");
        httpPost.setHeader("Remote Address", "192.168.56.1");
        httpPost.setHeader(
                "User-Agent",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
        httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
        httpPost.setHeader("Server Address", "192.168.56.80");
        httpPost.setHeader(
                "Cookie",
                "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));
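        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
                CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // 6. Report whether a response came back
            System.out.println(response != null);
        }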
    }

    public static void spiderPng() throws Exception {

        // 1. Target URL
        String url =basePath + "/B2C40/dist/main/images/common.png";
        // 2. Create the POST request
        HttpPost httpPost = new HttpPost(url);
        // 3. Set the request headers
        httpPost.setHeader("Time-Local", getLocalDateTime());
        httpPost.setHeader("Requst",
                "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
        httpPost.setHeader("Request Method", "POST");
        httpPost.setHeader("Content-Type",
                "application/x-www-form-urlencoded; charset=UTF-8");
        httpPost.setHeader(
                "Referer",
                "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
        httpPost.setHeader("Remote Address", "192.168.56.1");
        httpPost.setHeader(
                "User-Agent",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
        httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
        httpPost.setHeader("Server Address", "192.168.56.80");
        httpPost.setHeader(
                "Cookie",
                "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));
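        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
                CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // 6. Report whether a response came back
            System.out.println(response != null);
        }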
    }

    public static void spiderJpg() throws Exception {

        // 1. Target URL
        String url = basePath +"/B2C40/dist/main/images/loadingimg.jpg";
        // 2. Create the POST request
        HttpPost httpPost = new HttpPost(url);
        // 3. Set the request headers
        httpPost.setHeader("Time-Local", getLocalDateTime());
        httpPost.setHeader("Requst",
                "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
        httpPost.setHeader("Request Method", "POST");
        httpPost.setHeader("Content-Type",
                "application/x-www-form-urlencoded; charset=UTF-8");
        httpPost.setHeader(
                "Referer",
                "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
        httpPost.setHeader("Remote Address", "192.168.56.1");
        httpPost.setHeader(
                "User-Agent",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
        httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
        httpPost.setHeader("Server Address", "192.168.56.80");
        httpPost.setHeader(
                "Cookie",
                "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
        // 4. Set the form parameters
        ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters
                .add(new BasicNameValuePair(
                        "json",
                        "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));
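        // 5. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
                CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // 6. Report whether a response came back
            System.out.println(response != null);
        }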
    }

    public static String getLocalDateTime() {
        DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00",
                Locale.ENGLISH);
        String nowAsISO = df.format(new Date());
        return nowAsISO;

    }

    public static String getISO8601Timestamp() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
        String nowAsISO = df.format(new Date());
        return nowAsISO;
    }

    public static String getGoTime() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        String nowAsISO = df.format(new Date());
        return nowAsISO;
    }

    public static String getBackTime() {
        Date date = new Date(); // current time
        Calendar calendar = new GregorianCalendar();
        calendar.setTime(date);
        calendar.add(Calendar.DATE, 1); // move one day forward; use a negative value to move back
        date = calendar.getTime();
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
        String dateString = formatter.format(date);
        return dateString;
    }
}
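
The date helpers in the crawler rely on SimpleDateFormat and a hard-coded "+08:00" suffix. As a possible alternative, here is a minimal sketch using java.time (Java 8 or later). The class and method names are hypothetical, and the fixed UTC+8 offset is an assumption carried over from the literal in the original patterns; it is not a drop-in replacement for the crawler's code.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical java.time versions of the crawler's date helpers.
final class TimestampSketch {
    // Assumption: timestamps are always rendered at UTC+8.
    private static final ZoneOffset UTC_PLUS_8 = ZoneOffset.ofHours(8);
    private static final DateTimeFormatter ISO =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX");
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Formats the instant as an ISO 8601 timestamp with a +08:00 offset.
    static String iso8601(ZonedDateTime time) {
        return time.withZoneSameInstant(UTC_PLUS_8).format(ISO);
    }

    // Formats the departure date (same day, UTC+8).
    static String goDate(ZonedDateTime time) {
        return time.withZoneSameInstant(UTC_PLUS_8).format(DAY);
    }

    // Formats the return date, one day after the departure date.
    static String backDate(ZonedDateTime time) {
        return time.withZoneSameInstant(UTC_PLUS_8).plusDays(1).format(DAY);
    }

    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.now();
        System.out.println(iso8601(now));
        System.out.println(goDate(now));
        System.out.println(backDate(now));
    }
}
```

DateTimeFormatter instances are immutable and thread-safe, so they can be shared as constants, which SimpleDateFormat does not allow.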

Step 6: Start Kafka

1. Create the topic

[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 \
--replication-factor 3 --create --topic B2CDATA_COLLECTION1

2. Start a Kafka console consumer

[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic B2CDATA_COLLECTION1

Step 7: Run the crawler and watch the result

1. Start the crawler program

2. Watch the consumer window
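
Each line that reaches the consumer is one message assembled by kafkalua.lua: eleven fields joined with the "#CS#" separator. The following Java sketch shows one way a downstream program might split such a line back into named fields. The class name is hypothetical, the field names are taken from the nginx variables used in the script, and the parser assumes that no field value contains the separator itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical parser for the "#CS#"-separated messages built by kafkalua.lua.
final class CollectedRecordSketch {
    static final String SEPARATOR = "#CS#";
    // Field order follows the concatenation order in the Lua script.
    static final String[] FIELDS = {
        "time_local", "request", "request_method", "content_type",
        "request_body", "http_referer", "remote_addr", "http_user_agent",
        "time_iso8601", "server_addr", "http_cookie"
    };

    // Splits one message into an ordered field map; empty fields are kept.
    static Map<String, String> parse(String message) {
        // Limit -1 keeps trailing empty fields such as a missing cookie.
        String[] parts = message.split(Pattern.quote(SEPARATOR), -1);
        if (parts.length != FIELDS.length) {
            throw new IllegalArgumentException(
                    "expected " + FIELDS.length + " fields but got " + parts.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            record.put(FIELDS[i], parts[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        // Made-up sample values, for illustration only.
        String sample = String.join(SEPARATOR,
                "17/Jan/2018:08:30:00 +0800", "POST /kafkalua HTTP/1.1", "POST",
                "", "", "", "192.168.52.1", "", "2018-01-17T08:30:00+08:00",
                "192.168.52.120", "");
        System.out.println(parse(sample));
    }
}
```

If request bodies or cookies could ever contain "#CS#", a length-prefixed or JSON encoding would be a safer choice than a plain separator.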

Step 8: Start kafka-manager and observe

1. Start kafka-manager

[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 May  1 06:27 kafka-manager
-rw-r--r-- 1 root root  9975 May  1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root  1383 May  1 06:27 log-config
-rw-r--r-- 1 root root   105 May  1 06:27 log-config.bat
[root@node01 bin]# 

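# Start it
[root@node01 bin]# ./kafka-manager 

[Screenshot: window after startup]

2. Open it in a browser

Enter node01:9000 in the browser.

Using kafka manager is not covered here. Look at how the messages are distributed in the B2CDATA_COLLECTION1 topic:

If the three partitions hold roughly the same number of messages, the setup works.

If the counts differ widely, the partitioning strategy in kafkalua.lua is not taking effect; default partitioning can cause data skew, so configure your own partitioner.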
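
"Roughly the same" can be turned into a number. The sketch below is a hypothetical Java helper, not something kafka-manager provides: it takes per-partition message counts that you read off the UI by hand and reports the ratio between the fullest and the emptiest partition. The sample counts in main are made up.

```java
// Hypothetical helper for judging partition balance from message counts.
final class PartitionSkewSketch {
    // Ratio of the largest to the smallest count; 1.0 means perfectly even.
    static double skewRatio(long[] counts) {
        if (counts.length == 0) {
            throw new IllegalArgumentException("at least one partition is required");
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long count : counts) {
            if (count < 0) {
                throw new IllegalArgumentException("counts must not be negative");
            }
            min = Math.min(min, count);
            max = Math.max(max, count);
        }
        if (max == 0) {
            return 1.0; // no messages anywhere: trivially balanced
        }
        if (min == 0) {
            return Double.POSITIVE_INFINITY; // at least one partition got nothing
        }
        return (double) max / min;
    }

    public static void main(String[] args) {
        // Made-up counts: an even distribution and a skewed one.
        System.out.println(skewRatio(new long[] {100, 100, 100}));
        System.out.println(skewRatio(new long[] {200, 100, 100}));
    }
}
```

A ratio close to 1.0 suggests the round-robin key is working; a very large or infinite ratio points to messages landing on only some partitions.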

Done!

