首先要在windows下解壓一個windows版本的hadoop 然後在配置他的環境變數,同時要把hadoop的share目錄下的hadoop下的相關jar包拷貝到esclipe 然後Build Path 下麵上代碼 練習:從一個文件裡面不斷地採集日誌上傳到hdfs裡面 1.流程介紹 啟動一個定時任 ...
首先要在windows下解壓一個windows版本的hadoop
然後在配置他的環境變數,同時要把hadoop的share目錄下的hadoop下的相關jar包拷貝到esclipe
然後Build Path
下麵上代碼
import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.junit.Before; import org.junit.Test; public class HdfsClientDemo { public static void main(String[] args) throws Exception { /** * Configuration參數對象的機制: * 構造時,會載入jar包中的預設配置 xx-default.xml * 再載入 用戶配置xx-site.xml ,覆蓋掉預設參數 * 構造完成之後,還可以conf.set("p","v"),會再次覆蓋用戶配置文件中的參數值 */ // new Configuration()會從項目的classpath中載入core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件 Configuration conf = new Configuration(); // 指定本客戶端上傳文件到hdfs時需要保存的副本數為:2 conf.set("dfs.replication", "2"); // 指定本客戶端上傳文件到hdfs時切塊的規格大小:64M conf.set("dfs.blocksize", "64m"); // 構造一個訪問指定HDFS系統的客戶端對象: 參數1:——HDFS系統的URI,參數2:——客戶端要特別指定的參數,參數3:客戶端的身份(用戶名) FileSystem fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root"); // 上傳一個文件到HDFS中 fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/")); fs.close(); } FileSystem fs = null; @Before public void init() throws Exception{ Configuration conf = new Configuration(); conf.set("dfs.replication", "2"); conf.set("dfs.blocksize", "64m"); fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root"); } /** * 從HDFS中下載文件到客戶端本地磁碟 * @throws IOException * @throws IllegalArgumentException */ @Test public void testGet() throws IllegalArgumentException, IOException{ fs.copyToLocalFile(new Path("/test"), new Path("d:/")); fs.close(); } /** * 在hdfs內部移動文件\修改名稱 */ @Test public void testRename() throws Exception{ fs.rename(new Path("/install.log"), new Path("/aaa/in.log")); fs.close(); } /** * 在hdfs中創建文件夾 */ @Test public void testMkdir() throws Exception{ fs.mkdirs(new Path("/xx/yy/zz")); fs.close(); } /** * 在hdfs中刪除文件或文件夾 */ @Test public void testRm() throws Exception{ fs.delete(new Path("/aaa"), true); fs.close(); } /** * 查詢hdfs指定目錄下的文件信息 */ @Test public void testLs() throws Exception{ // 只查詢文件的信息,不返迴文件夾的信息 RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true); while(iter.hasNext()){ LocatedFileStatus status = iter.next(); System.out.println("文件全路徑:"+status.getPath()); System.out.println("塊大小:"+status.getBlockSize()); System.out.println("文件長度:"+status.getLen()); System.out.println("副本數量:"+status.getReplication()); System.out.println("塊信息:"+Arrays.toString(status.getBlockLocations())); System.out.println("--------------------------------"); } fs.close(); } /** * 讀取hdfs中的文件的內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/test.txt")); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { System.out.println(line); } br.close(); in.close(); fs.close(); } /** * 讀取hdfs中文件的指定偏移量範圍的內容 * * * * @throws IOException * @throws IllegalArgumentException */ @Test public void testRandomReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/xx.dat")); // 將讀取的起始位置進行指定 in.seek(12); // 讀16個位元組 byte[] buf = new byte[16]; in.read(buf); System.out.println(new String(buf)); in.close(); fs.close(); } /** * 往hdfs中的文件寫內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testWriteData() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false); // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg"); byte[] buf = new byte[1024]; int read = 0; while ((read = in.read(buf)) != -1) { out.write(buf,0,read); } in.close(); out.close(); fs.close(); } }
練習:從一個文件裡面不斷地採集日誌上傳到hdfs裡面
1.流程介紹
---啟動一個定時任務
--定時探測日誌原目錄
--獲取文件上傳到一個待上傳的臨時目錄
--逐一上傳到hdfs目標路徑,同時移動到備份目錄里
--啟動一個定時任務:
--探測備份目錄中的備份數據是否已經超出,如果超出就刪除
主類為:
import java.util.Timer; public class DataCollectMain { public static void main(String[] args) { Timer timer = new Timer(); //第一個為task類,第二個開始時間 第三個沒隔多久執行一次 timer.schedule(new CollectTask(), 0, 60*60*1000L); timer.schedule(new BackupCleanTask(), 0, 60*60*1000L); } }
CollectTask類:
這個類要繼承TimerTask,重寫run方法,主要內容就是不斷收集日誌文件
package cn.edu360.hdfs.datacollect; import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.TimerTask; import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; public class CollectTask extends TimerTask { @Override public void run() { try { // 獲取配置參數 Properties props = PropertyHolderLazy.getProps(); // 獲取本次採集時的日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String day = sdf.format(new Date()); File srcDir = new File("d:/logs/accesslog"); // 列出日誌源目錄中需要採集的文件 //裡面傳了一個文件過濾器,重寫accept方法,return true就要 File[] listFiles = srcDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { if (name.startsWith("access.log")) { return true; } return false; } }); // 將要採集的文件移動到待上傳臨時目錄 File toUploadDir = new File("d:/logs/toupload"); for (File file : listFiles) { //這裡如果是 file.renameTo(toUploadDir)是不對的,因為會生成一個toupload的文件而不是文件夾 //要用renameTo的話你要自己加上文件的新名字比較麻煩 //用FileUtiles是對file操作的一些工具類 //第一個目標文件,第二個路徑,第三個是否存在覆蓋 FileUtils.moveFileToDirectory(file, toUploadDir, true); } // 構造一個HDFS的客戶端對象 FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root"); File[] toUploadFiles = toUploadDir.listFiles(); // 檢查HDFS中的日期目錄是否存在,如果不存在,則創建 Path hdfsDestPath = new Path("/logs" + day); if (!fs.exists(hdfsDestPath)) { fs.mkdirs(hdfsDestPath); } // 檢查本地的備份目錄是否存在,如果不存在,則創建 File backupDir = new File("d:/logs/backup" + day + "/"); if (!backupDir.exists()) { backupDir.mkdirs(); } for (File file : toUploadFiles) { // 傳輸文件到HDFS並改名access_log_ fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path("/logs"+day+"/access_log_"+UUID.randomUUID()+".log")); // 將傳輸完成的文件移動到備份目錄 //註意這裡依然不要用renameTo FileUtils.moveFileToDirectory(file, backupDir, true); } } catch (Exception e) { e.printStackTrace(); } }
/** * 讀取hdfs中的文件的內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/test.txt")); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { System.out.println(line); } br.close(); in.close(); fs.close(); } /** * 讀取hdfs中文件的指定偏移量範圍的內容 * * * 作業題:用本例中的知識,實現讀取一個文本文件中的指定BLOCK塊中的所有數據 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testRandomReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/xx.dat")); // 將讀取的起始位置進行指定 in.seek(12); // 讀16個位元組 byte[] buf = new byte[16]; in.read(buf); System.out.println(new String(buf)); in.close(); fs.close(); } /** * 往hdfs中的文件寫內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testWriteData() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false); // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg"); byte[] buf = new byte[1024]; int read = 0; while ((read = in.read(buf)) != -1) { out.write(buf,0,read); } in.close(); out.close(); fs.close(); }
}
BackupCleanTask類
package cn.edu360.hdfs.datacollect; import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask; import org.apache.commons.io.FileUtils; public class BackupCleanTask extends TimerTask { @Override public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); long now = new Date().getTime(); try { // 探測本地備份目錄 File backupBaseDir = new File("d:/logs/backup/"); File[] dayBackDir = backupBaseDir.listFiles(); // 判斷備份日期子目錄是否已超24小時 for (File dir : dayBackDir) { long time = sdf.parse(dir.getName()).getTime(); if(now-time>24*60*60*1000L){ FileUtils.deleteDirectory(dir); } } } catch (Exception e) { e.printStackTrace(); } } }
hdfs中namenode中儲存元數據(對數據的描述信息)是在記憶體中以樹的形式儲存的,並且每隔一段時間都會把這些元數據序列化到磁碟中。序列化的東西在磁碟中叫 fsimage文件。
元數據可能會很大很大,所以只能是定期的序列化
問題1:序列化的時候,發生了元數據的修改怎麼辦
答:namenode會把每次用戶的操作都記錄下來,記錄成日誌文件,存在edits日誌文件中
其中edits日誌文件也會像log4j滾動日誌文件一樣,當文件太大的時候會另起一個文件並改名字
問題2:當edits文件太多的時候,一次宕機也會花大量的時間從edits里恢復,怎麼辦
答:會定期吧edits文件重放fsimage文件,並記錄edits的編號,把那些重放過的日誌文件給刪除。這樣也相當於重新序列化了,
所以namenode並不會做這樣的事情,是由secondary node做的,他會定期吧namenode的fsimage文件和edits文件下載下來
並把fsimage文件反序列化,並且讀日誌文件更新元數據,然後序列化到磁碟,然後把他上傳給namenode。
這個機制叫做checkpoint機制
這裡secondarynode 相當一一個小秘書
客戶端寫數據到hdfs的流程
上面是建立響應的過程
然後 是傳遞文件block塊的過程
客戶端從hdfs讀數據流程
額外知識點
註意,在windows裡面不要寫有些路徑不要寫絕對路徑,因為程式放到linux下麵可能會找不到,因此報錯
一般使用class載入器,這樣當這個class載入的時候就會知道這個class在哪
類載入器的一些使用例子
比如我載入一個配置文件,為了避免出現絕對路徑,我們可以是用類載入器
Properties props = new Properties(); //載入配置文件,這樣寫的目的是為了避免在windows里出現絕對路徑,用類載入器,再把文件傳化成流 props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties"));
而對於一些功能性的類,我們最好在寫邏輯的時候也不要直接去導入這個包,而是使用Class.forName
//這樣不直接導入這個包,直接用類載入器,是面向介面編程的一種思想。這裡我並不是在開始import xxxx.Mapper,這裡Mapper是一個介面,這裡我用了多態 Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS")); Mapper mapper = (Mapper) mapper_class.newInstance();
單例模式
https://www.cnblogs.com/crazy-wang-android/p/9054771.html
只有個一實例,必須自己創建自己這個實例,必須為別人提供這個實例
餓漢式單例:就算沒有人調用這個class,他也會載入進去;
如對於一個配置文件的載入
import java.util.Properties; /** * 單例設計模式,方式一: 餓漢式單例 * */ public class PropertyHolderHungery { private static Properties prop = new Properties(); static { try { //將一個文件prop.load(stram) //這裡面如果傳一個IO流不好,因為要用到絕對路徑,使用了類載入器 這種不管有沒有使用這個類都會載入 prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties")); } catch (Exception e) { } } public static Properties getProps() throws Exception { return prop; } }
懶漢式:只有調用的時候才會有,但會有線程安全問題
/** * 單例模式:懶漢式——考慮了線程安全 * */ public class PropertyHolderLazy { private static Properties prop = null; public static Properties getProps() throws Exception { if (prop == null) { synchronized (PropertyHolderLazy.class) { if (prop == null) { prop = new Properties(); prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties")); } } } return prop; } }