閱讀目錄 一、Hive內部表和外部表 1、Hive的create創建表的時候,選擇的創建方式: - create table - create external table 2、特點: ● 在導入數據到外部表,數據並沒有移動到自己的數據倉庫目錄下,也就是說外部表中的數據並不是由它自己來管理的!而表則 ...
閱讀目錄
一、Hive內部表和外部表
1、Hive的create創建表的時候,選擇的創建方式:
- create table
- create external table
2、特點:
● 在導入數據到外部表,數據並沒有移動到自己的數據倉庫目錄下,也就是說外部表中的數據並不是由它自己來管理的!而表則不一樣;
● 在刪除表的時候,Hive將會把屬於表的元數據和數據全部刪掉;而刪除外部表的時候,Hive僅僅刪除外部表的元數據,數據是不會刪除的!
註意:
1、- create table 創建內部表,create external table 創建外部表
2、建議在工作中用外部表來創建
二、Hive中的Partition
● 在Hive中,表中的一個Partition對應於表下的一個目錄,所有的Partition的數據都儲存在對應的目錄中
– 例如:pvs 表中包含 ds 和 city 兩個 Partition,則 – 對應於 ds = 20090801, ctry = US 的 HDFS 子目錄為:/wh/pvs/ds=20090801/ctry=US; – 對應於 ds = 20090801, ctry = CA 的 HDFS 子目錄為;/wh/pvs/ds=20090801/ctry=CA
● Partition是輔助查詢,縮小查詢範圍,加快數據的檢索速度和對數據按照一定的規格和條件進行管理。
三、Hive中的 Bucket
• hive中table可以拆分成partition,table和partition可以通過‘CLUSTERED BY ’進一步分bucket,bucket中的數據可以通過‘SORT BY’排序。 • 'set hive.enforce.bucketing = true' 可以自動控制上一輪reduce的數量從而適 配bucket的個數,當然,用戶也可以自主設置mapred.reduce.tasks去適配 bucket個數
• Bucket主要作用:
– 數據sampling,隨機採樣
– 提升某些查詢操作效率,例如mapside join
• 查看sampling數據: – hive> select * from student tablesample(bucket 1 out of 2 on id); – tablesample是抽樣語句,語法:TABLESAMPLE(BUCKET x OUT OF y) – y必須是table總bucket數的倍數或者因數。hive根據y的大小,決定抽樣的比例。例如,table總共分了64份,當y=32 時,抽取(64/32=)2個bucket的數據,當y=128時,抽取(64/128=)1/2個bucket的數據。x表示從哪個bucket開始抽 取。例如,table總bucket數為32,tablesample(bucket 3 out of 16),表示總共抽取(32/16=)2個bucket的數據 ,分別為第3個bucket和第(3+16=)19個bucket的數據
四、Hive數據類型
1、原生類型
– 原生類型 • TINYINT • SMALLINT • INT • BIGINT • BOOLEAN • FLOAT • DOUBLE • STRING • BINARY(Hive 0.8.0以上才可用) • TIMESTAMP(Hive 0.8.0以上才可用)
2、複合類型
– 複合類型 • Arrays:ARRAY<data_type> • Maps:MAP<primitive_type, data_type> ##複合類型 • Structs:STRUCT<col_name: data_type[COMMENT col_comment],……> • Union:UNIONTYPE<data_type, data_type,……>
五、Hive SQL — — Join in MR
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age
FROM page_view pv
JOIN user u
ON (pv.userid = u.userid);
SELECT pageid, age, count(1)
FROM pv_users
GROUP BY pageid, age;
六、Hive的優化
• Map的優化:
– 作業會通過input的目錄產生一個或者多個map任務。set dfs.block.size
– Map越多越好嗎?是不是保證每個map處理接近文件塊的大小?
– 如何合併小文件,減少map數?
set mapred.max.split.size=100000000; #100M set mapred.min.split.size.per.node=100000000; set mapred.min.split.size.per.rack=100000000; set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
– 如何適當的增加map數?
set mapred.map.tasks=10;
– Map端聚合 hive.map.aggr=true 。 Mr中的Combiners.
• Reduce的優化:
• Reduce的優化: – hive.exec.reducers.bytes.per.reducer;reduce任務處理的數據量
– 調整reduce的個數: • 設置reduce處理的數據量 • set mapred.reduce.tasks=10
select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 寫成 select count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04';
Set mapred.reduce.tasks = 100 Create table a_standby_table as select * from a distribute by XXX
• 分區裁剪優化(partition):
– Where中的分區條件,會提前生效,不必特意做子查詢,直接Join和GroupBy
• 笛卡爾積:
– join的時候不加on條件或者無效的on條件,Hive只能使用1個reducer來完成笛卡爾積
• Map join:
– /*+ MAPJOIN(tablelist) */,必須是小表,不要超過1G,或者50萬條記錄
• Union all:
– 先做union all再做join或group by等操作可以有效減少MR過程,儘管是多個Select,最終只有一個
mr
Union:有去重操作,會消耗系統性能
Union all:沒有去重操作,
• Multi-insert & multi-group by:
– 從一份基礎表中按照不同的維度,一次組合出不同的數據 – FROM from_statement – INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1 – INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2
• Automatic merge:
– 當文件大小比閾值小時,hive會啟動一個mr進行合併 – hive.merge.mapfiles = true 是否和並 Map 輸出文件,預設為 True – hive.merge.mapredfiles = false 是否合併 Reduce 輸出文件,預設為 False – hive.merge.size.per.task = 256*1000*1000 合併文件的大小
• Multi-Count Distinct:
– 必須設置參數:set hive.groupby.skewindata=true; – select dt, count(distinct uniq_id), count(distinct ip) – from ods_log where dt=20170301 group by dt
• Hive的Join優化:
• 一個MR job
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (a.key = c.key1)
• 生成多個MR job
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
• Hive的Join優化----表連接順序
• 按照JOIN順序中的最後一個表應該儘量是大表,因為JOIN前一階段生成的數據會存在於
Reducer的buffer中,通過stream最後面的表,直接從Reducer的buffer中讀取已經緩衝的中間
結果數據(這個中間結果數據可能是JOIN順序中,前面表連接的結果的Key,數據量相對較小,
記憶體開銷就小),這樣,與後面的大表進行連接時,只需要從buffer中讀取緩存的Key,與大表
中的指定Key進行連接,速度會更快,也可能避免記憶體緩衝區溢出。
• 使用hint的方式啟發JOIN操作
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1); a表被視為大表
SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key = b.key; MAPJION會把小表全部讀入記憶體中,在map階段直接 拿另外一個表的數據和記憶體中表數據做匹配,由於在 map是進行了join操作,省去了reduce運行的效率也 會高很多.
• 左連接時,左表中出現的JOIN欄位都保留,右表沒有連接上的都為空
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key) WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07')
• 執行順序是,首先完成2表JOIN,然後再通過WHERE條件進行過濾,這樣在JOIN過程中可能會
輸出大量結果,再對這些結果進行過濾,比較耗時。可以進行優化,將WHERE條件放在ON後
,在JOIN的過程中,就對不滿足條件的記錄進行了預先過濾。
• Hive的Join優化----並行執行
• 並行實行: – 同步執行hive的多個階段,hive在執行過程,將一個查詢轉化成一個或者多個階段。某個特 定的job可能包含眾多的階段,而這些階段可能並非完全相互依賴的,也就是說可以並行執行 的,這樣可能使得整個job的執行時間縮短。hive執行開啟:set hive.exec.parallel=true
• Hive的Join優化----數據傾斜
• 操作
• Join
• Group by
• Count Distinct
• 原因
• key分佈不均導致的
• 人為的建表疏忽
• 業務數據特點
• 癥狀
• 任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。
• 查看未完成的子任務,可以看到本地讀寫數據量積累非常大,通常超過10GB可以認定為發生數據傾斜。
• 傾斜度
• 平均記錄數超過50w且最大記錄數是超過平均記錄數的4倍。
• 最長時長比平均時長超過4分鐘,且最大時長超過平均時長的2倍。
• 萬能方法
• hive.groupby.skewindata=true
• Hive的Join優化----數據傾斜----大小表關聯
• 原因
• Hive在進行join時,按照join的key進行分發,而在join左邊的表的數據會首先讀入記憶體,如果左邊表的key相對
分散,讀入記憶體的數據會比較小,join任務執行會比較快;而如果左邊的表key比較集中,而這張表的數據量很大,
那麼數據傾斜就會比較嚴重,而如果這張表是小表,則還是應該把這張表放在join左邊。
• 思路
• 將key相對分散,並且數據量小的表放在join的左邊,這樣可以有效減少記憶體溢出錯誤發生的幾率
• 使用map join讓小的維度表先進記憶體。
• 方法
• Small_table join big_table
• Hive的Join優化----數據傾斜----大大表關聯
• 原因 • 日誌中有一部分的userid是空或者是0的情況,導致在用user_id進行hash分桶的時候,會將日誌中userid為0或者 空的數據分到一起,導致了過大的斜率。 • 思路 • 把空值的key變成一個字元串加上隨機數,把傾斜的數據分到不同的reduce上,由於null值關聯不上,處理後並不 影響最終結果。 • 方法 • on case when (x.uid = '-' or x.uid = '0‘ or x.uid is null) then concat('dp_hive_search',rand()) else x.uid end = f.user_id
七、Hive的搭建
1、Mysql配置
• 預設情況下,Hive的元數據信息存儲在內置的Derby數據中。
• Hive支持將元數據存儲在MySQL中
• 元數據存儲配置:
– 【本地配置1】:預設
– 【本地配置2】:本地搭建mysql,通過localhost:Port方式訪問
– 【遠程配置】:遠程搭建mysql,通過IP:Port方式訪問
• 第一步:安裝MySQL伺服器端和MySQL客戶端,並啟動MySQL服務 • 安裝: – yum install mysql – yum install mysql-server • 啟動: – /etc/init.d/mysqld start • 設置用戶名和密碼: – mysqladmin -u root password '111111‘ • 測試登錄是否成功: – mysql -uroot -p111111
2、安裝Hive
①下載apache-hive-0.13.0-bin.tgz,並解壓:
②在conf目錄下,創建hive-site.xml配置文件:
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>111111</value> </property> </configuration>View Code
③ 修改profile,配置環境變數:
④將mysql-connector-java-5.1.41-bin.jar拷貝到hive home的lib目錄下,以支
持hive對mysql的操作
註意:
測試hive的前提得打開hadoop集群,start-all.sh
⑤hive在創建表的過程中,報如下錯誤處理:
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Got exception: org.apache.hadoop.ipc.RemoteException org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot create directory /user/hive/warehouse/w_a. Name node is in safe mode.
處理方法:
bin/hadoop dfsadmin -safemode leave
關閉Hadoop的安全模式
⑥測試hive
hive> create EXTERNAL TABLE w_a( usrid STRING, age STRING, sex STRING)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; #創建表 OK Time taken: 0.309 seconds hive> show tables; OK w_a Time taken: 0.049 seconds, Fetched: 1 row(s) hive> drop table w_a; #刪除表 OK 導入數據: [root@master badou]# hive -f create_ex_table.sql #-f指定sql文件導入到hive中 Logging initialized using configuration in jar:file:/usr/local/src/apache-hive-0.13.0-bin/lib/hive-common-0.13.0.jar!/hive-log4j.properties OK Time taken: 1.168 seconds OK Time taken: 0.077 seconds 導入文本到hive: hive> LOAD DATA LOCAL INPATH '/home/ba/a.txt' OVERWRITE INTO TABLE w_a; #導入a.txt 文本到hive中 Copying data from file:/home/badou/a.txt Copying file: file:/home/badou/a.txt Failed with exception java.io.IOException: File /tmp/hive-root/hive_2019-04-28_05-20-33_879_4807090646149011006-1/-ext-10000/a.txt could only be replicated to 0 nodes, instead of 1
⑦
⑧
⑨
⑩
⑪
五、函數嵌套
def father(name): print('from father %s' %name) def son(): print('from the son') def grandson(): print('from the grandson') grandson() son() father('朱銳')
六、閉包
1、閉包
def father(name): print('from father %s' %name) def son(): print('from the son') def grandson(): print('from the grandson') grandson() son() father('朱銳') ''' 閉包 ''' def father(name): def son(): # name='simon1' print('我的爸爸是%s' %name) def grandson(): print('我的爺爺是%s' %name) grandson() son() father('simon')
2、函數閉包裝飾器基本實現
import time def timmer(func): def wrapper(): # print(func) start_time=time.time() func() #就是在運行test() stop_time=time.time() print('運行時間是%s' %(stop_time-start_time)) return wrapper @timmer #語法糖,這個是重點 def test(): time.sleep(3) print('test函數運行完畢') # res=timmer(test) #返回的是wrapper的地址 # res() #執行的是wrapper() # test=timmer(test) #返回的是wrapper的地址 # test() #執行的是wrapper() test() ''' 語法糖 ''' # @timmer #就相當於 test=timmer(test)
3、函數閉包加上返回值
#未加返回值 import time def timmer(func): def wrapper(): # print(func) start_time=time.time() func() #就是在運行test() stop_time=time.time() print('運行時間是%s' %(stop_time-start_time)) return 123 return wrapper @timmer #語法糖 def test(): time.sleep(3) print('test函數運行完畢') return '這是test的返回值' res=test() #就是在運行wrapper print(res) 運行結果如下: C:\Python35\python3.exe G:/python_s3/day20/加上返回值.py test函數運行完畢 運行時間是3.000171661376953 123
#加上返回值 import time def timmer(func): def wrapper(): # print(func) start_time=time.time() res=func() #就是在運行test() ##主要修改這裡1 stop_time=time.time() print('運行時間是%s' %(stop_time-start_time)) return res ##修改這裡2 return wrapper @timmer #語法糖 def test(): time.sleep(3) print('test函數運行完畢') return '這是test的返回值' res=test() #就是在運行wrapper print(res) 運行結果如下: C:\Python35\python3.exe G:/python_s3/day20/加上返回值.py test函數運行完畢 運行時間是3.000171661376953 這是test的返回值
4、函數閉包加上參數
import time def timmer(func): def wrapper(name,age): #加入參數,name,age # print(func) start_time=time.time() res=func(name,age) ##加入參數,name,age stop_time=time.time() print('運行時間是%s' %(stop_time-start_time)) return res return wrapper @timmer #語法糖 def test(name,age): #加入參數,name,age time.sleep(3) print('test函數運行完畢,名字是【%s】,年齡是【%s】' % (name,age)) return '這是test的返回值' res=test('simon',18) #就是在運行wrapper print(res)
使用可變長參數代碼如下:達到的效果是傳參靈活
import time def timmer(func): def wrapper(*args,**kwargs): #test('simon',18) args=('simon') kwargs={'age':18} # print(func) start_time=time.time() res=func(*args,**kwargs) #就是在運行test() func(*('simon'),**{'age':18}) stop_time=time.time() print('運行時間是%s' %(stop_time-start_time)) return res return wrapper @timmer #語法糖 def test(name,age): time.sleep(3) print('test函數運行完畢,名字是【%s】,年齡是【%s】' % (name,age)) return '這是test的返回值' def test1(name,age,gender): time.sleep(1) print('test函數運行完畢,名字是【%s】,年齡是【%s】,性別是【%s】' % (name,age,gender)) res=test('simon',18) #就是在運行wrapper print(res) test1('simon',18,'male')
5、裝飾器的使用
#無參裝飾器
import time
def timmer(func):
def wrapper(*args,**kwargs):
start_time=time.time()
res=func(*args,**kwargs)
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
return res
return wrapper
@timmer
def foo():
time.sleep(3)
print('from foo')
foo()
#有參裝飾器
def auth(driver='file'):
def auth2(func):
def wrapper(*args,**kwargs):
name=input("user: ")
pwd=input("pwd: ")
if driver == 'file':
if name == 'simon' and pwd == '123':
print('login successful')
res=func(*args,**kwargs)
return res
elif driver == 'ldap':
print('ldap')
return wrapper
return auth2
@auth(driver='file')
def foo(name):
print(name)
foo('simon')
#驗證功能裝飾器
#驗證功能裝飾器
user_list=[
{'name':'simon','passwd':'123'},
{'name':'zhurui','passwd':'123'},
{'name':'william','passwd':'123'},
{'name':'zhurui1','passwd':'123'},
]
current_dic={'username':None,'login':False}
def auth_func(func):
def wrapper(*args,**kwargs):
if current_dic['username'] and current_dic['login']:
res=func(*args,**kwargs)
return res
username=input('用戶名:').strip()
passwd=input('密碼:').strip()
for user_dic in user_list:
if username == user_dic['name'] and passwd == user_dic['passwd']:
current_dic['username']=username
current_dic['login']=True
res=func(*args,**kwargs)
return res
else:
print('用戶名或者密碼錯誤')
# if username == 'simon' and passwd == '123':
# user_dic['username']=username
# user_dic['login']=True
# res=func(*args,**kwargs)
# return res
# else:
# print('用戶名或密碼錯誤')
return wrapper
@auth_func
def index():
print('歡迎來到某寶首頁')
@auth_func
def home(name):
print('歡迎回家%s' %name)
@auth_func
def shopping_car(name):
print('%s購物車裡有[%s,%s,%s]' %(name,'餐具','沙發','電動車'))
print('before----->',current_dic)
index()
print('after---->',current_dic)
home('simon')
# shopping_car('simon')
#帶參數驗證功能裝飾器
#帶參數驗證功能裝飾器
user_list=[
{'name':'simon','passwd':'123'},
{'name':'zhurui','passwd':'123'},
{'name':'william','passwd':'123'},
{'name':'zhurui1','passwd':'123'},
]
current_dic={'username':None,'login':False}
def auth(auth_type='filedb'):
def auth_func(func):
def wrapper(*args,**kwargs):
print('認證類型是',auth_type)
if auth_type == 'filedb':
if current_dic['username'] and current_dic['login']:
res = func(*args, **kwargs)
return res
username=input('用戶名:').strip()
passwd=input('密碼:').strip()
for user_dic in user_list:
if username == user_dic['name'] and passwd == user_dic['passwd']:
current_dic['username']=username
current_dic['login']=True
res = func(*args, **kwargs)
return res
else:
print('用戶名或者密碼錯誤')
elif auth_type == 'ldap':
print('這玩意沒搞過,不知道怎麼玩')
res = func(*args, **kwargs)
return res
else:
print('鬼才知道你用的什麼認證方式')
res = func(*args, **kwargs)
return res
return wrapper
return auth_func
@auth(auth_type='filedb') #auth_func=auth(auth_type='filedb')-->@auth_func 附加了一個auth_type --->index=auth_func(index)
def index():
print('歡迎來到某寶主頁')
@auth(auth_type='ldap')
def home(name):
print('歡迎回家%s' %name)
#
@auth(auth_type='sssssss')
def shopping_car(name):
print('%s的購物車裡有[%s,%s,%s]' %(name,'奶茶','妹妹','娃娃'))
# print('before-->',current_dic)
# index()
# print('after--->',current_dic)
# home('simon')
shopping_car('simon')