Hadoop學習(4)-mapreduce的一些註意事項

来源:https://www.cnblogs.com/wpbing/archive/2019/07/25/11242866.html
-Advertisement-
Play Games

關於mapreduce的一些註意細節 如果把mapreduce程式打包放到了liux下去運行, 命令java –cp xxx.jar 主類名 如果報錯了,說明是缺少相關的依賴jar包 用命令hadoop jar xxx.jar 類名因為在集群機器上用 hadoop jar xx.jar mr.wc. ...


關於mapreduce的一些註意細節

如果把mapreduce程式打包放到了liux下去運行,

命令java  –cp  xxx.jar 主類名

如果報錯了,說明是缺少相關的依賴jar包

用命令hadoop jar xxx.jar 類名因為在集群機器上用 hadoop jar xx.jar mr.wc.JobSubmitter 命令來啟動客戶端main方法時,hadoop jar這個命令會將所在機器上的hadoop安裝目錄中的jar包和配置文件加入到運行時的classpath中

那麼,我們的客戶端main方法中的new Configuration()語句就會載入classpath中的配置文件,自然就有了

fs.defaultFS 和 mapreduce.framework.name 和 yarn.resourcemanager.hostname 這些參數配置

會把本地hadoop的相關的所有jar包都會引用

Mapreduce也有本地的job運行,就是可以不用提交到yarn上,可以以單機的模式跑一邊以多個線程模擬也可以。

就是如果不管在Linux下還是windows下,提交job都會預設的提交到本地去運行,

如果在linux預設提交到yarn上運行,需要寫配置文件hadoop/etc/mapred-site.xml文件

<configuration>

<property>

<name>mapreduce.framework.name</name>

<value>yarn</value>

</property>

</configuration>

 

Key,value對,如果是自己的類的話,那麼這個類要實現Writable,同時要把你想序列化的數據轉化成二進位,然後放到重寫方法wirte參數的DataOutput裡面,另一個readFields重寫方法是用來反序列化用的,

註意反序列化的時候,會先拿這個類的無參構造方法構造出一個對象出來,然後再通過readFields方法來複原這個對象。

 

DataOutput也是一種流,只不過是hadoop的在封裝,自己用的時候,裡面需要加個FileOutputStream對象

DataOutput寫字元串的時候要用writeUTF(“字元串”),他這樣編碼的時候,會在字元串的前面先加上字元串的長度,這是考慮到字元編碼對其的問題,hadoop解析的時候就會先讀前面兩個位元組,看一看這個字元串有多長,不然如果用write(字元串.getBytes())這樣他不知道這個字元串到底有多少個位元組。

 

在reduce階段,如果把一個對象寫到hdfs裡面,那麼會調用字元串的toString方法,你可以重寫這個類的toString方法 

舉例,下麵這個類就可以在hadoop里序列化

package mapreduce2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Waitable;

public class FlowBean implements Writable {

    private int up;//上行流量
    private int down;//下行流量
    private int sum;//總流量
    private String phone;//電話號
    
    public FlowBean(int up, int down, String phone) {
        this.up = up;
        this.down = down;
        this.sum = up + down;
        this.phone = phone;
    }
    public int getUp() {
        return up;
    }
    public void setUp(int up) {
        this.up = up;
    }
    public int getDown() {
        return down;
    }
    public void setDown(int down) {
        this.down = down;
    }
    public int getSum() {
        return sum;
    }
    public void setSum(int sum) {
        this.sum = sum;
    }
    public String getPhone() {
        return phone;
    }
    public void setPhone(String phone) {
        this.phone = phone;
    }
    @Override
    public void readFields(DataInput di) throws IOException {
        //註意這裡讀的順序要和寫的順序是一樣的
        this.up = di.readInt();
        this.down = di.readInt();
        this.sum = this.up + this.down;
        this.phone = di.readUTF();
    }
    @Override
    public void write(DataOutput Do) throws IOException {
        Do.writeInt(this.up);
        Do.writeInt(this.down);
        Do.writeInt(this.sum);
        Do.writeUTF(this.phone);
    }
    @Override
    public String toString() {
        return "電話號"+this.phone+" 總流量"+this.sum;
    }
}

 

 

 當所有的reduceTask都運行完之後,還會調用一個cleanup方法

應用練習:統計一個頁面訪問總量為n條的數據

方案一:只用一個reducetask,利用cleanup方法,在reducetask階段,先不直接放到hdfs裡面,而是存到一個Treemap裡面

再在reducetask結束後,在cleanup裡面通過把Treemap裡面前五輸出到HDFS裡面;

package cn.edu360.mr.page.topn;

public class PageCount implements Comparable<PageCount>{
    
    private String page;
    private int count;
    
    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }
    
    public String getPage() {
        return page;
    }
    public void setPage(String page) {
        this.page = page;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(PageCount o) {
        return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;
    }
    
    

}

 

map類

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PageTopnMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(" ");
        context.write(new Text(split[1]), new IntWritable(1));
    }

}

reduce類

package cn.edu360.mr.page.topn;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageTopnReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    TreeMap<PageCount, Object> treeMap = new TreeMap<>();
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        PageCount pageCount = new PageCount();
        pageCount.set(key.toString(), count);
        
        treeMap.put(pageCount,null);
        
    }
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
    //可以在cleanup裡面拿到configuration,從裡面讀取要拿前幾條數據
int topn = conf.getInt("top.n", 5); Set<Entry<PageCount, Object>> entrySet = treeMap.entrySet(); int i= 0; for (Entry<PageCount, Object> entry : entrySet) { context.write(new Text(entry.getKey().getPage()), new IntWritable(entry.getKey().getCount())); i++; if(i==topn) return; } } }

然後jobSubmit類,註意這個要設定Configuration,這裡面有幾種方法

第一種是載入配置文件

        Configuration conf = new Configuration();
        conf.addResource("xx-oo.xml");

然後再在xx-oo.xml文件裡面寫

<configuration>
    <property>
        <name>top.n</name>
        <value>6</value>
    </property>
</configuration>

第二種方式

    //通過直接設定
        conf.setInt("top.n", 3);
        //通過對java主程式 直接傳進來的參數
        conf.setInt("top.n", Integer.parseInt(args[0]));

第三種方式通過獲取配置文件參數

     Properties props = new Properties();
        props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
        conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));

然後再在topn.properties裡面配置參數

top.n=5

subsubmit類,預設在本機模擬運行

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitter {

    public static void main(String[] args) throws Exception {

        /**
         * 通過載入classpath下的*-site.xml文件解析參數
         */
        Configuration conf = new Configuration();
        conf.addResource("xx-oo.xml");
        
        /**
         * 通過代碼設置參數
         */
        //conf.setInt("top.n", 3);
        //conf.setInt("top.n", Integer.parseInt(args[0]));
        
        /**
         * 通過屬性配置文件獲取參數
         */
        /*Properties props = new Properties();
        props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
        conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));*/
        
        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubmitter.class);

        job.setMapperClass(PageTopnMapper.class);
        job.setReducerClass(PageTopnReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\url\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\url\\output"));

        job.waitForCompletion(true);

    }
}

 有時一個任務一個mapreduce是完成不了的,有可能會拆分成兩個或多個mapreduce

map階段會有自己的排序機制,比如一組數據(a,1),(b,1),(a,1),(c,1),他會先處理key為1的一組數據,

這個排序機制我們也可以自己去實現,要對這個類實現Comparable介面,然後重寫compareTo方法。

但要註意這個排序機制只是對於一個reducetask來說的,如果有多個的話,只會得到局部排序。

如果要多個reducetask的話,我們就需要控制數據的分發規則,這樣雖然是會生成多個排序後的文件,但這些文件整體上依然是有序的。因為我們控制了每一個reducetask處理數據的範圍。

 

 

 

額外java知識點補充

Treemap,放進去的東西會自動排序

兩種Treemap的自定義方法,第一種是傳入一個Comparator

public class TreeMapTest {
    
    public static void main(String[] args) {
        
        TreeMap<FlowBean, String> tm1 = new TreeMap<>(new Comparator<FlowBean>() {
            @Override
            public int compare(FlowBean o1, FlowBean o2) {
                //如果兩個類總流量相同的會比較電話號
                if( o2.getAmountFlow()-o1.getAmountFlow()==0){
                    return o1.getPhone().compareTo(o2.getPhone());
                }
                //如果流量不同,就按從小到大的順序排序
                return o2.getAmountFlow()-o1.getAmountFlow();
            }
        });
        FlowBean b1 = new FlowBean("1367788", 500, 300);
        FlowBean b2 = new FlowBean("1367766", 400, 200);
        FlowBean b3 = new FlowBean("1367755", 600, 400);
        FlowBean b4 = new FlowBean("1367744", 300, 500);
        
        tm1.put(b1, null);
        tm1.put(b2, null);
        tm1.put(b3, null);
        tm1.put(b4, null);
        //treeset的遍歷
        Set<Entry<FlowBean,String>> entrySet = tm1.entrySet();
        for (Entry<FlowBean,String> entry : entrySet) {
            System.out.println(entry.getKey() +"\t"+ entry.getValue());
        }
    }

}

第二種是在這個類中,實現一個Comparable介面

package cn.edu360.mr.page.topn;

public class PageCount implements Comparable<PageCount>{
    
    private String page;
    private int count;
    
    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }
    
    public String getPage() {
        return page;
    }
    public void setPage(String page) {
        this.page = page;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(PageCount o) {
        return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;
    }
    
    

}

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.背景 sysbench是一款壓力測試工具,可以測試系統的硬體性能,也可以用來對資料庫進行基準測試。sysbench 支持的測試有CPU運算性能測試、記憶體分配及傳輸速度測試、磁碟IO性能測試、POSIX線程性能測試、互斥性測試測試、資料庫性能測試(OLTP基準測試)。目前支持的資料庫主要是MySQ ...
  • 1.下載mysql5.7的rpm安裝包 rpm的mysql包,安裝起來簡單,解壓版的mysql還需要做許多配置,稍有不慎就會出錯!!! 下載地址:https://dev.mysql.com/downloads/mysql/5.7.html#downloads 下載後的安裝包是這個樣子的 2.上傳my ...
  • 用於Keys命令或match命令得到匹配的key時使用,註意不要與正則表達式混淆 語法:KEYS pattern / scan 0 match pattern count 10 說明:返回與指定模式相匹配的所用的keys。 該命令所支持的匹配模式如下: (1)?:用於匹配單個字元。例如,h?llo可 ...
  • 以sql server為例: 1、表值函數 用戶定義表值函數返回 table 數據類型,表是單個 SELECT 語句的結果集。 示例代碼CREATE FUNCTION Test_GetEmployeeSalary ( @EmployeeID VARCHAR(20) --參數)RETURNS TABL ...
  • 可以將以下代碼保存為backup.bat,添加計劃任務即可。 也可直接在cmd命令中複製單條語句執行,註意修改為自己的電腦路徑。 說明:--skip-lock-tables 如出現Can’t open file when using LOCK TABLES錯誤提示,可能是許可權不足導致,這裡我們在上述 ...
  • 1、創建臨時表的方法 方法一、select * into #臨時表名 from 你的表; 方法二、 create table #臨時表名(欄位1 約束條件,欄位2 約束條件,.....)create table ##臨時表名(欄位1 約束條件,欄位2 約束條件,.....) 註:以上的#代表局部臨時 ...
  • 瞭解了什麼是kafka( https://www.cnblogs.com/tree1123/p/11226880.html)以後 學習Kafka核心之消費者,kafka的消費者經過幾次版本變化,特別容易混亂,所以一定要搞清楚是哪個版本再研究。 一、舊版本consumer 只有舊版本(0.9以前)才有 ...
  • 總耗時: [SQL] CALL insert_batch();受影響的行: 1時間: 873.795s ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...