最近讀了Redis官網一篇關於使用管道加速Redis查詢 的文章,原文: "Using pipelining to speedup Redis queries" ,中文翻譯可參考: "管道(Pipelining)" 一個請求/響應伺服器能處理新的請求即使客戶端還未讀取舊的響應。這樣就可以將多個命令發 ...
最近讀了Redis官網一篇關於使用管道加速Redis查詢 的文章,原文:Using pipelining to speedup Redis queries,中文翻譯可參考:管道(Pipelining)
一個請求/響應伺服器能處理新的請求即使客戶端還未讀取舊的響應。這樣就可以將多個命令發送到伺服器,而不用等待回覆,最後在一個步驟中讀取該答覆。
管道不僅僅是為了減少往返時間(Round Trip Time,簡稱RTT)引起的延遲成本,它實際上大大提高了給定 Redis 伺服器中每秒可執行的操作總量。 這是因為在不使用管道的情況下,從訪問數據結構和生成應答的角度來看,為每個命令提供服務非常廉價,但從執行套接字I/O的角度來看,這是非常昂貴的。 這涉及到調用 read() 和 write()系統調用,這意味著從用戶區到內核區。 上下文切換是一個巨大的速度損失。
原文給的代碼示例是基於ruby語言,下麵我分別展示Java語言和Go語言的Redis管道示例。
Redis管道Java示例
語言版本是:
- JDK:13.0.2
- Redis客戶端:Jedis 3.2.0
pom.xml(Maven包依賴)
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
JedisPipelineDemo.java
import com.google.common.base.Stopwatch;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JedisPipelineDemo {
Jedis jedis = new Jedis("localhost");
void withoutPipeline() {
for (int i = 0; i < 10000; i++)
jedis.ping();
}
void withPipeline() {
Pipeline p = jedis.pipelined();
for (int i = 0; i < 10000; i++)
p.ping();
p.sync(); // 獲取所有的響應
}
void serialTask() {
printTime(() -> withoutPipeline(), "withoutPipeline"); // 346.4 ms
printTime(() -> withPipeline(), "withPipeline"); // 18.34 ms
}
// 雖然傳入的是Runnable對象,直接調用run方法並沒有創建線程,而是使用當前線程同步執行方法。
void printTime(Runnable task, String taskName) {
Stopwatch stopwatch = Stopwatch.createStarted();
task.run();
System.out.println(taskName + " took: " + stopwatch.stop());
}
public static void main(String[] args) {
JedisPipelineDemo demo = new JedisPipelineDemo();
demo.serialTask();
}
}
Redis管道Go示例
語言版本:
redis_pipeline.go
package main
import (
"github.com/gomodule/redigo/redis"
"log"
"time"
)
var (
c redis.Conn
err error
reply interface{}
)
func init() {
c, err = redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Fatal(err)
}
}
func main() {
defer c.Close()
withoutPipelining() // 256.53 ms
withPipelining() // 8.69 ms
}
func withoutPipelining() {
defer timeTrack(time.Now(), "withoutPipelining")
for i := 0; i < 10000; i++ {
c.Do("PING")
}
}
func withPipelining() {
defer timeTrack(time.Now(), "withPipelining")
c.Send("MULTI")
for i := 0; i < 10000; i++ {
c.Send("PING")
}
c.Do("EXEC")
}
func timeTrack(start time.Time, name string) {
elapsed := time.Since(start)
log.Printf("%s took %s", name, elapsed)
}