這裡的Redis主從結構可以是簡單的主從,sentinel,redis cluster中的主從等。wait命令的作用:此命令將阻塞當前客戶端,直到當前Session連接(主節點上)所有的寫命令都被傳送到指定數據量的slave節點。如果到達超時(以毫秒為單位),則即使尚未完全傳送到達指定數量的salv ...
這裡的Redis主從結構可以是簡單的主從,sentinel,redis cluster中的主從等。
wait命令的作用:
此命令將阻塞當前客戶端,直到當前Session連接(主節點上)所有的寫命令都被傳送到指定數據量的slave節點。
如果到達超時(以毫秒為單位),則即使尚未完全傳送到達指定數量的salve節點,該命令也會返回(成功傳送到的節點的個數)。
該命令將始終返回確認在WAIT命令之前發送的寫命令的副本數量,無論是在達到指定數量的副本的情況下,還是在達到超時的情況下。
具體說就是:比如對於1主2從的結構,Wait要求3秒鐘之內傳送到2個節點,但是達到超時時間3秒鐘之後只成功傳送到了1個slave節點上,此時wait也不會繼續阻塞,而是返回成功傳送的節點個數(1)。
有點類似於MySQL的半同步複製,但是效果完全不能跟半同步相比,因為Redis本身沒有回滾的功能,這裡的wait命令發起之後,即便是超時時間之後沒有送到任何一個slave節點,主節點也不會回滾。
wait命令無法保證Redis主從之間的強一致,不過,在主從、sentinel和Redis群集故障轉移中,wait能夠增強(僅僅是增強,但不是保證)數據的安全性。
既然wait命令在當前連接之後會等待指定數量的從節點確認,其主節點的寫入效率必然會收到一定程度的影響,那麼這個影響有多大?
這裡做一個簡單的測試,環境2核4G的宿主機,docker下的集群3主3從的Redis集群,因此不用考慮網路延遲,在執行寫入操作之後,使用兩個Case,對比使不使用wait命令等待傳送到salve的效率,
1,單線程迴圈寫入100000個key值
2,多線程併發,10個線程每個線程寫入10000個key,一共寫入100000個key
Case1:單線程迴圈寫入100000個key值
結論:不使用wait命令,整體耗時33秒,集群中單個節點的TPS為1000左右;使用wait命令,整體耗時72秒,集群中單個節點的TPS為480左右,整體效率下降了50%多一點
單線程不使用WAIT
單線程使用WAIT(redis_conn.execute_command('wait', 1, 0))
Case2:多線程迴圈寫入100000個key值
結論:不使用wait命令,整體耗時19秒,集群中單個節點的TPS為1700左右;使用wait命令,整體耗時36秒,集群中單個節點的TPS為900左右,整體效率與單線程基本上一致,下降了50%多一點
多線程不使用WAIT,單節點上TPS可達到1700左右
多線程使用WAIT,單節點上TPS可達到850左右
鑒於在多線程模式下,CPU負載接近於瓶頸,因此不能再加更多的線程數,測試數據也僅供參考。
總結:
wait能夠在主節點寫入命令之後,通過阻塞的方式等待數據傳送到從節點,wait能夠增強(但不保證)數據的安全性。
其代價或者說性能損耗也是不小的,通過以上測試可以看出,即便是不考慮網路傳輸延遲的情況下,其性能損耗也超出了50%。
#!/usr/bin/env python # coding:utf-8 import sys import time import datetime from rediscluster import StrictRedisCluster import threading from time import ctime,sleep def redis_cluster_write(): redis_nodes = [ {'host':'172.18.0.11','port':8888}, {'host':'172.18.0.12','port':8888}, {'host':'172.18.0.13','port':8888}, {'host':'172.18.0.14','port':8888}, {'host':'172.18.0.15','port':8888}, {'host':'172.18.0.16','port':8888}] try: redis_conn = StrictRedisCluster(startup_nodes=redis_nodes,password='******') except Exception: raise Exception redis_conn.config_set('cluster-require-full-coverage', 'yes') counter = 0 for i in range(0,100000): counter = counter+1 redis_conn.set('key_'+str(i),'value_'+str(i)) #redis_conn.execute_command('wait', 1, 0) if counter == 1000: print('insert 1000 keys '+str(str(datetime.datetime.now()))) counter = 0 def redis_concurrence_test(thread_id): redis_nodes = [ {'host':'172.18.0.11','port':8888}, {'host':'172.18.0.12','port':8888}, {'host':'172.18.0.13','port':8888}, {'host':'172.18.0.14','port':8888}, {'host':'172.18.0.15','port':8888}, {'host':'172.18.0.16','port':8888}] try: redis_conn = StrictRedisCluster(startup_nodes=redis_nodes, password='******') except Exception: raise Exception redis_conn.config_set('cluster-require-full-coverage', 'yes') counter = 0 for i in range(0, 10000): counter = counter + 1 redis_conn.set('key_' + str(thread_id)+'_'+str(counter), 'value_' + str(i)) #redis_conn.execute_command('wait', 1, 0) if counter == 1000: print(str(thread_id)+':insert 1000 keys ' + str(str(datetime.datetime.now()))) counter = 0 if __name__ == '__main__': #redis_cluster_write() threads = [] for i in range(10): t = threading.Thread(target=redis_concurrence_test, args=(i,)) threads.append(t) begin_time = ctime() for t in threads: t.setDaemon(True) t.start() for t in threads: t.join()
https://redis.io/commands/wait