使用pyspark模仿sqoop從oracle導數據到hive的主要功能(自動建表,分區導入,增量,解決數據換行符問題)

来源:https://www.cnblogs.com/yzj-blog/archive/2018/07/31/9393297.html
-Advertisement-
Play Games

使用pyspark 在hive中建表,分區導入,增量,解決數據換行符問題彙總 ...


  最近公司開始做大數據項目,讓我使用sqoop(1.6.4版本)導數據進行數據分析計算,然而當我們將所有的工作流都放到azkaban上時整個流程跑完需要花費13分鐘,而其中導數據(增量)就占了4分鐘左右,老闆給我提供了使用 spark 導數據的思路,學習整理了一個多星期,終於實現了sqoop的主要功能。

  這裡我使用的是pyspark完成的所有操作。

  

  條件:hdfs平臺,pyspark,ubuntu系統

  運行:我這裡是在 /usr/bin 目錄下(或者指定在此目錄下 )運行的python文件,也可以使用系統自帶的pyspark

1 ./spark-submit --jars "/home/engyne/spark/ojdbc7.jar" --master local  /home/engyne/spark/SparkDataBase.py

  其中--jars 是指定連接oracle的驅動,ojdbc7.jar對應的是oracle12版本,--master local /...指定的是運行的python文件

  註意:我的代碼沒有解決中文問題,所以不管是註釋還是代碼中都不能出現中文,記得刪除!!!

 

  1、pyspark連接oracle,導數據到hive(後面的代碼需要在此篇代碼基礎上進行,重覆代碼不再copy了)

 1 import sys
 2 from pyspark.sql import HiveContext
 3 from pyspark import SparkConf, SparkContext, SQLContext
 4 
 5 conf = SparkConf().setAppName('inc_dd_openings')
 6 sc = SparkContext(conf=conf)
 7 sqlContext = HiveContext(sc)
 8 
 9 #以下是為了在console中列印出表內容
10 reload(sys)
11 sys.setdefaultencoding("utf-8")
12 
13 get_df_url = "jdbc:oracle:thin:@//192.168.1.1:1521/ORCLPDB"
14 get_df_driver = "oracle.jdbc.driver.OracleDriver"
15 get_df_user = "xxx"
16 get_df_password = "xxx"
17  
18 df = sqlContext.read.format("jdbc") \
19     .option("url", get_df_url) \
20     .option("driver", get_df_driver) \
21     .option("dbtable", "STUDENT") \
22     .option("user",  get_df_user).option("password", get_df_password) \
23     .load()
24 #df.show() #可以查看到獲取的表的內容,預設顯示20行
25 sqlContext.sql("use databaseName")    #databaseName指定使用hive中的資料庫
26 #創建臨時表
27 df.registerTempTable("tempTable")
28 #創建表並寫入數據
29 sqlContext.sql("create table STUDENT as select * from tempTable")

  2、pyspark在hive中創建動態分區表

1 #修改一下hive的預設設置以支持動態分區
2 sqlContext.sql("set hive.exec.dynamic.partition=true")
3 sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
4 #設置hive支持創建分區文件的最大值
5 sqlContext.sql("SET hive.exec.max.dynamic.partitions=100000")
6 sqlContext.sql("SET hive.exec.max.dynamic.partitions.pernode=100000")

  這裡需要先手動創建分區表,我使用dataframe的dtypes屬性獲取到表結構,然後迴圈拼接表的每個欄位在hive中所對應的類型

  最後寫入表數據的代碼是:

1 sqlContext.sql("insert overwrite table STUDENT partition(AGE) SELECT ID,NAME,UPDATETIME,AGE FROM tempTable"

   3、實現增量導入數據

  我這裡使用了MySql資料庫,用來存儲增量導入的信息,創建表(job)

DROP TABLE IF EXISTS `job`;

CREATE TABLE `job` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `database_name` varchar(50) DEFAULT NULL,     --資料庫名稱
  `table_name` varchar(100) DEFAULT NULL,       --需要增量導入的表名
  `partition_column_name` varchar(100) DEFAULT NULL,        --分區的欄位名(這裡只考慮對一個欄位分區,如果多個欄位這裡應該使用一對多表結構吧)
  `partition_column_desc` varchar(50) DEFAULT NULL,     --分區欄位類型
  `check_column` varchar(50) DEFAULT NULL,      --根據(table_name中)此欄位進行增量導入校驗(我這裡例子使用的是updatetime)
  `last_value` varchar(255) DEFAULT NULL,       --校驗值
  `status` int(1) NOT NULL,     --是否使用(1表示此job激活)
  PRIMARY KEY (`id`)
) INCREMENTAL=InnoDB AUTO_INCREMENT=81 DEFAULT CHARSET=utf8;

  存儲STUDENT表增量導入信息(這裡是為了演示)

insert  into `job`(`id`,`database_name`,`table_name`,`partition_column_name`,`partition_column_desc`,`check_column`,`last_value`,`status`)values (1,'test_datebase','STUDENT','AGE','string','UPDATETIME','2018-07-30',1)

  python 連接MySql的方法我這裡就直接懟代碼了,具體詳解大家就看菜鳥教程

  Ubuntu需要安裝MySQLdb(   sudo apt-get install python-mysqldb   )

import MySQLdb

# insert        update        delete
def conMysqlDB_exec(sqlStr):
    db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' )
    cursor = db.cursor()
    try:
        cursor.execute(sqlStr)
        db.commit()
        result = True
    except:
        print("---->MySqlError: execute error")
        result = False
        db.rollback()
    db.close
    return result

# select
def conMysqlDB_fetchall(sqlStr):
    db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' )
    cursor = db.cursor()
    results = []
    try:
        cursor.execute(sqlStr)
        results = cursor.fetchall()
    except:
        print("---->MySqlError: unable to fecth data")
    db.close
    return results

  查詢增量信息,使用spark進行導入

findJobSql = "SELECT * FROM job where status=1"
result
= conMysqlDB_fetchall(findJobSql) databaseName = val[1] tableName = val[2] partitionColumnName = val[3] partitionColumnDesc = val[4] checkColumn = val[5] lastValue = val[6] sqlContext.sql("use database") df = sqlContext.read.format("jdbc") \ .option("url", "jdbc:oracle:thin:@//192.168.xxx.xxx:1521/ORCLPDB") \ .option("driver", "oracle.jdbc.driver.OracleDriver") \ .option("dbtable", "(select * from %s where to_char(%s, 'yyyy-MM-dd')>'%s')" % (tableName, checkColumn, lastValue)) \ #這裡是關鍵,直接查詢出新增的數據,這樣後面的速度才能提升,否則要對整個表的dataframe進行操作,慢死了,千萬不要相信dataframe的filter,where這些東西,4萬多條數據要查3分鐘!!! .option("user", "xxx").option("password", "xxx") \ .load()
def  max(a, b):
    if a>b:
      return a
    else:
      return b
try: #獲取到新增欄位的最大值!!!(這塊也困了我好久)這裡使用的是python的reduce函數,調用的max方法 nowLastValue = df.rdd.reduce(max)[checkColumn]
    df.registerTempTable("temp")#寫入內容
    saveSql = "insert into table student select * from temp"
    sqlContext.sql(saveSql)
    #更新mysql表,使lastValue是表最新值
    updataJobSql = "UPDATE job SET last_value='%s' WHERE table_name='%s'" % (nowLastValue, tableName)
    if conMysqlDB_exec(updataJobSql):
        print("---->SUCCESS: incremental import success")
except ValueError:
    print("---->INFO: No new data added!")
except:
    print("---->ERROR: other error")

  4、解決導入數據換行符問題

  有時候oracle中的數據中會存在換行符(" \n ")然而hive1.1.0中數據換行預設識別的也是\n,最坑的是還不能對它進行修改(目前我沒有查出修改的方法,大家要是有辦法歡迎在評論區討論)那我只能對數據進行處理了,以前使用sqoop的時候也有這個問題,所幸sqoop有解決換行符的語句,,,,巴拉巴拉,,,扯遠了

  解決換行符需要dataframe的map方法,然後使用lambda表達式進行replace,總結好就是下麵的代碼(第3行)

  解釋:這是個for迴圈裡面加if else 判斷,整個需要用  [ ]  包起來,沒錯這是個list ,如果不包就報錯,lambda x 獲取到的是表中一行行的數據,for迴圈對每一行進行遍歷,然後對一行中每個欄位進行判斷,是否是unicode或者str類型,(一般只有是這兩個類型才存在換行符)如果是則進行replace處理,否則不做處理。

  轉化好之後這是個rdd類型的數據,需要轉化為dataframe類型才能寫入hive

1 #df自帶獲取schema的方法,不要學我去拼湊出來(

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 背景:MySQL5.6.40,庫比較小,row+gtid複製環境,但由於以前種種原因,備份還原在從庫後,開啟複製存在大量1062,1032錯誤,gtid卡在靠前位置。做複製的時候沒有任何從庫,每小時的備份也被運維停了。 以前從來沒遇到過這種情況,相對測試環境正式環境比較複雜,而且猜測可能是之前備份還 ...
  • *多表查詢 分類:1.合併結果集 2.連接查詢 3.子查詢 *合併結果集:要求被合併的表中,列的類型和列數相同。 *UNION,去除重覆行。完全相同的行會被去除 *UNION ALL:不去除重覆行。 例:select * from ab UNION ALL select * from cd; *連接 ...
  • Flink的部署 環境準備:windows7系統,本地連接。如果打開更改適配器設置後沒有本地連接,可以通過驅動精靈等軟體安裝網卡驅動。為了使部署在虛擬機上的伺服器可以與物理機進行連通,必須使物理機的網卡和虛擬機上伺服器的網卡在同一個網段上,在此我們規定一個網段192.168.0.*為標準,我的物理機 ...
  • 配置免安裝mysql 1) 解壓mysql包,放置自定義目錄,我這裡演示的是D:\mysql 2) 將根目錄下的my-default.ini,改名為my.ini 3) 複製下麵內容到my.ini 註意路徑 4) 環境變數配置 我的電腦-屬性-高級-環境變數-新建 變數mysql_home 值D:\m ...
  • 第一章: entity 實體 relationship 關係 diagram 圖表 model 模型 normal 規範的 formate 形式 hotel 旅館 guest 客人 promation 提升 推廣 state 狀態 type 類型 第二章: networking 網路 option ...
  • 老樣子,不多BiBi,直接進入主題! 有時候在linux下編譯好QT程式,用QTCreator運行沒問題,但是用命令./XX就會報錯:error while loading shared libraries:等等問題,有同學可能會問我的依賴庫已經放在可執行文件同目錄下了,怎麼會找不到呢,這裡需要 1 ...
  • 存儲過程是一組為了完成特定功能的sql語句集,存儲在資料庫中,經過一次編譯後再次調用不需要編譯。用戶通過指定存儲過程的名字來執行它。 基本語法: create or replace procedure procedure_01 is//一直糾結這裡是is還是as,查資料後發現:在存儲過程(proce ...
  • 小編帶大家來分析一下,零基礎入門學習大數據可以從事哪些工作呢? 2018年隨著當代信息技術的迅猛發展,大數據在人們的工作、生產、生活、學習、娛樂等方面,人們想開始學習大數據的時候,最常問我的問題是,“我應該學Hadoop(hadoop是一款開源軟體,主要用於分散式存儲和計算,他由HDFS和MapRe ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...