theme: condensed-night-purple highlight: androidstudio 主從複製原理 建立連接 從節點在配置了 replicaof 配置了主節點的ip和port 從庫執行replicaof 併發送psync命令 同步數據到從庫 主庫bgsave生成RDB文件,並 ...
主從複製原理
-
建立連接
- 從節點在配置了 replicaof 配置了主節點的ip和port
- 從庫執行replicaof 併發送psync命令
-
同步數據到從庫
- 主庫bgsave生成RDB文件,併發送給從庫,同時為每一個slave開闢一塊 replication buffer 緩衝區記錄從生成rdb文件開始收到的所有寫命令。
- 從庫清空數據並載入rdb
-
發送新寫的命令給從庫
- 從節點載入 RDB 完成後,主節點將 replication buffer 緩衝區的數據(增量寫命令)發送到從節點,slave 接收並執行,從節點同步至主節點相同的狀態。
-
基於長連接傳播
- 方便後續命令傳輸,除了寫命令,還維持著心跳機制
網路模型
-
阻塞IO
-
非阻塞IO
- 沒啥用,還是需要等待數據準備就緒
-
IO多路復用
-
信號驅動IO
-
非同步IO
IO多路復用
文件描述符:簡稱FD
select
流程:
-
創建fd_set rfds
- 假設要監聽的fd為1,2,5
-
執行select(5+1,null,null,3)
- 第一個參數為最大長度+1,後面是等待時間
-
內核遍歷fd
-
沒有數據,休眠
- 等待數據就緒或者超時
-
-
遍歷fd_set 找到就緒的數據
存在的問題
- 需要將整個fd_set從用戶空間拷貝到內核空間,select結束後還要拷貝回用戶空間
- 需要遍歷一次
- 最大為監聽1024
poll
將數據改為了鏈表,還是需要遍歷
epoll
- epoll_create 創建epoll實例
- epoll_ctl 添加需要監聽的fd,關聯callback
- epoll_wait 等待fd就緒
epoll_wait有兩種通知模式
- levelTriggered 簡稱LT 當FD有數據可讀的時候,會重覆通知多次,直到數據處理完成。是epoll的預設模式
- EdgeTriggered 簡稱ET 。當FD有數據可讀的時候只會被通知一次,不管數據是否處理完成
ET模式避免了LT的驚群效應。
ET模式最好結合非阻塞IO。
淘汰策略
分類:全體,ttl
LRU
抽樣LRU 不是嚴格的
LFU
RedisObject中使用邏輯訪問次數
訪問頻率越高,增加的越小,還會衰減(16位記錄時間,8位記錄邏輯訪問次數)
總結
附帶的tcp連接redis客戶端 使用go編寫的
package main
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"strconv"
)
// RedisClient 封裝用於連接
type RedisClient struct {
conn net.Conn
//包裝一層 方便讀寫
writer *bufio.Writer
reader *bufio.Reader
}
func main() {
//連接redis
conn, err := net.Dial("tcp", "127.0.0.1:6379")
if err != nil {
fmt.Println("連接redis失敗", err)
}
client := RedisClient{conn: conn, writer: bufio.NewWriter(conn), reader: bufio.NewReader(conn)}
defer client.conn.Close()
//發送命令
client.sendRequest([]string{"set", "name", "方塊"})
//zrange boards:2024-4 0 -1
//client.sendRequest([]string{"zrange", "boards:2024-4", "0", "-1"})
//LRANGE tlist 0 -1
client.sendRequest([]string{"LRANGE", "tlist", "0", "-1"})
}
func (client *RedisClient) sendRequest(args []string) interface{} {
length := len(args)
firstCommand := fmt.Sprintf("%s%d", "*", length)
client.writeCommand(firstCommand)
for _, s := range args {
n := len(s)
client.writeCommand("$" + strconv.Itoa(n))
client.writeCommand(s)
println(n, s)
}
response := client.handleResponse()
//fmt.Printf("%v", response)
return response
}
// 寫命令
func (client *RedisClient) writeCommand(s string) {
client.conn.Write([]byte(s + "\r\n"))
}
// 解析返回結果
func (client *RedisClient) handleResponse() interface{} {
//先讀取第一個字元
r, _, _ := client.reader.ReadRune()
flag := string(r)
fmt.Println("第一個操作數:" + flag)
switch flag {
case "+":
//一行字元串
return client.ReadLine()
case "-":
//異常
return client.ReadLine()
case ":":
//數字
line := client.ReadLine()
res, _ := strconv.Atoi(line)
return res
case "$":
// 多行字元串
//readRune, _, _ := client.reader.ReadRune()
//去掉換行
readRune := client.ReadLine()
length := string(readRune)
if length == "-1" {
return nil
} else if length == "0" {
return ""
}
lll, _ := strconv.Atoi(length)
//+2是跳過\r\n
bytes := make([]byte, lll+2)
n, _ := client.reader.Read(bytes)
return string(bytes[:n])
case "*":
//多行字元串 遞歸獲取
return client.readBulkString()
default:
return errors.New("錯誤")
}
}
// 讀一行
func (client *RedisClient) ReadLine() string {
bytes, _, _ := client.reader.ReadLine()
return string(bytes)
}
// 讀到末尾 估計有點問題
func (client *RedisClient) ReadToEnd() string {
var size = 1024
bytes := make([]byte, size)
var temp = ""
for {
n, err := client.reader.Read(bytes)
temp += string(bytes[:n])
//n, err := client.conn.Read(bytes)
if err == io.EOF || n == 0 || n < size {
break
}
}
return temp
}
func (client *RedisClient) readBulkString() interface{} {
counts, _ := strconv.Atoi(client.ReadLine())
if counts <= 0 {
return nil
}
//var resList = list.List{}
var lists []interface{}
for i := 0; i < counts; i++ {
res := client.handleResponse()
lists = append(lists, res)
//fmt.Println("多行結果:" + fmt.Sprintf("%v", res))
}
return lists
}