使用pyspark 在hive中建表,分區導入,增量,解決數據換行符問題彙總 ...
最近公司開始做大數據項目,讓我使用sqoop(1.6.4版本)導數據進行數據分析計算,然而當我們將所有的工作流都放到azkaban上時整個流程跑完需要花費13分鐘,而其中導數據(增量)就占了4分鐘左右,老闆給我提供了使用 spark 導數據的思路,學習整理了一個多星期,終於實現了sqoop的主要功能。
這裡我使用的是pyspark完成的所有操作。
條件:hdfs平臺,pyspark,ubuntu系統
運行:我這裡是在 /usr/bin 目錄下(或者指定在此目錄下 )運行的python文件,也可以使用系統自帶的pyspark
1 ./spark-submit --jars "/home/engyne/spark/ojdbc7.jar" --master local /home/engyne/spark/SparkDataBase.py
其中--jars 是指定連接oracle的驅動,ojdbc7.jar對應的是oracle12版本,--master local /...指定的是運行的python文件
註意:我的代碼沒有解決中文問題,所以不管是註釋還是代碼中都不能出現中文,記得刪除!!!
1、pyspark連接oracle,導數據到hive(後面的代碼需要在此篇代碼基礎上進行,重覆代碼不再copy了)
1 import sys 2 from pyspark.sql import HiveContext 3 from pyspark import SparkConf, SparkContext, SQLContext 4 5 conf = SparkConf().setAppName('inc_dd_openings') 6 sc = SparkContext(conf=conf) 7 sqlContext = HiveContext(sc) 8 9 #以下是為了在console中列印出表內容 10 reload(sys) 11 sys.setdefaultencoding("utf-8") 12 13 get_df_url = "jdbc:oracle:thin:@//192.168.1.1:1521/ORCLPDB" 14 get_df_driver = "oracle.jdbc.driver.OracleDriver" 15 get_df_user = "xxx" 16 get_df_password = "xxx" 17 18 df = sqlContext.read.format("jdbc") \ 19 .option("url", get_df_url) \ 20 .option("driver", get_df_driver) \ 21 .option("dbtable", "STUDENT") \ 22 .option("user", get_df_user).option("password", get_df_password) \ 23 .load() 24 #df.show() #可以查看到獲取的表的內容,預設顯示20行 25 sqlContext.sql("use databaseName") #databaseName指定使用hive中的資料庫 26 #創建臨時表 27 df.registerTempTable("tempTable") 28 #創建表並寫入數據 29 sqlContext.sql("create table STUDENT as select * from tempTable")
2、pyspark在hive中創建動態分區表
1 #修改一下hive的預設設置以支持動態分區 2 sqlContext.sql("set hive.exec.dynamic.partition=true") 3 sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict") 4 #設置hive支持創建分區文件的最大值 5 sqlContext.sql("SET hive.exec.max.dynamic.partitions=100000") 6 sqlContext.sql("SET hive.exec.max.dynamic.partitions.pernode=100000")
這裡需要先手動創建分區表,我使用dataframe的dtypes屬性獲取到表結構,然後迴圈拼接表的每個欄位在hive中所對應的類型
最後寫入表數據的代碼是:
1 sqlContext.sql("insert overwrite table STUDENT partition(AGE) SELECT ID,NAME,UPDATETIME,AGE FROM tempTable")
3、實現增量導入數據
我這裡使用了MySql資料庫,用來存儲增量導入的信息,創建表(job)
DROP TABLE IF EXISTS `job`; CREATE TABLE `job` ( `id` int(10) NOT NULL AUTO_INCREMENT, `database_name` varchar(50) DEFAULT NULL, --資料庫名稱 `table_name` varchar(100) DEFAULT NULL, --需要增量導入的表名 `partition_column_name` varchar(100) DEFAULT NULL, --分區的欄位名(這裡只考慮對一個欄位分區,如果多個欄位這裡應該使用一對多表結構吧) `partition_column_desc` varchar(50) DEFAULT NULL, --分區欄位類型 `check_column` varchar(50) DEFAULT NULL, --根據(table_name中)此欄位進行增量導入校驗(我這裡例子使用的是updatetime) `last_value` varchar(255) DEFAULT NULL, --校驗值 `status` int(1) NOT NULL, --是否使用(1表示此job激活) PRIMARY KEY (`id`) ) INCREMENTAL=InnoDB AUTO_INCREMENT=81 DEFAULT CHARSET=utf8;
存儲STUDENT表增量導入信息(這裡是為了演示)
insert into `job`(`id`,`database_name`,`table_name`,`partition_column_name`,`partition_column_desc`,`check_column`,`last_value`,`status`)values (1,'test_datebase','STUDENT','AGE','string','UPDATETIME','2018-07-30',1)
python 連接MySql的方法我這裡就直接懟代碼了,具體詳解大家就看菜鳥教程
Ubuntu需要安裝MySQLdb( sudo apt-get install python-mysqldb )
import MySQLdb # insert update delete def conMysqlDB_exec(sqlStr): db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' ) cursor = db.cursor() try: cursor.execute(sqlStr) db.commit() result = True except: print("---->MySqlError: execute error") result = False db.rollback() db.close return result # select def conMysqlDB_fetchall(sqlStr): db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' ) cursor = db.cursor() results = [] try: cursor.execute(sqlStr) results = cursor.fetchall() except: print("---->MySqlError: unable to fecth data") db.close return results
查詢增量信息,使用spark進行導入
findJobSql = "SELECT * FROM job where status=1"
result = conMysqlDB_fetchall(findJobSql) databaseName = val[1] tableName = val[2] partitionColumnName = val[3] partitionColumnDesc = val[4] checkColumn = val[5] lastValue = val[6] sqlContext.sql("use database") df = sqlContext.read.format("jdbc") \ .option("url", "jdbc:oracle:thin:@//192.168.xxx.xxx:1521/ORCLPDB") \ .option("driver", "oracle.jdbc.driver.OracleDriver") \ .option("dbtable", "(select * from %s where to_char(%s, 'yyyy-MM-dd')>'%s')" % (tableName, checkColumn, lastValue)) \ #這裡是關鍵,直接查詢出新增的數據,這樣後面的速度才能提升,否則要對整個表的dataframe進行操作,慢死了,千萬不要相信dataframe的filter,where這些東西,4萬多條數據要查3分鐘!!! .option("user", "xxx").option("password", "xxx") \ .load()
def max(a, b):
if a>b:
return a
else:
return b
try: #獲取到新增欄位的最大值!!!(這塊也困了我好久)這裡使用的是python的reduce函數,調用的max方法 nowLastValue = df.rdd.reduce(max)[checkColumn]
df.registerTempTable("temp")#寫入內容 saveSql = "insert into table student select * from temp" sqlContext.sql(saveSql) #更新mysql表,使lastValue是表最新值 updataJobSql = "UPDATE job SET last_value='%s' WHERE table_name='%s'" % (nowLastValue, tableName) if conMysqlDB_exec(updataJobSql): print("---->SUCCESS: incremental import success") except ValueError: print("---->INFO: No new data added!") except: print("---->ERROR: other error")
4、解決導入數據換行符問題
有時候oracle中的數據中會存在換行符(" \n ")然而hive1.1.0中數據換行預設識別的也是\n,最坑的是還不能對它進行修改(目前我沒有查出修改的方法,大家要是有辦法歡迎在評論區討論)那我只能對數據進行處理了,以前使用sqoop的時候也有這個問題,所幸sqoop有解決換行符的語句,,,,巴拉巴拉,,,扯遠了
解決換行符需要dataframe的map方法,然後使用lambda表達式進行replace,總結好就是下麵的代碼(第3行)
解釋:這是個for迴圈裡面加if else 判斷,整個需要用 [ ] 包起來,沒錯這是個list ,如果不包就報錯,lambda x 獲取到的是表中一行行的數據,for迴圈對每一行進行遍歷,然後對一行中每個欄位進行判斷,是否是unicode或者str類型,(一般只有是這兩個類型才存在換行符)如果是則進行replace處理,否則不做處理。
轉化好之後這是個rdd類型的數據,需要轉化為dataframe類型才能寫入hive
1 #df自帶獲取schema的方法,不要學我去拼湊出來(