Hive+Sqoop+Mysql整合 在本文中,LZ隨意想到了一個場景: 車,道路,監控,攝像頭 ...
Hive+Sqoop+Mysql整合
在本文中,LZ隨意想到了一個場景:
車,道路,監控,攝像頭
即當一輛車在道路上面行駛的時候,道路上面的監控點裡面的攝像頭就會對車進行數據採集。
我們對採集的數據進行分析,處理,最後把結果保存到mysql資料庫中供Web UI顯示監控點/攝像頭狀態。
A:監控點/攝像頭狀態
工作流程如下:
1.數據格式
/** * 產生測試數據: * 數據format: * 記錄時間 車牌號碼 車速 道路編號 監控地點 攝像頭編號 * date_time vehicle_plate vehicle_speed road_id monitor_id camera_id * * 中間使用'\t'隔開 * 16/01/2019 10:20:30 SCN89000J 124 10002 20004 40007 * * 具體說明: * 道路編號 * 10001 - 10100 * * 監控地點 - 在一條道路上面有2個監控點 * 20001 - 20200 * * 攝像頭編號 - 在一個監控點上面2個攝像頭 * 40001 - 40400 * * 道路: 10001 10002 * 監控: 20001-20002 20003-20004 * 攝像頭: 40001-40002-40003-40004 40005-40006-40007-40008 * * 車速: 1-300。 如果大於260,則為超速行駛 * * 車牌: SCN89000J * * 記錄時間: 16/01/2019 10:20:30 * */
2.生成測試數據
--編譯運行java code cd /root/vehicle_dir/ vi DataGenerate.java
import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 產生測試數據: * 數據format: * 記錄時間 車牌號碼 車速 道路編號 監控地點 攝像頭編號 * date_time vehicle_plate vehicle_speed road_id monitor_id camera_id * * 中間使用'\t'隔開 * 16/01/2019 10:20:30 SCN89000J 124 10002 20004 40007 * * 具體說明: * 道路編號 * 10001 - 10100 * * 監控地點 - 在一條道路上面有2個監控點 * 20001 - 20200 * * 攝像頭編號 - 在一個監控點上面2個攝像頭 * 40001 - 40400 * * 道路: 10001 10002 * 監控: 20001-20002 20003-20004 * 攝像頭: 40001-40002-40003-40004 40005-40006-40007-40008 * * 車速: 1-300。 如果大於260,則為超速行駛 * * 車牌: SCN89000J * * 記錄時間: 16/01/2019 10:20:30 * * @author Hongten */ public class DataGenerate { Integer[] roadIdArray = new Integer[Common.ROAD_NUM]; public static void main(String[] args) { long begin = System.currentTimeMillis(); DataGenerate dataGenerate = new DataGenerate(); dataGenerate.init(); dataGenerate.generateData(); long end = System.currentTimeMillis(); System.out.println("Total: " + (end - begin) + " ms"); } public void init() { // create files FileUtils.createFile(Common.VEHICLE_LOG); FileUtils.createFile(Common.ROAD_MONITOR_CAMERA_RELATIONSHIP); generateRoadIds(); } /** * 道路: 10001 10002 監控: 20001-20002 20003-20004 攝像頭: 40001-40002-40003-40004 * 40005-40006-40007-40008 */ public void generateRoadIds() { StringBuilder readMonitorCameraRelationship = new StringBuilder(); for (int i = 0; i < Common.ROAD_NUM; i++) { int roadId = 10000 + (i + 1);// 10001 roadIdArray[i] = roadId; int monitorB = roadId * 2;// 20002 int monitorA = monitorB - 1;// 20001 int cameraAB = monitorA * 2;// 40002 int cameraAA = cameraAB - 1;// 40001 int cameraBB = monitorB * 2;// 40004 int cameraBA = cameraBB - 1;// 40003 // monitorA // 10001 20001 40001 // 10001 20001 40002 readMonitorCameraRelationship.append(roadId).append(Common.SEPARATOR).append(monitorA).append(Common.SEPARATOR).append(cameraAA).append(Common.LINE_BREAK); readMonitorCameraRelationship.append(roadId).append(Common.SEPARATOR).append(monitorA).append(Common.SEPARATOR).append(cameraAB).append(Common.LINE_BREAK); // monitorB // 10001 20002 40003 // 10001 20002 40004 readMonitorCameraRelationship.append(roadId).append(Common.SEPARATOR).append(monitorB).append(Common.SEPARATOR).append(cameraBA).append(Common.LINE_BREAK); readMonitorCameraRelationship.append(roadId).append(Common.SEPARATOR).append(monitorB).append(Common.SEPARATOR).append(cameraBB).append(Common.LINE_BREAK); } saveData(Common.ROAD_MONITOR_CAMERA_RELATIONSHIP, readMonitorCameraRelationship.toString()); } public void saveData(String pathFileName, String newContent) { //remove the last '\n' newContent = newContent.substring(0, newContent.length() - 1); FileUtils.saveFile(pathFileName, newContent); } public void generateData() { //StringBuffer可以保證線程安全 StringBuffer contentSb = new StringBuffer(); SimpleDateFormat simpleDateFormat_ddMMyyyy = new SimpleDateFormat(Common.DATE_FORMAT_YYYYMMDD); Date today = new Date(); String date = simpleDateFormat_ddMMyyyy.format(today); Random random = new Random(); //異常道路 List<Integer> errorRoadIdList = new ArrayList<Integer>(); generateErrorRoadIdList(random, errorRoadIdList); long begin = System.currentTimeMillis(); //使用多線程 ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < Common.VEHICLE_NUMBER; i++) { String vehiclePlate = VehiclePlateGenerateSG.generatePlate(); //使用Future和Callable組合,可以獲取到返回值 Future<String> result = exec.submit(new GenerateVehicleLog(date, random, errorRoadIdList, vehiclePlate, roadIdArray)); try { contentSb.append(result.get()); if(i % 100 == 0){ System.out.println(i); } if(i != 0 && i % 900 == 0){ long end = System.currentTimeMillis(); System.out.println(i + " sleeping 1 seconds." + " " + (end - begin)/1000 + " s"); //waiting the pre-task to finish. TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } //System.out.println(contentSb.toString()); } exec.shutdown(); saveData(Common.VEHICLE_LOG, contentSb.toString()); } private void generateErrorRoadIdList(Random random, List<Integer> errorRoadIdList) { for (int x = 0; x < Common.ROAD_ERROR_NUM; x++) { if (errorRoadIdList.contains(roadIdArray[random.nextInt(roadIdArray.length)])) { generateErrorRoadIdList(random, errorRoadIdList); } else { errorRoadIdList.add(roadIdArray[random.nextInt(roadIdArray.length)]); } } } } class GenerateVehicleLog implements Callable<String>{ StringBuffer contentSb; String date; Random random; List<Integer> errorRoadIdList; String vehiclePlate; Integer[] roadIdArray; public GenerateVehicleLog( String date, Random random, List<Integer> errorRoadIdList, String vehiclePlate, Integer[] roadIdArray){ this.contentSb = new StringBuffer(); this.date = date; this.random = random; this.errorRoadIdList = errorRoadIdList; this.vehiclePlate = vehiclePlate; this.roadIdArray = roadIdArray; } @Override public String call() throws Exception { return getVehicleLog(contentSb, date, random, errorRoadIdList, vehiclePlate, roadIdArray); } private String getVehicleLog(StringBuffer contentSb, String date, Random random, List<Integer> errorRoadIdList, String vehiclePlate, Integer[] roadIdArray) { // 每一輛車產生記錄在100條記錄以內 // 即最多過100個監控點 // 即最多過50條路 // 這裡可以根據需要調節 for (int n = 0; n < random.nextInt(Common.ROAD_NUM); n++) { int roadId = roadIdArray[random.nextInt(roadIdArray.length)]; Integer[] monitorIdArray = new Integer[2]; Integer[] cameraIdArray = new Integer[2]; boolean isAllError = false; int monitorId = 0; if (errorRoadIdList.contains(roadId)) { // System.out.println("find error road.... " + roadId + // " for vehicle : " + vehiclePlate); if (roadId % 2 == 0) { // 監控設備全部壞掉 isAllError = true; } else { // 部分壞掉 monitorIdArray[0] = roadId * 2 - 1; monitorIdArray[1] = roadId * 2 - 1; monitorId = monitorIdArray[random.nextInt(monitorIdArray.length)]; cameraIdArray[0] = roadId * 4 - 3; cameraIdArray[1] = roadId * 4 - 2; } } else { monitorIdArray[0] = roadId * 2 - 1; monitorIdArray[1] = roadId * 2; monitorId = monitorIdArray[random.nextInt(monitorIdArray.length)]; cameraIdArray[0] = monitorId * 2 - 1; cameraIdArray[1] = monitorId * 2; } if (!isAllError) { // 16/01/2019 10:20:30 SCN89000J 124 10002 20004 40007 contentSb.append(date).append(Common.BLANK).append(StringUtils.fulfuill(String.valueOf(random.nextInt(25)))).append(Common.COLON).append(StringUtils.fulfuill(String.valueOf(random.nextInt(61)))).append(Common.COLON).append(StringUtils.fulfuill(String.valueOf(random.nextInt(61)))).append(Common.SEPARATOR); contentSb.append(vehiclePlate).append(Common.SEPARATOR); contentSb.append((random.nextInt(Common.MAX_SPEED) + 1)).append(Common.SEPARATOR); contentSb.append(roadId).append(Common.SEPARATOR); contentSb.append(monitorId).append(Common.SEPARATOR); contentSb.append(cameraIdArray[random.nextInt(cameraIdArray.length)]).append(Common.LINE_BREAK); } } return contentSb.toString(); } } class FileUtils implements Serializable { private static final long serialVersionUID = 1L; public static boolean createFile(String pathFileName) { try { File file = new File(pathFileName); if (file.exists()) { System.err.println("Find file" + pathFileName + ", system will delete it now!!!"); file.delete(); } boolean createNewFile = file.createNewFile(); System.err.println("create file " + pathFileName + " success!"); return createNewFile; } catch (IOException e) { e.printStackTrace(); } return false; } public static void saveFile(String pathFileName, String newContent) { FileOutputStream fos = null; OutputStreamWriter osw = null; PrintWriter pw = null; try { String content = newContent; File file = new File(pathFileName); fos = new FileOutputStream(file, true); osw = new OutputStreamWriter(fos, Common.CHARSETNAME_UTF_8); pw = new PrintWriter(osw); pw.write(content); pw.close(); osw.close(); fos.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (pw != null) { pw.close(); } if (osw != null) { try { osw.close(); } catch (IOException e) { e.printStackTrace(); } } if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } } } } class StringUtils { public static String fulfuill(String str) { if (str.length() == 1) { return Common.ZERO + str; } return str; } } /** * From wiki: * https://en.wikipedia.org/wiki/Vehicle_registration_plates_of_Singapore<br> * * A typical vehicle registration number comes in the format "SKV 6201 B":<br> * * <li>S – Vehicle class ("S", with some exceptions, stands for a private * vehicle since 1984)</li><br> * <li>KV – Alphabetical series ("I" and "O" are not used to avoid confusion * with "1" and "0")</li><br> * <li>6201 – Numerical series</li><br> * <li>B – Checksum letter ("F","I", "N", "O", "Q", "V" and "W" are never used * as checksum letters; absent on special government vehicle plates and events * vehicle plates)</li> * */ class VehiclePlateGenerateSG implements Serializable { private static final long serialVersionUID = -8006144823705880339L; public static Random random = new Random(); // 主要目的就是使得產生的數字字元不在這個list裡面 // 如果在這個list裡面找到,那麼需要重新產生 private static List<String> uniqueList = new ArrayList<String>(); public static String generatePlate() { // System.out.println(ALPHABETICAL_ARRAY[18]);// S String alphabeticalSeries = getAlphabeticalStr(Common.ALPHABETICAL_ARRAY, random) + getAlphabeticalStr(Common.ALPHABETICAL_ARRAY, random); // System.out.println(alphabeticalSeries);//KV String numbericalSeries = getNumericalSeriesStr(random); // System.out.println(numbericalSeries);//62010 String checksumLetter = getChecksumLetterStr(Common.ALPHABETICAL_ARRAY, random); // System.out.println(checksumLetter);//B String singaporeVehiclePlate = Common.ALPHABETICAL_ARRAY[18] + alphabeticalSeries + numbericalSeries + checksumLetter; return singaporeVehiclePlate; } private static String getAlphabeticalStr(String[] ALPHABETICAL_ARRAY, Random random) { String alphabeticalStr = Common.ALPHABETICAL_ARRAY[random.nextInt(Common.ALPHABETICAL_ARRAY.length)]; // "I", "O" if (!(alphabeticalStr.equals(Common.ALPHABETICAL_ARRAY[8]) || alphabeticalStr.equals(Common.ALPHABETICAL_ARRAY[14]))) { return alphabeticalStr; } else { return getAlphabeticalStr(Common.ALPHABETICAL_ARRAY, random); } } private static String getNumericalSeriesStr(Random random) { // 為了區別真實的車牌,我們把數字設置為5位 String numericalStr = random.nextInt(10) + "" + random.nextInt(10) + "" + random.nextInt(10) + "" + random.nextInt(10) + "" + random.nextInt(10); if (uniqueList.contains(numericalStr)) { // 如果存在,則重新產生 return getNumericalSeriesStr(random); } else { uniqueList.add(numericalStr); return numericalStr; } } private static String getChecksumLetterStr(String[] ALPHABETICAL_ARRAY, Random random) { String checksumLetter = ALPHABETICAL_ARRAY[random.nextInt(ALPHABETICAL_ARRAY.length)]; // "F","I", "N", "O", "Q", "V" and "W" if (!(checksumLetter.equals(Common.ALPHABETICAL_ARRAY[5]) || checksumLetter.equals(Common.ALPHABETICAL_ARRAY[8]) || checksumLetter.equals(Common.ALPHABETICAL_ARRAY[13]) || checksumLetter.equals(Common.ALPHABETICAL_ARRAY[14]) || checksumLetter.equals(Common.ALPHABETICAL_ARRAY[16]) || checksumLetter.equals(Common.ALPHABETICAL_ARRAY[21]) || checksumLetter.equals(Common.ALPHABETICAL_ARRAY[22]))) { return checksumLetter; } else { return getChecksumLetterStr(ALPHABETICAL_ARRAY, random); } } } class Common implements Serializable { private static final long serialVersionUID = 1L; public static String VEHICLE_LOG = "./vehicle_log"; public static String ROAD_MONITOR_CAMERA_RELATIONSHIP = "./road_monitor_camera_relationship"; public static final String[] ALPHABETICAL_ARRAY = new String[] { "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" }; public static final String DATE_FORMAT_YYYYMMDD = "dd/MM/yyyy"; public static final String CHARSETNAME_UTF_8 = "UTF-8"; public static final String SEPARATOR = "\t"; public static final String LINE_BREAK = "\n"; public static final String BLANK = " "; public static final String COLON = ":"; public static final String ZERO = "0"; //車輛數 public static final int VEHICLE_NUMBER = 100000; //道路數 public static final int ROAD_NUM = 400; public static final int ROAD_ERROR_NUM = 8; //最大車速 public static final int MAX_SPEED = 300; }
2.1.編譯執行
:wq javac DataGenerate.java java DataGenerate --運行完成,會生成下麵兩個文件 /root/vehicle_dir/vehicle_log /root/vehicle_dir/road_monitor_camera_relationship
2.2.車輛記錄log樣本
16/01/2019 14:06:44 SVM35185L 258 10295 20590 41179
16/01/2019 15:56:25 SVM35185L 110 10288 20575 41149
16/01/2019 02:22:29 SVM35185L 28 10109 20217 40436
16/01/2019 24:29:59 SSK43417H 254 10281 20562 41123
16/01/2019 07:36:54 SSK43417H 149 10124 20247 40495
16/01/2019 12:21:30 SSK43417H 196 10211 20421 40843
16/01/2019 12:42:43 SSK43417H 92 10308 20615 41230
16/01/2019 02:57:59 SDV20274X 206 10166 20332 40663
16/01/2019 11:60:17 SDV20274X 191 10372 20744 41488
16/01/2019 00:09:06 SDV20274X 197 10094 20188 40374
16/01/2019 21:18:30 SDV20274X 294 10101 20201 40401
16/01/2019 11:23:38 SDV20274X 74 10163 20325 40652
16/01/2019 04:35:16 SDV20274X 53 10077 20153 40305
16/01/2019 20:56:56 SDV20274X 31 10113 20226 40449
16/01/2019 16:50:11 SEN89218Y 58 10202 20404 40808
16/01/2019 18:34:47 SEN89218Y 113 10042 20083 40168
16/01/2019 02:25:52 SEN89218Y 35 10051 20101 40204
16/01/2019 24:08:52 SEN89218Y 77 10165 20330 40657
2.3.道路-監控-攝像頭關係樣本
10001 20001 40001
10001 20001 40002
10001 20002 40003
10001 20002 40004
10002 20003 40005
10002 20003 40006
10002 20004 40007
10002 20004 40008
10003 20005 40009
10003 20005 40010
10003 20006 40011
10003 20006 40012
10004 20007 40013
10004 20007 40014
10004 20008 40015
10004 20008 40016
3.在Hive中創建table並且導入數據
-- 創建table,並且把結果數據導入到Hive table裡面 cd /root/vehicle_dir/ vi hive_vehicle.sql --1.drop t_vehicle_log drop table IF EXISTS t_vehicle_log; --2.create t_vehicle_log CREATE TABLE t_vehicle_log( date_time string , vehicle_plate string , vehicle_speed int , road_id string , monitor_id string , camera_id string )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; --3.load data into t_vehicle_log load data local inpath '/root/vehicle_dir/vehicle_log' into table t_vehicle_log; --4.drop t_road_monitor_camera_relationship drop table IF EXISTS t_road_monitor_camera_relationship; --5.create t_road_monitor_camera_relationship CREATE TABLE t_road_monitor_camera_relationship( road_id string , monitor_id string , camera_id string )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; --6.load data into t_road_monitor_camera_relationship load data local inpath '/root/vehicle_dir/road_monitor_camera_relationship' into table t_road_monitor_camera_relationship; --7.drop t_monitor_camera drop table IF EXISTS t_monitor_camera; --8.create t_monitor_camera create table t_monitor_camera( monitor_id string , cameranum int, workingcameranum int, notWorkingCameraNum int )ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n'; --9.load data from other table into t_monitor_camera from (select monitor_id, count(distinct camera_id) cameraNum from t_road_monitor_camera_relationship group by monitor_id) t1 left outer join (select monitor_id, NVL(count(distinct camera_id), 0) workingCameraNum from t_vehicle_log group by monitor_id) t2 on t1.monitor_id=t2.monitor_id insert into table t_monitor_camera select t1.monitor_id, t1.cameraNum cameraNum, NVL(t2.workingCameraNum, 0) workingCameraNum,NVL((t1.cameraNum - NVL(t2.workingCameraNum, 0)), 0) notWorkingCameraNum;
4.編寫Sqoop配置文件
--配置sqoop:hive數據導入到mysql中 --註意: --export-dir /user/hive/warehouse/t_monitor_camera/ 這裡的地址可以在hive中, --通過desc formatted t_monitor_camera 查看 --Location: hdfs://mycluster/user/hive/warehouse/t_monitor_camera cd /root/vehicle_dir/ vi hive_to_mysql_for_vehicle export --connect jdbc:mysql://node1:3306/sqoop_db --username root --password '!QAZ2wsx3edc' --table t_hive_to_mysql_for_vehicle -m 1 --columns monitor_id,camera_num,working_camera_num,not_working_camera_num --fields-terminated-by '|' --export-dir /user/hive/warehouse/t_monitor_camera/ :wq
5.在Mysql中創建table
--在mysql裡面創建表 mysql -u root -p !QAZ2wsx3edc use sqoop_db; --如果有則刪除 DROP TABLE IF EXISTS t_hive_to_mysql_for_vehicle; CREATE TABLE t_hive_to_mysql_for_vehicle (monitor_id VARCHAR(5), camera_num INT, working_camera_num INT, not_working_camera_num INT);
6.編寫自動化可執行腳本
--編輯可執行腳本 cd /root/vehicle_dir/ vi hive_to_mysql_vehicle.sh echo 'job begin' cd /home/hive/bin ./hive -f /root/vehicle_dir/hive_vehicle.sql echo 'begin to inport to mysql' sqoop --options-file /root/vehicle_dir/hive_to_mysql_for_vehicle echo 'done.' :wq
7.賦予腳本可執行屬性
--賦予腳本可執行屬性 chmod +x hive_to_mysql_vehicle.sh
8.執行腳本
--執行腳本 ./hive_to_mysql_vehicle.sh
9.結果
9.1.執行腳本前,檢查mysql table
--執行腳本之前,查詢t_hive_to_mysql_for_vehicle mysql> select * from t_hive_to_mysql_for_vehicle; Empty set (0.00 sec)
9.2.執行腳本
[root@node1 vehicle_dir]# ./hive_to_mysql_vehicle.sh job begin 19/01/16 01:00:53 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. Use hive.hmshandler.retry.* instead Logging initialized using configuration in jar:file:/root/apache-hive-0.13.1-bin/lib/hive-common-0.13.1.jar!/hive-log4j.properties SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/root/hadoop-2.5.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/root/apache-hive-0.13.1-bin/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] OK Time taken: 1.432 seconds OK Time taken: 0.306 seconds Copying data from file:/root/vehicle_dir/vehicle_log Copying file: file:/root/vehicle_dir/vehicle_log Loading data to table default.t_vehicle_log Table default.t_vehicle_log stats: [numFiles=1, numRows=0, totalSize=121817379, rawDataSize=0] OK Time taken: 5.958 seconds OK Time taken: 0.101 seconds OK Time taken: 0.042 seconds Copying data from file:/root/vehicle_dir/road_monitor_camera_relationship Copying file: file:/root/vehicle_dir/road_monitor_camera_relationship Loading data to table default.t_road_monitor_camera_relationship Table default.t_road_monitor_camera_relationship stats: [numFiles=1, numRows=0, totalSize=28799, rawDataSize=0] OK Time taken: 0.637 seconds OK Time taken: 0.094 seconds OK Time taken: 0.071 seconds Total jobs = 4 Launching Job 1 out of 4 Number of reduce tasks not specified. Estimated from input data size: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> Starting Job = job_1547622676146_0025, Tracking URL = http://node1:8088/proxy/application_1547622676146_0025/ Kill Command = /home/hadoop-2.5/bin/hadoop job -kill job_1547622676146_0025 Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 2019-01-16 01:01:29,337 Stage-1 map = 0%, reduce = 0% 2019-01-16 01:01:40,583 Stage-1 map