前置操作 創建maven工程,修改pom.xml文件: 在resources添加一個file:log4j.properties: API操作 HDFS的命令和linux極其相似,可以類比記憶,在這裡列出一些java api操作: I/O流操作 上面的API操作 HDFS系統都是框架封裝好的,如果我們 ...
前置操作
創建maven工程,修改pom.xml文件:
<!-- Maven build for the HDFS client examples: JUnit for the @Test entry points plus the Hadoop 2.7.2 client libraries. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mcq</groupId>
  <artifactId>HDFS-001</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- Pinned to a fixed version: the floating RELEASE meta-version resolves to whatever is newest
         at build time, which makes builds non-reproducible. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>

    <!-- NOTE(review): the log4j.properties shown in this article uses log4j 1.x class names
         (org.apache.log4j.*); confirm whether this log4j 2.x artifact is actually needed. -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.8.2</version>
    </dependency>

    <!-- Hadoop client-side libraries; all three are kept on the same version. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.2</version>
    </dependency>

    <!-- tools.jar is taken from the local JDK pointed to by JAVA_HOME (system scope). -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>
</project>
在resources目錄下添加一個文件log4j.properties:
# Root logger: level INFO, output goes to the "stdout" appender only.
log4j.rootLogger=INFO, stdout

# Console appender; each line is: date, level, [logger name], message, newline.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

# File appender writing to target/spring.log with the same pattern.
# NOTE(review): "logfile" is not listed in log4j.rootLogger above, so as written
# nothing in this file routes log events to it - confirm whether that is intended.
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
API操作
HDFS的命令和Linux命令極其相似,可以類比記憶。在這裡列出一些Java API操作:
package com.mcq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

/**
 * Examples of the basic HDFS {@link FileSystem} client operations: mkdir, upload, download,
 * delete, rename and listing.
 *
 * <p>Every example opens its own {@link FileSystem} handle inside a try-with-resources block so
 * the handle is closed even when the HDFS call throws.
 */
public class HDFSClient {

    /** NameNode address used by every example. */
    private static final String HDFS_URI = "hdfs://hadoop103:9000";

    /** User name the client connects as. */
    private static final String HDFS_USER = "mcq";

    /**
     * Creates the directory {@code /ppqq} on HDFS and prints {@code over}.
     *
     * @param args unused
     * @throws IOException          if the HDFS call fails
     * @throws InterruptedException if obtaining the file system is interrupted
     * @throws URISyntaxException   if the NameNode URI is malformed
     */
    public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
        // Alternative way to connect: set "fs.defaultFS" on the Configuration and call
        // FileSystem.get(conf) instead of passing the URI and user explicitly.
        try (FileSystem fs = openFileSystem()) {
            fs.mkdirs(new Path("/ppqq"));
        }
        System.out.println("over");
    }

    /** Opens a file system handle for {@link #HDFS_URI} as {@link #HDFS_USER} with a fresh configuration. */
    private static FileSystem openFileSystem() throws IOException, InterruptedException, URISyntaxException {
        return FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER);
    }

    /** File upload: copies the local file {@code d:/banzhang.txt} to {@code /banzhang.txt} on HDFS. */
    @Test
    public void testCopyFromLocalFile()
            throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
        }
        System.out.println("over");
    }

    /** File download: copies {@code /banzhang.txt} from HDFS to a local file. */
    @Test
    public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            // First argument false: do not delete the source (copy, not move).
            // Last argument true: use the raw local file system, so no .crc file is produced.
            fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("d:/hadoop test/banhua.txt"), true);
        }
        System.out.println("over");
    }

    /** File deletion: removes {@code /0811} from HDFS. */
    @Test
    public void testDelete() throws IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            // Second argument: delete recursively.
            fs.delete(new Path("/0811"), true);
        }
        System.out.println("over");
    }

    /** File rename: renames {@code /banzhang.txt} to {@code /lala.txt}. */
    @Test
    public void testRename() throws IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
        }
        System.out.println("over");
    }

    /**
     * Recursively lists every file under {@code /} and prints its name, length, permission,
     * group and the hosts that store each of its blocks.
     */
    @Test
    public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system.
        try (FileSystem fs = openFileSystem()) {
            // 2. Fetch file details (second argument: recurse into sub-directories).
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus status = listFiles.next();

                System.out.println(status.getPath().getName()); // file name
                System.out.println(status.getLen());            // length
                System.out.println(status.getPermission());     // permission
                System.out.println(status.getGroup());          // group

                // Hosts storing each block of the file.
                for (BlockLocation blockLocation : status.getBlockLocations()) {
                    for (String host : blockLocation.getHosts()) {
                        System.out.println(host);
                    }
                }
                System.out.println("-----------分割線----------");
            }
        } // 3. The file system is closed here.
    }

    /** Lists the direct children of {@code /}, prefixing files with {@code f:} and directories with {@code d:}. */
    @Test
    public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system.
        try (FileSystem fs = openFileSystem()) {
            // 2. Tell files and directories apart.
            for (FileStatus fileStatus : fs.listStatus(new Path("/"))) {
                String prefix = fileStatus.isFile() ? "f:" : "d:";
                System.out.println(prefix + fileStatus.getPath().getName());
            }
        } // 3. The file system is closed here.
    }
}
I/O流操作
上面操作HDFS的API都是框架封裝好的。如果我們想自己實現上述操作,可以採用I/O流的方式來實現數據的上傳和下載。
package com.mcq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.URL;
import org.junit.Test;

/**
 * Examples of moving data to and from HDFS with plain I/O streams instead of the
 * ready-made copy methods of {@link FileSystem}.
 *
 * <p>All streams and the file system handle are opened in try-with-resources blocks, so they
 * are closed even when the copy fails. Because of that, the copies are invoked with
 * {@code close = false}.
 */
public class HDFSIO {

    /** NameNode address used by every example. */
    private static final String HDFS_URI = "hdfs://hadoop103:9000";

    /** User name the client connects as. */
    private static final String HDFS_USER = "mcq";

    /** Size of the first chunk (128 MiB); declared as long to keep the arithmetic out of int range issues. */
    private static final long BLOCK_SIZE = 128L * 1024 * 1024;

    /** File upload: streams the local file {@code d:/banzhang.txt} into {@code /xiaocao.txt} on HDFS. */
    @Test
    public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
        Configuration configuration = new Configuration();
        try (// 1. Obtain the file system.
             FileSystem fs = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
             // 2. Create the input stream.
             FileInputStream fis = new FileInputStream(new File("d:/banzhang.txt"));
             // 3. Obtain the output stream.
             FSDataOutputStream fos = fs.create(new Path("/xiaocao.txt"))) {
            // 4. Copy stream to stream.
            IOUtils.copyBytes(fis, fos, configuration, false);
        } // 5. Resources are closed here in reverse order: fos, fis, fs.
    }

    /** File download: streams {@code /banhua.txt} from HDFS into the local file {@code d:/banhua.txt}. */
    @Test
    public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
        Configuration configuration = new Configuration();
        try (// 1. Obtain the file system.
             FileSystem fs = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
             // 2. Obtain the input stream.
             FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
             // 3. Obtain the output stream.
             FileOutputStream fos = new FileOutputStream(new File("d:/banhua.txt"))) {
            // 4. Copy stream to stream.
            IOUtils.copyBytes(fis, fos, configuration, false);
        } // 5. Resources are closed here.
    }

    /**
     * Positioned read, part 1: downloads the first {@link #BLOCK_SIZE} bytes of
     * {@code /hadoop-2.7.2.tar.gz}.
     *
     * <p>The copy writes only the bytes that were actually read and stops at end of file, so a
     * short read can no longer put stale buffer content into the output.
     */
    @Test
    public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
        Configuration configuration = new Configuration();
        try (// 1. Obtain the file system.
             FileSystem fs = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
             // 2. Obtain the input stream.
             FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
             // 3. Create the output stream.
             FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"))) {
            // 4. Copy at most BLOCK_SIZE bytes.
            IOUtils.copyBytes(fis, fos, BLOCK_SIZE, false);
        } // 5. Resources are closed here.
    }

    /**
     * Positioned read, part 2: downloads everything after the first {@link #BLOCK_SIZE} bytes of
     * {@code /hadoop-2.7.2.tar.gz}.
     */
    @Test
    public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
        Configuration configuration = new Configuration();
        try (// 1. Obtain the file system.
             FileSystem fs = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
             // 2. Open the input stream.
             FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
             // 4. Create the output stream.
             FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"))) {
            // 3. Position the input at the start of the second chunk.
            fis.seek(BLOCK_SIZE);
            // 5. Copy the remainder of the file.
            IOUtils.copyBytes(fis, fos, configuration, false);
        } // 6. Resources are closed here, including the file system.
    }
}