某公司實習生面試題,Java 語料處理題目,LZ 使用 Hadoop 一步一步實現...... ...
原創播客,如需轉載請註明出處。原文地址:http://www.cnblogs.com/crawl/p/7751741.html
----------------------------------------------------------------------------------------------------------------------------------------------------------
筆記中提供了大量的代碼示例,需要說明的是,大部分代碼示例都是本人所敲代碼併進行測試,不足之處,請大家指正~
本博客中所有言論僅代表博主本人觀點,若有疑惑或者需要本系列分享中的資料工具,敬請聯繫 [email protected]
-----------------------------------------------------------------------------------------------------------------------------------------------------------
前言:近日有高中同學求助,一看題目,正好與 LZ 所學技術相關,便答應了下來,因為正是興趣所在,也沒有管能不能實現。
話不多說,先上題:
同學告訴我 ,這是浪潮實習生的面試題。
LZ先對題目作簡單的說明,給定了一個 corpus.txt 文件作為語料處理的源文件,文件大小 30.3M,內容即題目要求中的圖片所示,要求對語料文件中出現的詞進行詞頻統計,並把詞頻相同的詞語用 ## 相連(如 研究##落實 1008 ),並按詞頻從大到小排序。題目要求的是根據所學的 Java I/O 處理、集合框架、字元集與國際化、異常處理等基礎知識完成此題,但同學表明可以使用大數據的相關知識,讓 LZ 感到興趣的是,最近 LZ 一直在研究 Hadoop,詞頻統計的題目做了不少,便欣然接受同學的求助。自己挖的坑總要填的,若是進行簡單的詞頻統計,很簡單,涉及到將相同詞頻的詞語排在一行並用 ## 連接,因為 LZ 水平有限,在實現的過程中還是遇到了不少的困難 。
一、簡單實現詞頻統計
剛入手此題目,LZ 的思路就是先實現一個簡單的詞頻統計,然後在實現簡單詞頻統計的基礎上,對代碼進行修改,實現相同詞頻的詞語放在 一行使用 ## 連接。思路很簡單,簡單的詞頻統計實現的也非常順利,但接下的思路實現起來便沒有那麼容易了。先來看一看簡單的詞頻統計這個功能吧。
1. 首先先來寫 Mapper 的功能,上代碼:
1 public class WordHandlerMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 2 3 @Override 4 protected void map(LongWritable key, Text value, Context context) 5 throws IOException, InterruptedException { 6 7 String[] strs = StringUtils.split(value.toString(), "\t"); 8 9 for(String str : strs) { 10 int index = str.indexOf("/"); 11 12 if(index < 0) { 13 index = 0; 14 } 15 16 String word = str.substring(0, index); 17 context.write(new Text(word), new LongWritable(1)); 18 19 } 20 21 } 22 23 }
WordHandlerMapper 類的功能,首先此類實現了 Mapper 類,重寫了 mapper 方法,使用預設的 TextOutputFormat 類,將讀取到的一行數據以形參 value 的形式傳入 mapper 方法,第 7 行對這行數據也就是 value 進行處理,以 \t 進行分割,得到了一個 String 數組, 數組的形式為:["足協/j", "杯賽/n", "常/d"] 形式,然後 10 行對數組進行遍歷,然後獲取到 /之前的內容,也就是我們需要統計詞頻的詞語,如 10 - 16 行所示,然後將得到的詞語傳入 context 的 write 方法,map 程式進行緩存和排序後,再傳給 reduce 程式。
2. 然後再來看 Reduce 程式:
1 public class WordHandlerReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 2 3 4 @Override 5 protected void reduce(Text key, Iterable<LongWritable> values, Context context) 6 throws IOException, InterruptedException { 7 8 long count = 0; 9 10 for(LongWritable value : values) { 11 count += value.get(); 12 } 13 14 context.write(key, new LongWritable(count)); 15 16 } 17 18 }
WordHandlerReducer 的邏輯很簡單,此類繼承了 Reduce,重寫了 reduce 方法,傳入的 key 即需要統計詞頻的詞語,vaues 為 {1,1,1} 形式,8 行定義了一個計數器,然後對 values 進行增強 for 迴圈遍歷,使計數器加 1,然後將詞語和詞頻輸出即可。
3. 然後我們再定義個類來描述這個特定的作業:
1 public class WordHandlerRunner { 2 3 public static void main(String[] args) throws Exception { 4 5 Configuration conf = new Configuration(); 6 Job wcJob = Job.getInstance(conf); 7 8 wcJob.setJarByClass(WordHandlerRunner.class); 9 10 wcJob.setMapperClass(WordHandlerMapper.class); 11 wcJob.setReducerClass(WordHandlerReducer.class); 12 13 wcJob.setOutputKeyClass(Text.class); 14 wcJob.setOutputValueClass(LongWritable.class); 15 16 wcJob.setMapOutputKeyClass(Text.class); 17 wcJob.setMapOutputValueClass(LongWritable.class); 18 19 FileInputFormat.setInputPaths(wcJob, new Path("/wc/srcdata2/")); 20 21 FileOutputFormat.setOutputPath(wcJob, new Path("/wc/wordhandler/")); 22 23 wcJob.waitForCompletion(true); 24 25 } 26 27 }
第 6 行得到 Job 對象,之後便是對一些基本參數的設置,基本上就是見方法名而知其意了 。 19 和 21 行定義存放元數據的路徑和輸入結果的路徑,23行提交作業。
在 Eclipse 中將程式打成 jar 包,名為 wordhandler.jar 導出,上傳到 Linux 伺服器 ,將原始的語料處理文件上傳到程式中指定了路徑下,通過
hadoop jar wordhandler.jar com.software.hadoop.mr.wordhandler.WordHandlerRunner 命令執行,很快就會執行完畢,然後到輸出路徑中查看輸出結果(圖片展示部分結果):
到此簡單的詞頻統計功能就到此結束了,觀察可知,MapReduce 預設是按輸出的 key 進行排序的。得到的數據距離題目要求的結果還有很大的懸殊,那麼剩下需要進一步實現的還有兩處,一處是將詞頻相同的詞語放到一行並用 ## 連接,第二處就是對詞頻進行排序(按從大到小) 。
LZ 的思路是,排序肯定是要放到最後一步實現,若先進行排序,在對相同詞頻的詞語處理的話,很有可能會打亂之前的排序。那麼,現在就是對詞頻相同的詞語進行處理了,使它們顯示在一行並用 ## 連接。在實現這個功能時,LZ 遇到了困難,主要是對 TextOutputFarmat 預設只讀取一行數據意識不夠深入,走了許多的彎路,比如 LZ 相到修改 Hadoop 的源碼,讀取多行數據等,但由於 LZ 水平有限,結果以失敗告終。到此時已經是夜裡將近十二點了,因為到第二天還要早起,所以就沒再熬夜,暫時放下了。
第二天中午,LZ 想到了倒排索引,使用倒排索引實現的思路便一點一點形成。把我們之前得到的數據讀入,然後將詞頻當做 key,這樣 Mapper 程式便會在 Reduce 執行之前進行緩存和分類,思路來了,便馬上動手實現。
二、使用倒排索引初步實現相同詞頻寫入一行並用 ## 連接
1. 還是先來 Mapper 的功能:註意,這次讀入的數據是我們之前得到的按預設的形式排好序,並統計出詞頻的數據。
1 public class WordHandlerMapper2 extends Mapper<LongWritable, Text, LongWritable, Text> { 2 3 @Override 4 protected void map(LongWritable key, Text value, Context context) 5 throws IOException, InterruptedException { 6 7 String[] strs = StringUtils.split(value.toString(), "\t"); 8 9 String text = strs[0]; 10 11 long count = Long.parseLong(strs[1]); 12 13 context.write(new LongWritable(count), new Text(text)); 14 15 } 16 17 }
Map 的功能很簡單,我們需要輸出的 key 是 LongWritable 類型,value 是 Text 類型,即 [205, {"檢驗", "加入", "生存"}] 這種類型。第 7 行同樣是對一行的數據進行拆分,然後得到 詞語(text) 和 詞頻(count),然後 第 13 行進行輸出即可,很簡單。
2. Reduce 程式的功能:
1 public class WordHandlerReducer2 extends Reducer<LongWritable, Text, Text, LongWritable> { 2 3 //key: 3 values: {"研究","落實"} 4 @Override 5 protected void reduce(LongWritable key, Iterable<Text> values, Context context) 6 throws IOException, InterruptedException { 7 8 String result = ""; 9 10 for(Text value : values) { 11 result += value.toString() + "##"; 12 13 } 14 15 context.write(new Text(result), key); 16 17 } 18 19 }
Reduce 的邏輯比之前的稍微複雜一點,從 Mapper 中輸入的數據格式為 [205, {"檢驗", "加入", "生存"}] 類型,我們希望輸出的格式為:[檢驗##加入##生存 205], 重寫的 reduce 方法傳入的 values 即 {"檢驗", "加入", "生存"} 類型,第十行對 values 進行遍歷,11 行向 result 中追加,即可得到我們需要的結果,然後 15 行進行輸出。
3. 然後再來定義一個類來描述此作業,
1 public class WordHandlerRunner2 { 2 3 public static void main(String[] args) throws Exception { 4 5 Configuration conf = new Configuration(); 6 Job wcJob = Job.getInstance(conf); 7 8 wcJob.setJarByClass(WordHandlerRunner2.class); 9 10 wcJob.setMapperClass(WordHandlerMapper2.class); 11 wcJob.setReducerClass(WordHandlerReducer2.class); 12 13 wcJob.setOutputKeyClass(Text.class); 14 wcJob.setOutputValueClass(LongWritable.class); 15 16 wcJob.setMapOutputKeyClass(LongWritable.class); 17 wcJob.setMapOutputValueClass(Text.class); 18 19 FileInputFormat.setInputPaths(wcJob, new Path("/wc/wordhandler/")); 20 21 FileOutputFormat.setOutputPath(wcJob, new Path("/wc/wordhandleroutput/")); 22 23 wcJob.waitForCompletion(true); 24 25 } 26 27 }
這個類與之前的那個 Job 描述類很類似,使用的 Job 的方法沒有變化,方法的參數只做稍微修改即可,LZ 標紅的行即需要進行修改的行。
然後是同樣的步驟,在 Eclipse 中將程式打成 jar 包導出,也叫 wordhandler.jar,然後上傳到 Linux 伺服器中,使用
hadoop jar wordhandler.jar com.software.hadoop.mr.wordhandler2.WordHandlerRunner2 進行運行,同過 Map 和 Reduce 的處理後,進入相應的目錄下,查看結果(圖片展示部分結果):
我們分析一下得到的結果,是不是距離題目要求的輸出結果更接近了一步,但是還差點事,一個是每一行的最後多了一個 ##,這個好解決,在生成字元串的時候判斷該詞語是否為最後一個即可,另一個就是題目要求詞頻按從大到小的順序輸出,而我們的輸出順序是從小到大。明確了問題之後,繼續開動吧。
三、實現按詞頻從大到小進行排序
排序問題是使用 Hadoop 進行詞頻處理的常見問題了,實現起來並不困難。說一說思路,因為我們這裡是預設讀取一行,那麼我們構造一個 Word 類,此類有屬性 text (內容),和(count)詞頻,此類需要實現 WritableComparable 介面,重寫其中的方法,使用我們自定義的排序方式即可。既然思路明確了,那我們一步一步的實現。
1. 先定義一個 word 類:
1 public class Word implements WritableComparable<Word> { 2 3 private String text; 4 5 private long count; 6 7 public Word() {} 8 9 public Word(String text, long count) { 10 super(); 11 this.text = text; 12 this.count = count; 13 } 14 15 public String getText() { 16 return text; 17 } 18 19 public void setText(String text) { 20 this.text = text; 21 } 22 23 public long getCount() { 24 return count; 25 } 26 27 public void setCount(long count) { 28 this.count = count; 29 } 30 31 @Override 32 public void write(DataOutput out) throws IOException { 33 out.writeUTF(text); 34 out.writeLong(count); 35 } 36 37 @Override 38 public void readFields(DataInput in) throws IOException { 39 text = in.readUTF(); 40 count = in.readLong(); 41 } 42 43 @Override 44 public int compareTo(Word o) { 45 return count > o.getCount() ? -1 : 1; 46 } 47 48 }
此類需要實現 WritableComparable 介面,重寫第 32 行的 write 方法,第 38 行的 readFields 方法,第 44 行 compareTo 方法,32 行和 38 行的方法是 Hadoop 中序列化相關的方法,44 行 compareTo 方法才是我們自定義排序方式的方法。值的註意的是,write 方法中和 readFields 方法中屬性的序列化和反序列化的順序必須一致,即 33、34 和 39、40 行的屬性需要對應。然後 compareTo 中 第 45 行實現自定義的從大到小的排序即可。
2. Mapper 類的功能:
1 public class WordHandlerMapper3 extends Mapper<LongWritable, Text, Word, NullWritable> { 2 3 @Override 4 protected void map(LongWritable key, Text value, Context context) 5 throws IOException, InterruptedException { 6 7 String[] strs = StringUtils.split(value.toString(), "\t"); 8 9 String text = strs[0]; 10 11 long count = Long.parseLong(strs[1]); 12 13 Word word = new Word(text, count); 14 15 context.write(word, NullWritable.get()); 16 17 } 18 19 }
我們定義 Mapper 的輸出的 key 為 Word 類型是排序成功的關鍵,在 mapper 方法中常規的拆分一行數據,獲得到相應的欄位,然後第 13 行封裝為一個 Word 對象,第 15 行輸出即可,LZ 定義 Mapper 的輸出的 vlaue 為 NullWritable 類型,思路為只要輸出的 key 為 Word 型,那麼我們就可以獲取到需要的信息了。
2. 再來看 Reduce 的功能:
1 public class WordHandlerReducer3 extends Reducer<Word, NullWritable, Text, LongWritable> { 2 3 @Override 4 protected void reduce(Word key, Iterable<NullWritable> values, Context context) 5 throws IOException, InterruptedException { 6 context.write(new Text(key.getText()), new LongWritable(key.getCount())); 7 } 8 9 }
Reduce 的功能再簡單不過了,得到的 key 是一個一個的 Word ,6 行獲取相應的欄位輸出即可。
3. 再來描述排序這個特定的作業,代碼與之前的類似,只做稍微修改即可,代碼 LZ 就不貼出來了。這樣我們排序的功能就實現了,然後在 Linux 中通過命令運行,將得到的結果導出到 Windows 中,重命名為 postagmodel.txt 即可,此文件共 1163 行,現在貼一下部分結果的圖片:
結果出來了,基本與題目要求吻合,LZ 鬆了一口氣。
在實現功能之後 LZ 稍微總結了一下:
可能由於經驗不足,遇到問題不知如何解決,積累經驗尤為重要,畢竟經驗這個問題不是短時間內形成的,多學多敲多練是根本;
然後,一個功能或者一個需求的實現,何為簡單,何為困難,LZ 認為最終如果我們實現了這個功能或需求,回過頭來看,它就是簡單的,此時也有可能是熟練度的問題,使它蒙上了那層困難的面紗,遇到困難,別放棄,學會短暫性捨棄,過段時間再撿起來,可能靈感就來了。
還需要說一點,此功能的實現 LZ 的思路和方法可能不是最好的,也有可能會有不妥的地方存在,歡迎大家一塊交流學習,若有不足之處,還請指出,留言、評論、郵箱 LZ 均可。