HDFS常用API操作 和 HDFS的I/O流操作

来源:https://www.cnblogs.com/mcq1999/archive/2019/10/31/11769328.html
-Advertisement-
Play Games

前置操作 創建maven工程,修改pom.xml文件: 在resources添加一個file:log4j.properties: API操作 HDFS的命令和linux極其相似,可以類比記憶,在這裡列出一些java api操作: I/O流操作 上面的API操作 HDFS系統都是框架封裝好的,如果我們 ...


前置操作

創建maven工程,修改pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mcq</groupId>
  <artifactId>HDFS-001</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
</dependencies>

</project>

在resources添加一個file:log4j.properties:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

 

API操作

HDFS的命令和linux極其相似,可以類比記憶,在這裡列出一些java api操作:

package com.mcq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

public class HDFSClient {
	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		// c.set("fs.defaultFS", "hdfs://hadoop103:9000");
		// FileSystem fs = FileSystem.get(c);
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.mkdirs(new Path("/ppqq"));
		fs.close();
		System.out.println("over");
	}

	@Test // 文件上傳
	public void testCopyFromLocalFile()
			throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
		fs.close();
		System.out.println("over");
	}

	@Test // 文件下載
	public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("d:/hadoop test/banhua.txt"), true);
		// 第一個false表示不剪切,最後一個true表示本地,不產生crc文件

		fs.close();
		System.out.println("over");
	}

	@Test // 文件刪除
	public void testDelete() throws IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.delete(new Path("/0811"), true); // 是否遞歸刪除
		fs.close();
		System.out.println("over");
	}

	@Test // 文件更名
	public void testRename() throws IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
		fs.close();
		System.out.println("over");
	}

	@Test
	public void testListFiles() throws IOException, InterruptedException, URISyntaxException {

		// 1獲取文件系統
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");

		// 2 獲取文件詳情
		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

		while (listFiles.hasNext()) {
			LocatedFileStatus status = listFiles.next();

			// 輸出詳情
			// 文件名稱
			System.out.println(status.getPath().getName());
			// 長度
			System.out.println(status.getLen());
			// 許可權
			System.out.println(status.getPermission());
			// 分組
			System.out.println(status.getGroup());

			// 獲取存儲的塊信息
			BlockLocation[] blockLocations = status.getBlockLocations();

			for (BlockLocation blockLocation : blockLocations) {

				// 獲取塊存儲的主機節點
				String[] hosts = blockLocation.getHosts();

				for (String host : hosts) {
					System.out.println(host);
				}
			}

			System.out.println("-----------分割線----------");
		}

		// 3 關閉資源
		fs.close();
	}
	
	@Test
	public void testListStatus() throws IOException, InterruptedException, URISyntaxException{
			
		// 1 獲取文件配置信息
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
			
		// 2 判斷是文件還是文件夾
		FileStatus[] listStatus = fs.listStatus(new Path("/"));
			
		for (FileStatus fileStatus : listStatus) {
			
			// 如果是文件
			if (fileStatus.isFile()) {
					System.out.println("f:"+fileStatus.getPath().getName());
				}else {
					System.out.println("d:"+fileStatus.getPath().getName());
				}
			}
			
		// 3 關閉資源
		fs.close();
	}
}

 I/O流操作

上面的API操作 HDFS系統都是框架封裝好的,如果我們想自己實現上述API操作可以採用IO流的方式實現數據的上傳和下載。

 

package com.mcq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.URL;
import org.junit.Test;

public class HDFSIO {
	//文件上傳
	@Test
	public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {

		// 1 獲取文件系統
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");

		// 2 創建輸入流
		FileInputStream fis = new FileInputStream(new File("d:/banzhang.txt"));

		// 3 獲取輸出流
		FSDataOutputStream fos = fs.create(new Path("/xiaocao.txt"));

		// 4 流對拷
		IOUtils.copyBytes(fis, fos, configuration);

		// 5 關閉資源
		IOUtils.closeStream(fos);
		IOUtils.closeStream(fis);
		fs.close();
	}
	// 文件下載
	@Test
	public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{

		// 1 獲取文件系統
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
			
		// 2 獲取輸入流
		FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
			
		// 3 獲取輸出流
		FileOutputStream fos = new FileOutputStream(new File("d:/banhua.txt"));
			
		// 4 流的對拷
		IOUtils.copyBytes(fis, fos, configuration);
			
		// 5 關閉資源
		IOUtils.closeStream(fos);
		IOUtils.closeStream(fis);
		fs.close();
	}
	//定位文件讀取
	//(1)下載第一塊
	@Test
	public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{

		// 1 獲取文件系統
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
			
		// 2 獲取輸入流
		FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
			
		// 3 創建輸出流
		FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));
			
		// 4 流的拷貝
		byte[] buf = new byte[1024];
			
		for(int i =0 ; i < 1024 * 128; i++){
			fis.read(buf);
			fos.write(buf);
		}
			
		// 5關閉資源
		IOUtils.closeStream(fis);
		IOUtils.closeStream(fos);
	fs.close();
	}
	//(2)下載第二塊
	@Test
	public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{

		// 1 獲取文件系統
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
			
		// 2 打開輸入流
		FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
			
		// 3 定位輸入數據位置
		fis.seek(1024*1024*128);
			
		// 4 創建輸出流
		FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));
			
		// 5 流的對拷
		IOUtils.copyBytes(fis, fos, configuration);
			
		// 6 關閉資源
		IOUtils.closeStream(fis);
		IOUtils.closeStream(fos);
	}
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 今天在公司討論項目重構的問題時,公司的 DBA 針對錶中的欄位大概介紹了一下 float 和 double 的存儲方式。然後,我發現這個問題又回到了浮點數類型在記憶體中的存儲方式,即 IEEE 對浮點數存儲的標準。 我在之前的內容中寫到過,在公司另外一個項目當中,在寫一個 TCP 伺服器時,對端的設備 ...
  • SQL Server SQL Server對大小寫不敏感,每條語句末端使用分號。 1.SQL命令 SELECT 從數據中提取數據 UPDATE 更新數據中的數據 DELETE 從資料庫中刪除數據 INSERT INTO 向資料庫中插入新數據 CREATE DATABASE 創建新資料庫 ALTER ...
  • 概要 回顧以前寫的項目,發現在規範的時候,還是可以做點騷操作的。 假使以後還有新的項目用到了MySQL,那麼肯定是要實踐一番的。 為了準備,創建測試數據表(建表語句中預設使用utf8mb4以及utf8mb4_unicode_ci,感興趣的讀者可以自行搜索這兩個配置): sql CREATE TABL ...
  • 1.Mybatis的分頁plugin實現原理 2.具體步驟 第一步、導入到pom.xml文件中依賴包 第二步、配置插件(必需) 在mybatisConfig.xml文件中配置以下代碼 代碼位置:在enviroment標簽的前一位,切記,位置放錯,代碼也會報錯 第三步、在xxxMapper.java接 ...
  • 今天在做mysql sniff測試的時候,中間重啟MySQL實例的過程中,出現了"The server quit without updating PID file"這個經典的錯誤。因為把mysql sniff的日誌文件放在了mysql實例的目錄中,因此刪除mysql sniff日誌的時候無意中刪除 ...
  • [TOC] pymysql操作mysql 安裝 連接 增 刪 改 查 索引 為什麼使用索引以及索引的作用 使用索引就是為了提高查詢效率的 類比 字典中的目錄 索引的本質 一個特殊的文件 索引的底層原理 B+樹 索引的種類(重點) 主鍵索引 加速查找 + 不能重覆 + 不能為空 唯一索引 加速查找 + ...
  • 在redhat6.5上安裝Oracle時,最後使用oracle用戶執行runInstaller 報錯如下,無法連接到安裝有xmanager的windows伺服器,也就無法圖形化安裝oracle 經過百度找到一個靠譜的解決方法,實驗成功。如下首先檢查伺服器是否安裝了xdpyinfo 切換到root用戶 ...
  • 範式是具有最小冗餘的表結構。 三範式具體如下: 1.第一範式(1NF):欄位都是不可再分的; 第一範式的目標是確保每列的原子性:如果每列都是不可再分的最小數據單元,則滿足第一範式(1NF); 2.第二範式(2NF):每個表只描述一件事情; 首先滿足第一範式,並且表中非主鍵屬性必須完全要依賴於主鍵屬性 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...