Hadoop學習(2)-java客戶端操作hdfs及secondarynode作用

来源:https://www.cnblogs.com/wpbing/archive/2019/07/23/11233399.html
-Advertisement-
Play Games

首先要在windows下解壓一個windows版本的hadoop 然後在配置他的環境變數,同時要把hadoop的share目錄下的hadoop下的相關jar包拷貝到esclipe 然後Build Path 下麵上代碼 練習:從一個文件裡面不斷地採集日誌上傳到hdfs裡面 1.流程介紹 啟動一個定時任 ...


首先要在windows下解壓一個windows版本的hadoop

然後在配置他的環境變數,同時要把hadoop的share目錄下的hadoop下的相關jar包拷貝到esclipe

然後Build Path

下麵上代碼

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Before;
import org.junit.Test;

public class HdfsClientDemo {
    
    
    public static void main(String[] args) throws Exception {
        /**
         * Configuration參數對象的機制:
         *    構造時,會載入jar包中的預設配置 xx-default.xml
         *    再載入 用戶配置xx-site.xml  ,覆蓋掉預設參數
         *    構造完成之後,還可以conf.set("p","v"),會再次覆蓋用戶配置文件中的參數值
         */
        // new Configuration()會從項目的classpath中載入core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件
        Configuration conf = new Configuration();
        
        // 指定本客戶端上傳文件到hdfs時需要保存的副本數為:2
        conf.set("dfs.replication", "2");
        // 指定本客戶端上傳文件到hdfs時切塊的規格大小:64M
        conf.set("dfs.blocksize", "64m");
        
        // 構造一個訪問指定HDFS系統的客戶端對象: 參數1:——HDFS系統的URI,參數2:——客戶端要特別指定的參數,參數3:客戶端的身份(用戶名)
        FileSystem fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root");
        
        // 上傳一個文件到HDFS中
        fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/"));
        
        fs.close();
    }
    
    FileSystem fs = null;
    
    @Before
    public void init() throws Exception{
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "2");
        conf.set("dfs.blocksize", "64m");
        
        fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root");
        
    }
    
    
    /**
     * 從HDFS中下載文件到客戶端本地磁碟
     * @throws IOException 
     * @throws IllegalArgumentException 
     */
    @Test
    public void testGet() throws IllegalArgumentException, IOException{
        
        fs.copyToLocalFile(new Path("/test"), new Path("d:/"));
        fs.close();
        
    }
    
    
    /**
     * 在hdfs內部移動文件\修改名稱
     */
    @Test
    public void testRename() throws Exception{
        
        fs.rename(new Path("/install.log"), new Path("/aaa/in.log"));
        
        fs.close();
        
    }
    
    /**
     * 在hdfs中創建文件夾
     */
    @Test
    public void testMkdir() throws Exception{
        
        fs.mkdirs(new Path("/xx/yy/zz"));
        
        fs.close();
    }
    
    
    /**
     * 在hdfs中刪除文件或文件夾
     */
    @Test
    public void testRm() throws Exception{
        
        fs.delete(new Path("/aaa"), true);
        
        fs.close();
    }
    
    
    
    /**
     * 查詢hdfs指定目錄下的文件信息
     */
    @Test
    public void testLs() throws Exception{
        // 只查詢文件的信息,不返迴文件夾的信息
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true);
        
        while(iter.hasNext()){
            LocatedFileStatus status = iter.next();
            System.out.println("文件全路徑:"+status.getPath());
            System.out.println("塊大小:"+status.getBlockSize());
            System.out.println("文件長度:"+status.getLen());
            System.out.println("副本數量:"+status.getReplication());
            System.out.println("塊信息:"+Arrays.toString(status.getBlockLocations()));
            
            System.out.println("--------------------------------");
        }
        fs.close();
    }
   

    /**
     * 讀取hdfs中的文件的內容
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testReadData() throws IllegalArgumentException, IOException {

        FSDataInputStream in = fs.open(new Path("/test.txt"));

        BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));

        String line = null;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }

        br.close();
        in.close();
        fs.close();

    }

    /**
     * 讀取hdfs中文件的指定偏移量範圍的內容
     * 
     * 
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testRandomReadData() throws IllegalArgumentException, IOException {

        FSDataInputStream in = fs.open(new Path("/xx.dat"));

        // 將讀取的起始位置進行指定
        in.seek(12);

        // 讀16個位元組
        byte[] buf = new byte[16];
        in.read(buf);

        System.out.println(new String(buf));

        in.close();
        fs.close();

    }

    /**
     * 往hdfs中的文件寫內容
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */

    @Test
    public void testWriteData() throws IllegalArgumentException, IOException {

        FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);

        // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg

        FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");

        byte[] buf = new byte[1024];
        int read = 0;
        while ((read = in.read(buf)) != -1) {
            out.write(buf,0,read);
        }
        
        in.close();
        out.close();
        fs.close();

    }

}

 

練習:從一個文件裡面不斷地採集日誌上傳到hdfs裡面

1.流程介紹

---啟動一個定時任務

    --定時探測日誌原目錄

    --獲取文件上傳到一個待上傳的臨時目錄

    --逐一上傳到hdfs目標路徑,同時移動到備份目錄里

--啟動一個定時任務:

    --探測備份目錄中的備份數據是否已經超出,如果超出就刪除

 

 主類為:

import java.util.Timer;

public class DataCollectMain {
    
    public static void main(String[] args) {
        
        Timer timer = new Timer();
        //第一個為task類,第二個開始時間 第三個沒隔多久執行一次
        timer.schedule(new CollectTask(), 0, 60*60*1000L);
        
        timer.schedule(new BackupCleanTask(), 0, 60*60*1000L);
        
    }
    

}

CollectTask類:

這個類要繼承TimerTask,重寫run方法,主要內容就是不斷收集日誌文件

package cn.edu360.hdfs.datacollect;

import java.io.File;
import java.io.FilenameFilter;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.TimerTask;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

public class CollectTask extends TimerTask {

    @Override
    public void run() {
        try {
            // 獲取配置參數
            Properties props = PropertyHolderLazy.getProps();

            // 獲取本次採集時的日期
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
            String day = sdf.format(new Date());
            
            File srcDir = new File("d:/logs/accesslog");
            // 列出日誌源目錄中需要採集的文件
            //裡面傳了一個文件過濾器,重寫accept方法,return true就要
            File[] listFiles = srcDir.listFiles(new FilenameFilter() {
                @Override
                public boolean accept(File dir, String name) {
                    if (name.startsWith("access.log")) {
                        return true;
                    }
                    return false;
                }
            });
            // 將要採集的文件移動到待上傳臨時目錄
            File toUploadDir = new File("d:/logs/toupload");
            for (File file : listFiles) {
                
                //這裡如果是 file.renameTo(toUploadDir)是不對的,因為會生成一個toupload的文件而不是文件夾
                //要用renameTo的話你要自己加上文件的新名字比較麻煩
                //用FileUtiles是對file操作的一些工具類
                //第一個目標文件,第二個路徑,第三個是否存在覆蓋
                FileUtils.moveFileToDirectory(file, toUploadDir, true);
            }

            // 構造一個HDFS的客戶端對象
            FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root");
            
            File[] toUploadFiles = toUploadDir.listFiles();

            // 檢查HDFS中的日期目錄是否存在,如果不存在,則創建
            Path hdfsDestPath = new Path("/logs" + day);
            if (!fs.exists(hdfsDestPath)) {
                fs.mkdirs(hdfsDestPath);
            }

            // 檢查本地的備份目錄是否存在,如果不存在,則創建
            File backupDir = new File("d:/logs/backup" + day + "/");
            if (!backupDir.exists()) {
                backupDir.mkdirs();
            }

            for (File file : toUploadFiles) {
                // 傳輸文件到HDFS並改名access_log_
                fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path("/logs"+day+"/access_log_"+UUID.randomUUID()+".log"));

                // 將傳輸完成的文件移動到備份目錄
                //註意這裡依然不要用renameTo
                FileUtils.moveFileToDirectory(file, backupDir, true);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
/**
     * 讀取hdfs中的文件的內容
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testReadData() throws IllegalArgumentException, IOException {

        FSDataInputStream in = fs.open(new Path("/test.txt"));

        BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));

        String line = null;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }

        br.close();
        in.close();
        fs.close();

    }

    /**
     * 讀取hdfs中文件的指定偏移量範圍的內容
     * 
     * 
     * 作業題:用本例中的知識,實現讀取一個文本文件中的指定BLOCK塊中的所有數據
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testRandomReadData() throws IllegalArgumentException, IOException {

        FSDataInputStream in = fs.open(new Path("/xx.dat"));

        // 將讀取的起始位置進行指定
        in.seek(12);

        // 讀16個位元組
        byte[] buf = new byte[16];
        in.read(buf);

        System.out.println(new String(buf));

        in.close();
        fs.close();

    }

    /**
     * 往hdfs中的文件寫內容
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */

    @Test
    public void testWriteData() throws IllegalArgumentException, IOException {

        FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);

        // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg

        FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");

        byte[] buf = new byte[1024];
        int read = 0;
        while ((read = in.read(buf)) != -1) {
            out.write(buf,0,read);
        }
        
        in.close();
        out.close();
        fs.close();

    }

 


}

BackupCleanTask類

package cn.edu360.hdfs.datacollect;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimerTask;

import org.apache.commons.io.FileUtils;

public class BackupCleanTask extends TimerTask {

    @Override
    public void run() {

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
        long now = new Date().getTime();
        try {
            // 探測本地備份目錄
            File backupBaseDir = new File("d:/logs/backup/");
            File[] dayBackDir = backupBaseDir.listFiles();

            // 判斷備份日期子目錄是否已超24小時
            for (File dir : dayBackDir) {
                long time = sdf.parse(dir.getName()).getTime();
                if(now-time>24*60*60*1000L){
                    FileUtils.deleteDirectory(dir);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

 

hdfs中namenode中儲存元數據(對數據的描述信息)是在記憶體中以樹的形式儲存的,並且每隔一段時間都會把這些元數據序列化到磁碟中。序列化的東西在磁碟中叫 fsimage文件。

元數據可能會很大很大,所以只能是定期的序列化

問題1:序列化的時候,發生了元數據的修改怎麼辦

答:namenode會把每次用戶的操作都記錄下來,記錄成日誌文件,存在edits日誌文件中

其中edits日誌文件也會像log4j滾動日誌文件一樣,當文件太大的時候會另起一個文件並改名字

問題2:當edits文件太多的時候,一次宕機也會花大量的時間從edits里恢復,怎麼辦

答:會定期吧edits文件重放fsimage文件,並記錄edits的編號,把那些重放過的日誌文件給刪除。這樣也相當於重新序列化了,

所以namenode並不會做這樣的事情,是由secondary node做的,他會定期吧namenode的fsimage文件和edits文件下載下來

並把fsimage文件反序列化,並且讀日誌文件更新元數據,然後序列化到磁碟,然後把他上傳給namenode。

這個機制叫做checkpoint機制

這裡secondarynode 相當一一個小秘書

 

 

 客戶端寫數據到hdfs的流程

 

 上面是建立響應的過程

然後  是傳遞文件block塊的過程

 

 客戶端從hdfs讀數據流程

 額外知識點

註意,在windows裡面不要寫有些路徑不要寫絕對路徑,因為程式放到linux下麵可能會找不到,因此報錯

 一般使用class載入器,這樣當這個class載入的時候就會知道這個class在哪

類載入器的一些使用例子

比如我載入一個配置文件,為了避免出現絕對路徑,我們可以是用類載入器

     Properties props = new Properties();
        //載入配置文件,這樣寫的目的是為了避免在windows里出現絕對路徑,用類載入器,再把文件傳化成流
        props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties"));

 

而對於一些功能性的類,我們最好在寫邏輯的時候也不要直接去導入這個包,而是使用Class.forName

//這樣不直接導入這個包,直接用類載入器,是面向介面編程的一種思想。這裡我並不是在開始import xxxx.Mapper,這裡Mapper是一個介面,這裡我用了多態
        Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS"));
        Mapper mapper = (Mapper) mapper_class.newInstance();

 

單例模式

https://www.cnblogs.com/crazy-wang-android/p/9054771.html

只有個一實例,必須自己創建自己這個實例,必須為別人提供這個實例

 

 

 餓漢式單例:就算沒有人調用這個class,他也會載入進去;

如對於一個配置文件的載入

import java.util.Properties;

/**
 * 單例設計模式,方式一: 餓漢式單例
 *
 */
public class PropertyHolderHungery {

    private static Properties prop = new Properties();

    static {
        try {
            //將一個文件prop.load(stram)  
            //這裡面如果傳一個IO流不好,因為要用到絕對路徑,使用了類載入器  這種不管有沒有使用這個類都會載入
            prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties"));
        } catch (Exception e) {

        }
    }
    public static Properties getProps() throws Exception {
        return prop;
    }

}

 懶漢式:只有調用的時候才會有,但會有線程安全問題

/**
 * 單例模式:懶漢式——考慮了線程安全
 * */

public class PropertyHolderLazy {

    private static Properties prop = null;

    public static Properties getProps() throws Exception {
        if (prop == null) {
            synchronized (PropertyHolderLazy.class) {
                if (prop == null) {
                    prop = new Properties();
                    prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties"));
                }
            }
        }
        return prop;
    }

}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 如何選擇適合自己的Linux版本: 1.Linux桌面系統,首選Ubuntu; 2.伺服器端的Linux系統,首選RHEL或CentOS,這兩種中首選CentOS,如果公司有錢,不在乎成本也可以選擇RHEL; 3.如果對安全要求很高,可選擇Debian或FreeBSD; 4.如果需要使用資料庫高級服 ...
  • [toc] 進程的優先順序 那麼在系統中如何給進程配置優先順序 在啟動進程時,為不同的進程使用不同的調度策略。 nice值越高:表示優先順序越低,例如19,該進程容易將CPU使用量讓給其他進程。 nice值越低:表示優先順序越高,例如 20,該進程更不傾向於讓出CPU。 遠程連接不上操作步驟 網路 埠 用 ...
  • 轉自:https://blog.csdn.net/maxsky/article/details/44905003 大家都知道在 Linux 下,執行 su 命令後輸入密碼即可切換到 root 用戶執行各類操作 但是 Mac 下,這樣行不通,只會返回你一句 Sorry! 解決方法很簡單,首先打開終端 ...
  • 1.查找/var目錄下不屬於root、lp、gdm的所有文件。find /var -not \ (-user root -o -user lp -user gdm) -ls 2.統計/etc/init.d/functions文件中的每個單詞出現次數,併排序,用grep和sed兩種方法實現。 sed方 ...
  • #!/bin/bash #History: #2019/07/23 Fsq #This Program will check Permissions on dir PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/ ...
  • MySQL的sql_mode合理設置 sql_mode是個很容易被忽視的變數,預設值是空值,在這種設置下是可以允許一些非法操作的,比如允許一些非法數據的插入。在生產環境必須將這個值設置為嚴格模式,所以開發、測試環境的資料庫也必須要設置,這樣在開發測試階段就可以發現問題. sql model 常用來解 ...
  • MySQL必知必會 ​ 瞭解SQL 什麼是資料庫: 資料庫(database)保存有阻止的數據的容器,可以把資料庫想象成一個文件櫃。 什麼是表: 表(table) 某種特定類型結構的結構化清單,資料庫中的表的名字是唯一的。 什麼是列: 列(column)表中的一個欄位。所有表都是有一個或多個列組成的 ...
  • [toc] 背景 測試mysql5.7和mysql8.0 分別在讀寫、只讀、只寫模式下不同併發時的性能(tps,qps) 前提 測試使用版本為mysql5.7.22和mysql8.0.15 sysbench測試前先重啟mysql服務,並清空os的cache(避免多次測試時命中緩存) 每次進行測試都是 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...