初探富文本之CRDT協同實例 在前邊初探富文本之CRDT協同演算法一文中我們探討了為什麼需要協同、分散式的最終一致性理論、偏序集與半格的概念、為什麼需要有偏序關係、如何通過數據結構避免衝突、分散式系統如何進行同步調度等等,這些屬於完成協同所需要瞭解的基礎知識,實際上當前有很多成熟的協同實現,例如aut ...
初探富文本之CRDT協同實例
在前邊初探富文本之CRDT
協同演算法一文中我們探討了為什麼需要協同、分散式的最終一致性理論、偏序集與半格的概念、為什麼需要有偏序關係、如何通過數據結構避免衝突、分散式系統如何進行同步調度等等,這些屬於完成協同所需要瞭解的基礎知識,實際上當前有很多成熟的協同實現,例如automerge
、yjs
等等,本文就是關註於以yjs
為CRDT
協同框架來實現協同的實例。
描述
接入協同框架實際上並不是一件簡單的事情,當然相對於接入OT
協同而言接入CRDT
協同已經是比較簡單的了,因為我們只需要聚焦於數據結構的使用就好,而不需要對變換有過多的關註。當前我們更加關註的是Op-based CRDT
,本文所說的CRDT
也是特指的Op-based CRDT
,畢竟State-baed CRDT
需要將全量數據進行傳輸,每次都要完整傳輸狀態來完成同步讓它比較難變成通用的解決方案。因此與OT
演算法一樣,我們依然需要Operation
,在富文本領域,最經典的Operation
有quill
的delta
模型,通過retain
、insert
、delete
三個操作完成整篇文檔的描述與操作,還有slate
的JSON
模型,通過insert_text
、split_node
、remove_text
等等操作來完成整篇文檔的描述與操作。假如此時是OT
的話,接下來我們就要聊到變換Transformation
了,但是使用CRDT
演算法的情況下,我們的關註點變了,我們需要做的是關註於如何將我們現在的數據結構轉換為CRDT
框架的數據結構,比如通過框架提供的Array
、Map
、Text
等類型構建我們自己的JSON
數據,並且我們的Op
也需要映射到對框架提供的數據結構進行的操作,這樣框架便可以幫我們進行協同,當框架完成協同之後把框架的數據結構的改變返回,此時我們需要再將這部分改變映射到我們自己的Op
,然後我們只需要在本地應用這些遠程同步併在本地轉換的Op
,就可以做到協同了。
上邊這個數據轉換聽起來是不是有點耳熟,在前邊初探富文本之OT
協同實例中,我們介紹了json0
,我們也提到了一個可行的操作,我們讓變換Transformation
這部分讓json0
去做,我們需要關註的是從我們自己定義的數據結構轉換到json0
,在json0
進行變換操作之後我們同樣地將Op
轉換後應用到我們本地的數據就好。雖然原理是完全不同的,但是我們在已有成熟框架的情況下似乎並不需要關註這點,我們更側重於使用,實際上在使用起來是很像的。此時假設我們有一個自研的思維導圖功能需要實現協同,而保存的數據結構都是自定義的,沒有直接可以調用的實現方案,我們就需要進行轉換適配,那麼如果使用OT
的話,並且藉助json0
做變換,那麼我們需要做的是把Op
轉換為json0
的Op
,發送的數據也將會是這個json0
的Op
,那麼如果直接使用CRDT
的話,我們更像是通過框架定義的數據結構將Op
應用到數據結構上,發送的數據是框架定義的數據,類似於將Op
應用到數據結構上了,其他的操作都由框架給予完整的支持了。實際上通過框架提供的例子後,接入CRDT
協同就主要是理解並且實現的問題了,這樣就有一個大體的實現方向了,而不是毫無頭緒不知道應該從哪裡開始做協同。另外還是那個宗旨,合適的才是最好的,要考慮到實現的成本問題,沒有必要硬套數據結構的實現,OT
有OT
的優點,CRDT
有CRDT
的優點,CRDT
這類方法相比OT
還比較年輕,還是在不斷發展過程中的,實際上有些問題例如記憶體占用、速度等問題最近幾年才被比較好的解決,ShareDB
作者在關註CRDT
不斷發展的過程中也說了CRDTs are the future
。此外從技術上講,CRDT
類型是OT
類型的子集,也就是說,CRDT
實際上是不需要轉換函數的OT
類型,因此任何可以處理這些OT
類型的東西也應該能夠使用CRDT
。
或許上邊的一些概念可能一時間讓人難以理解,所以下麵的Counter
與Quill
兩個實例就是介紹瞭如何使用yjs
實現協同,究竟如何通過數據結構完成協同的接入工作,當然具體的API
調用還是還是需要看yjs
的文檔,本文只涉及到最基本的協同操作,所有的代碼都在https://github.com/WindrunnerMax/Collab
中,註意這是個pnpm
的workspace monorepo
項目,要註意使用pnpm
安裝依賴。
Counter
首先我們運行一個基礎的協同實例Counter
,實現的主要功能是在多個客戶端可以+1
的情況下我們可以維護同一份計數器總數,該實例的地址是https://github.com/WindrunnerMax/Collab/tree/master/packages/crdt-counter
,首先簡單看一下目錄結構(tree --dirsfirst -I node_modules
):
crdt-counter
├── public
│ ├── favicon.ico
│ └── index.html
├── server
│ └── index.ts
├── src
│ ├── client.ts
│ ├── counter.tsx
│ └── index.tsx
├── babel.config.js
├── package.json
├── rollup.config.js
├── rollup.server.js
└── tsconfig.json
先簡略說明下各個文件夾和文件的作用,public
存儲了靜態資源文件,在客戶端打包時將會把內容移動到build
文件夾,server
文件夾中存儲了CRDT
服務端的實現,在運行時同樣會編譯為js
文件放置於build
文件夾下,src
文件夾是客戶端的代碼,主要是視圖與CRDT
客戶端的實現,babel.config.js
是babel
的配置信息,rollup.config.js
是打包客戶端的配置文件,rollup.server.js
是打包服務端的配置文件,package.json
與tsconfig.json
大家都懂,就不贅述了。
在前邊CRDT
協同演算法實現一文中,我們經常提到的就是無需中央伺服器的分散式協同,那麼在這個例子中我們就來實現一個peer-to-peer
的實例。yjs
提供了一個y-webrtc
的信令伺服器,甚至還有公共的信令伺服器可以用,當然可能因為網路的關係這個公共的信令伺服器在國內不是很適用。在繼續完成協同之前,我們還需要瞭解一下WebRTC
以及信令的相關概念。
WebRTC
是一種實時通信技術,重點在於可以點對點即P2P
通信,其允許瀏覽器和應用程式直接在互聯網上傳輸音頻、視頻和數據流,無需使用中間伺服器進行中轉。WebRTC
利用瀏覽器內置的標準API
和協議來提供這些功能,並且支持多種編解碼器和平臺,WebRTC
可以用於開發各種實時通信應用,例如線上會議、遠程協作、實時廣播、線上游戲和IoT
應用等。但是在多級NAT
網路環境下,P2P
連接可能會受到限制,簡單來說就是一臺設備無法直接發現另一臺設備,自然也就沒有辦法進行P2P
通信,這時需要使用特殊的技術來繞過NAT
並建立P2P
連接。
NAT Network Address Translation
網路地址轉換是一種在IP
網路中廣泛使用的技術,主要是將一個IP
地址轉換為另一個IP
地址,具體來說其工作原理是將一個私有IP
地址(如在家庭網路或企業內部網路中使用的地址)映射到一個公共IP
地址(如互聯網上的IP
地址)。當一個設備從私有網路向公共網路發送數據包時,NAT
設備會將源IP
地址從私有地址轉換為公共地址,並且在返回數據包時將目標IP
地址從公共地址轉換為私有地址。NAT
可以通過多種方式實現,例如靜態NAT
、動態NAT
和埠地址轉換PAT
等,靜態NAT
將一個私有IP
地址映射到一個公共IP
地址,而動態NAT
則動態地為每個私有地址分配一個公共地址,PAT
是一種特殊的動態NAT
,在將私有IP
地址轉換為公共IP
地址時,還會將源埠號或目標埠號轉換為不同的埠號,以支持多個設備使用同一個公共IP
地址。NAT
最初是為瞭解決IPv4
地址空間的短缺而設計的,後來也為提高網路安全性並簡化網路管理提供了基礎。
在互聯網上大多數設備都是通過路由器或防火牆連接到網路的,這些設備通常使用網路地址轉換NAT
將內部IP
地址映射到一個公共的IP
地址上,這個公共IP
地址可以被其他設備用來訪問,但是這些設備內部的IP
地址是隱藏的,其他的設備不能直接通過它們的內部IP
地址建立P2P
連接。因此,直接進行P2P
連接可能會受到網路地址轉換NAT
的限制,導致連接無法建立。為瞭解決這個問題,需要使用一些技術來繞過NAT
並建立P2P
連接。另外,P2P
連接也需要一些控制和協調機制,以確保連接的可靠性和安全性。
信令可以用來解決多級NAT
環境下的P2P
連接問題,當兩個設備嘗試建立P2P
連接時,可以使用信令伺服器來交換網路信息,例如IP
地址、埠和協議類型等,以便設備之間可以彼此發現並建立連接。當然信令伺服器並不是繞過NAT
的唯一解決方案,STUN
、TURN
和ICE
等技術也可以幫助解決這個問題。信令伺服器的主要作用是協調不同設備之間的連接,以確保設備可以正確地發現和通信。在實際應用中,通常需要同時使用多種技術和工具來解決多級NAT
環境下的P2P
連接問題。
那麼回到WebRTC
,我們即使是使用了P2P
的技術,但是不可避免的需要一個信令伺服器來交換WebRTC
會話描述和控制信息。當然這些信息不包括實際通信的數據流本身,而是用於描述和控制這些流的方式和參數,這些數據流本身是通過對等連接在兩個瀏覽器之間直接傳輸的。主要數據流的通信不經過中央伺服器,這就使得WebRTC
有著低延遲和高帶寬等優點,但是同樣的因為每個對等點相互連接,不適合單個文檔上的大量協作者。
接下來我們要進行數據結構的設計,目前在yjs
中是沒有Y.Number
這個數據結構的,也就是說yjs
沒有自增自減的操作,這點就與前邊OT
實例不一樣了,所以在這裡我們需要設計數據結構。網路是不可靠的,我們不能夠在本地模擬+1
的操作,就是說本地先取得值,然後進行+1
操作之後再把值推到其他的客戶端上,這樣的設計雖然在本地測試應該是可行的,但是由於網路不可靠,我們不能保證本地取值的時候獲得的是最新的值,所以這個方案是不可靠的。
那麼我們思考幾種方案來實現這一點,有一種可行的方案是類似於我們之前介紹的CRDT
數據結構,我們可以構造一個集合Y.Array
,當我們點+1
的時候,就向集合中push
一個新的值,這樣再取和的時候直接取集合長度即可。
Y.Array: [] => +1 => [1] => +1 => [1, 1] => ...
Counter: [1, 1].size = N
另一種方案是使用Y.Map
來完成,當用戶加入我們的P2P
組的時候,我們通過其身份信息為其分配一個id
,然後這個id
只記錄與自增自己的值,也就是說當某個客戶端點擊+1
的時候,操作的只有其id
對應的數,而不能影響組網內其他的用戶的值。
Y.Map: {} => +1 => {"id": 1} => +1 => {"id": 2} => ...
Counter: Object.values({"id": 2}).reduce((a, b) => a + b) = N
在這裡我們使用的是Y.Map
的方案,畢竟如果是Y.Array
的話占用資源會是比較大的,當然因為實例中並沒有身份信息,每次進入的時候都是會隨機分配id
的,當然這不會影響到我們的Counter
。此外還有比較重要的一點是,因為我們是直接進行P2P
通信的,當所有的設備都離線的時候,由於沒有設計實際的數據存儲機制,所以數據會丟失,這點也是需要註意的。
接下來我們看看代碼的實現,首先我們來看看服務端,這裡主要實現是調用了一下y-webrtc-signaling
來啟動一個信令伺服器,這是y-webrtc
給予的開箱即用的功能,也可以基於這些內容進行改寫,不過因為是信令伺服器,除非有著很高的穩定性、定製化等要求,否則直接當作開箱即用的信令伺服器就好。後邊主要是使用了express
啟動了一個靜態資源伺服器,因為直接在瀏覽器打開文件的file
協議有很多的安全限制,所以需要一個HTTP Server
。
import { exec } from "child_process";
import express from "express";
// https://github.com/yjs/y-webrtc/blob/master/bin/server.js
exec("PORT=3001 npx y-webrtc-signaling", (err, stdout, stderr) => { // 調用`y-webrtc-signaling`
console.log(stdout, stderr);
});
const app = express(); // 實例化`express`
app.use(express.static("build")); // 客戶端打包過後的靜態資源路徑
app.listen(3000);
console.log("Listening on http://localhost:3000");
在客戶端方面主要是定義了一個定義了一個共用的鏈接,通過id
來加入我們的P2P
組,並且還有密碼的保護,這裡需要鏈接的信令伺服器也就是上邊啟動的y-webrtc
的3001
埠的信令服務。之後我們通過observe
定義的Y.Map
數據結構的變化來執行回調,在這裡實際上就是將回調過後的整個Map
數據傳回回調函數,然後在視圖層進行Counter
的計算,這裡還有一個transaction.origin
判斷是為了防止我們本地的調用觸發回調。最後我們定義了一個increase
函數,在這裡我們通過transact
作為事務來執行set
操作,因為我們之前的設計只會處理我們當前客戶端對應的id
的那個值,本地的值是可信的,直接自增即可,transact
最後一個參數也就是上邊提到了的transaction.origin
,可以用來判斷事件的來源。
import { Doc, Map as YMap } from "yjs";
import { WebrtcProvider } from "y-webrtc";
const getRandomId = () => Math.floor(Math.random() * 10000).toString();
export type ClientCallback = (record: Record<string, number>) => void;
class Connection {
private doc: Doc;
private map: YMap<number>;
public id: string = getRandomId(); // 當前客戶端生成的唯一`id`
public counter = 0; // 當前客戶端的初始值
constructor() {
const doc = new Doc();
new WebrtcProvider("crdt-example", doc, { // `P2P`組名稱 // `Y.Doc`實例
password: "room-password", // `P2P`組密碼
signaling: ["ws://localhost:3001"], // 信令伺服器
});
const yMapDoc = doc.getMap<number>("counter"); // 獲取數據結構
this.doc = doc;
this.map = yMapDoc;
}
bind(cb: ClientCallback) {
this.map.observe(event => { // 監聽數據結構變化 // 如果是多層嵌套需要`observeDeep`
if (event.transaction.origin !== this) { // 防止本地修改時觸發
const record = [...this.map.entries()].reduce( // 獲取`Y.Map`定義中的所有數據
(cur, [key, value]) => ({ ...cur, [key]: value }),
{} as Record<string, number>
);
cb(record); // 執行回調
}
});
}
public increase() {
this.doc.transact(() => { // 事務
this.map.set(this.id, ++this.counter); // 自增本地`id`對應的值
}, this); // 來源
}
}
export default new Connection();
Quill
在運行富文本的實例Quill
之前,我們不妨先來簡單討論一下是如何在富文本上應用的CRDT
,在前文CRDT
協同演算法中主要討論的是分散式與CRDT
的原理,並沒有涉及具體的富文本該如何設計數據結構,那麼在這裡我們簡單討論下yjs
在富文本上應用CRDT
的設計。看之前描述那一節的時候我們可能會產生一些有趣的想法,或許我們可以這麼來做,可以通過底層來實現OT
,之後在上層封裝一層數據結構供外部使用的方式,從而對外看起來像是CRDT
。當然原理上是不會這麼做的,因為這樣失去了擁抱CRDT
的意義,可能會有部分借鑒實現的思路,但是不會直接這麼做的。
首先我們可以回憶一下CRDT
在集合這個數據結構上的設計,我們主要考慮到了集合的添加和刪除如何完整的保證交換律、結合律、冪等律,那麼現在在富文本的實現上,我們不僅需要考慮到插入和刪除,需要考慮到順序的問題,並且我們還需要保證CCI
,即最終一致性、因果一致性、意圖一致性,當然還需要考慮到Undo/Redo
、游標同步等相關的問題。
那麼我們首先來看看如何保證插入數據的順序,對於OT
而言是通過索引得知用戶要操作的位置,並且通過變換來確保最終一致性,那麼CRDT
是不需要這麼做的,上邊也提到過完全靠OT
的話可能就失去了擁抱CRDT
的意義,那麼如何確保要插入的位置正確呢,CRDT
不靠索引的話就需要靠數據結構來完成這點,我們可以通過相對位置來完成,例如我們目前有AB
字元串,此時在中間插入了C
字元,那麼這個字元就需要被標記為在A
之後,在B
之前,那麼很顯然,我們需要為每個字元都分配唯一的id
,否則我們是無法做到這一點的,當然這塊實際上還有優化空間,在這裡就先不談這點,那麼由此我們通過相對位置保證了插入的順序。
接下來我們再看看刪除的問題,在前文的Observed-Remove Set
集合數據結構中我們是可以真正的進行刪除操作的,而在這裡由於我們是通過相對位置來實現完整的順序,所以實際上我們是不能夠真正地將我們標記的Item
進行刪除的,Item
可以理解為插入的字元,也就是所謂的軟刪除。舉個例子,目前我們有AB
字元串,其中一個客戶端刪除了B
,另一個客戶端同時在A
與B
之間增加了C
,那麼此時這兩個Op
同步到了第三個客戶端,那麼假如增加了C
這個操作先到並且執行了,再刪除了B
,那麼沒有問題,可是假設我們先刪除了B
,再增加了C
,那麼這個C
我們就不能夠找到他要插入的位置,因為B
已經被刪除了,我們是要在A
與B
之間去插入C
的,那麼這樣這個操作就無法執行下去了,由此這樣其實就導致了操作不滿足交換律,那麼這就不能真的作為CRDT
的數據結構設計了。其實我們可能會想,為什麼需要兩個位置來保證插入的字元位置,完全可以用B
的左側或者A
的右側來完成,實際上思考一下這是同樣的問題,多個客戶端來操作的話假如一個刪除了A
另一個刪除了B
,那麼便無論如何也找不到插入的位置了,這是不滿足交換律和結合律的操作,就不能作為CRDT
的實現了。因此為了衝突的解決yjs
並沒有真正的刪除Item
,而是採用了標記的形式,即刪除的Item
會被加入一個deleted
標記,那麼不刪除會造成一個明顯的問題,空間的占用會無限增長,因此yjs
引入了墓碑機制,當確認了內容不會再被干涉之後,將對象的內容替換為空的墓碑對象。
上邊也提到了衝突的問題,很明顯在設計上是存在衝突的問題的,因為CRDT
實際上並不是完全為了協同編輯的場景而專門設計的,其主要是為瞭解決分散式場景中的一致性問題,所以在應用到協同編輯的場景中,不可避免地會出現衝突的問題,實際上這個衝突主要是為了集合順序的引入而導致的,要是不關心順序,那麼自然就不會出現衝突問題了。那麼為了使數據能夠滿足三律,在前文我們引入了一個偏序的概念,但是在協同編輯設計中,使用偏序不能夠保證數據同步的正確性和一致性,因為其無法處理一些關鍵的衝突情況,舉一個簡單的例子,假設我們此時有AB
字元串,如果一個客戶端在AB
中加入了C
,另一個加入了D
,那麼究竟誰在前呢,所以我們需要引入全序的方法,即任意兩個Item
都是可以比較的。那麼很明顯的,如果我們為每個Item
附加上時間戳的元信息,便可以引入全序了,但是實際上由於不同的客戶端可能具有不同的時鐘偏差,網路延遲和時鐘不同步等問題也可能導致時間戳不可靠。那麼相比之下,邏輯時鐘或者邏輯時間戳可以使用更簡單和可靠的方式來維護事件的順序:
- 每次發生本地事件時,
clock = clocl + 1
。 - 每次接收到遠程事件時,
clock = max(clock, remoteClock) + 1
。
看起來依舊會有發生衝突的可能,那麼我們可以再引入一個客戶端的唯一id
,也就是clientID
。這種機制看似簡單,但實際上使我們獲得了數學上性質良好的全序結構,這意味著我們可以在任意兩個Item
之間對比獲得邏輯上的先後關係,這對保證CRDT
演算法的正確性相當重要。此外,通過這種方式我們也可以保證因果一致性,假如此時我們有兩個操作a
、b
如果有因果關係,那麼a.clock
一定大於b.clock
,這樣的得到的順序一定是滿足因果關係的,當然如果沒有因果關係,就可以取任意的順序執行了。舉個例子,我們有三個客戶端A
、B
、C
以及字元串SE
,A
在SE
中間添加了a
字元,此時這個操作同步到了B
,B
將a
字元給刪除了,假設此時C
先收到了B
的刪除操作,因為這個操作依賴於A
的操作,需要進行因果依賴關係的檢查,這個操作的邏輯時鐘和位移大於C
本地文檔中已經應用的操作的邏輯時鐘和位移,需要等待先前的操作被應用後再應用這個操作,當然這並不是在yjs
中的實現,因為yjs
不會存在真正的刪除操作,並且在刪除操作的時候實際上並不會導致時鐘的增加,只是增加一個標記,上邊這個例子其實可以換個說法,兩個相同的插入操作,因為我們是相對位置,所以後一個插入操作是依賴前一個插入操作的,因此就需要因果檢查,其實這也是件有意思的事情,當收到在同一個位置編輯的不同客戶端操作時候,如果時鐘相同就是衝突操作,不相同就是因果關係。
那麼由此我們通過CRDT
數據結構與演算法設計解決了最終一致性和因果一致性,對於意圖一致性的問題,當不存在衝突的時候我們是能夠保證意圖的,即插入文檔的Item
的順序,在衝突的時候我們實際上會比較clientID
決定究竟誰在前在後,其實實際上無論誰在前還是在後都可以認為是一種烏龍,我們在衝突的時候只保證最終一致性,對於意圖一致性則需要做額外的設計才可以實現,在這裡就不做過多探討了。實際上yjs
還有大量的設計與優化操作,以及基於YATA
的衝突解決演算法等,比如通過雙向鏈表來保存文檔結構順序,通過Map
為每個客戶端保存的扁平的 Item
數組,優化本地插入的速度而設計的緩存機制(鏈表的查找O(N)
與跟隨游標的位置緩存),傾向於State-based
的刪除,Undo/Redo
,游標同步,壓縮數據網路傳輸等等,還是很值得研究的。
我們再回到富文本的實例Quill
中,實現的主要功能是在quill
富文本編輯器中接入協同,並支持編輯游標的同步,該實例的地址是https://github.com/WindrunnerMax/Collab/tree/master/packages/crdt-quill
,首先簡單看一下目錄結構(tree --dirsfirst -I node_modules
):
crdt-quill
├── public
│ └── favicon.ico
├── server
│ └── index.ts
├── src
│ ├── client.ts
│ ├── index.css
│ ├── index.ts
│ └── quill.ts
├── package.json
├── rollup.config.js
├── rollup.server.js
└── tsconfig.json
依舊簡略說明下各個文件夾和文件的作用,public
存儲了靜態資源文件,在客戶端打包時將會把內容移動到build
文件夾,server
文件夾中存儲了CRDT
服務端的實現,在運行時同樣會編譯為js
文件放置於build
文件夾下,src
文件夾是客戶端的代碼,主要是視圖與CRDT
客戶端的實現,rollup.config.js
是打包客戶端的配置文件,rollup.server.js
是打包服務端的配置文件,package.json
與tsconfig.json
大家都懂,就不贅述了。
quill
的數據結構並不是JSON
而是Delta
,Delta
是通過retain
、insert
、delete
三個操作完成整篇文檔的描述與操作,我們試想一下描述一段字元串的操作需要什麼,是不是通過這三種操作就能夠完全覆蓋了,所以通過Delta
來描述文本增刪改是完全可行的,而且12
年quill
的開源可以說是富文本發展的一個裡程碑,於是yjs
是直接原生支持Delta
數據結構的。
接下來我們看看來看看服務端,這裡主要實現是調用了一下y-websocket
來啟動一個websocket
伺服器,這是y-websocket
給予的開箱即用的功能,也可以基於這些內容進行改寫,yjs
還提供了y-mongodb-provider
等服務端服務可以使用。後邊主要是使用了express
啟動了一個靜態資源伺服器,因為直接在瀏覽器打開文件的file
協議有很多的安全限制,所以需要一個HTTP Server
。
import { exec } from "child_process";
import express from "express";
// https://github.com/yjs/y-websocket/blob/master/bin/server.js
exec("PORT=3001 npx y-websocket", (err, stdout, stderr) => { // 調用`y-websocket`
console.log(stdout, stderr);
});
const app = express(); // 實例化`express`
app.use(express.static("build")); // 客戶端打包過後的靜態資源路徑
app.use(express.static("node_modules/quill/dist")); // `quill`靜態資源路徑
app.listen(3000);
console.log("Listening on http://localhost:3000");
在客戶端方面主要是定義了一個定義了一個共用的鏈接,通過crdt-quill
作為RoomName
進入組,這裡需要鏈接的websocket
伺服器也就是上邊啟動的y-websocket
的3001
埠的服務。之後我們定義了頂層的數據結構為YText
數據結構的變化來執行回調,並且將一些信息暴露了出去,doc
就是這需要使用的yjs
實例,type
是我們定義的頂層數據結構,awareness
意為感知,只要是用來完成實時數據同步,在這裡是用來同步游標選區。
import { Doc, Text as YText } from "yjs";
import { WebsocketProvider } from "y-websocket";
class Connection {
public doc: Doc; // `yjs`實例
public type: YText; // 頂層數據結構
private connection: WebsocketProvider; // `WebSocket`鏈接
public awareness: WebsocketProvider["awareness"]; // 數據實時同步
constructor() {
const doc = new Doc(); // 實例化
const provider = new WebsocketProvider("ws://localhost:3001", "crdt-quill", doc); // 鏈接`WebSocket`伺服器
provider.on("status", (e: { status: string }) => {
console.log("WebSocket", e.status); // 鏈接狀態
});
this.doc = doc; // `yjs`實例
this.type = doc.getText("quill"); // 獲取頂層數據結構
this.connection = provider; // 鏈接
this.awareness = provider.awareness; // 數據實時同步
}
reconnect() {
this.connection.connect(); // 重連
}
disconnect() {
this.connection.disconnect(); // 斷線
}
}
export default new Connection();
在客戶端主要分為了兩部分,分別是實例化quill
的實例,以及quill
與yjs
客戶端通信的實現。在quill
的實現中主要是將quill
實例化,註冊游標的插件,隨機生成id
的方法,通過id
獲取隨機顏色的方法,以及游標同步的位置轉換。在quill
與yjs
客戶端通信的實現中,主要是完成了對於quill
與doc
的事件監聽,主要是遠程數據變更的回調,本地數據變化的回調,游標同步事件感知的回調。
import Quill from "quill";
import QuillCursors from "quill-cursors";
import tinyColor from "tinycolor2";
import { Awareness } from "y-protocols/awareness.js";
import {
Doc,
Text as YText,
createAbsolutePositionFromRelativePosition,
createRelativePositionFromJSON,
} from "yjs";
export type { Sources } from "quill";
Quill.register("modules/cursors", QuillCursors); // 註冊游標插件
export default new Quill("#editor", { // 實例化`quill`
theme: "snow",
modules: { cursors: true },
});
const COLOR_MAP: Record<string, string> = {}; // `id => color`
export const getRandomId = () => Math.floor(Math.random() * 10000).toString(); // 隨機生成用戶`id`
export const getCursorColor = (id: string) => { // 根據`id`獲取顏色
COLOR_MAP[id] = COLOR_MAP[id] || tinyColor.random().toHexString();
return COLOR_MAP[id];
};
export const updateCursor = (
cursor: QuillCursors,
state: Awareness["states"] extends Map<number, infer I> ? I : never,
clientId: number,
doc: Doc,
type: YText
) => {
try {
// 從`Awareness`中取得狀態
if (state && state.cursor && clientId !== doc.clientID) {
const user = state.user || {};
const color = user.color || "#aaa";
const name = user.name || `User: ${clientId}`;
// 根據`clientId`創建游標
cursor.createCursor(clientId.toString(), name, color);
// 相對位置轉換為絕對位置 // 選區為`focus --- anchor`
const focus = createAbsolutePositionFromRelativePosition(
createRelativePositionFromJSON(state.cursor.focus),
doc
);
const anchor = createAbsolutePositionFromRelativePosition(
createRelativePositionFromJSON(state.cursor.anchor),
doc
);
if (focus && anchor && focus.type === type) {
// 移動游標位置
cursor.moveCursor(clientId.toString(), {
index: focus.index,
length: anchor.index - focus.index,
});
}
} else {
// 根據`clientId`移除游標
cursor.removeCursor(clientId.toString());
}
} catch (err) {
console.error(err);
}
};
import "./index.css";
import quill, { getRandomId, updateCursor, Sources, getCursorColor } from "./quill";
import client from "./client";
import Delta from "quill-delta";
import QuillCursors from "quill-cursors";
import { compareRelativePositions, createRelativePositionFromTypeIndex } from "yjs";
const userId = getRandomId(); // 本地客戶端的`id` 或者使用`awareness.clientID`
const doc = client.doc; // `yjs`實例
const type = client.type; // 頂層類型
const cursors = quill.getModule("cursors") as QuillCursors; // `quill`游標模塊
const awareness = client.awareness; // 實時通信感知模塊
// 設置當前客戶端的信息 `State`的數據結構類似於`Record<string, unknown>`
awareness.setLocalStateField("user", {
name: "User: " + userId,
color: getCursorColor(userId),
});
// 頁面顯示的用戶信息
const userNode = document.getElementById("user") as HTMLInputElement;
userNode && (userNode.value = "User: " + userId);
type.observe(event => {
// 來源信息 // 本地`UpdateContents`不應該再觸發`ApplyDelta'
if (event.transaction.origin !== userId) {
const delta = event.delta;
quill.updateContents(new Delta(delta), "api"); // 應用遠程數據, 來源
}
});
quill.on("editor-change", (_: string, delta: Delta, state: Delta, origin: Sources) => {
if (delta && delta.ops) {
// 來源信息 // 本地`ApplyDelta`不應該再觸發`UpdateContents`
if (origin !== "api") {
doc.transact(() => {
type.applyDelta(delta.ops); // 應用`Ops`到`yjs`
}, userId); // 來源
}
}
const sel = quill.getSelection(); // 選區
const aw = awareness.getLocalState(); // 實時通信狀態數據
if (sel === null) { // 失去焦點
if (awareness.getLocalState() !== null) {
awareness.setLocalStateField("cursor", null); // 清除選區狀態
}
} else {
// 捲對位置轉換為相對位置 // 選區為`focus --- anchor`
const focus = createRelativePositionFromTypeIndex(type, sel.index);
const anchor = createRelativePositionFromTypeIndex(type, sel.index + sel.length);
if (
!aw ||
!aw.cursor ||
!compareRelativePositions(focus, aw.cursor.focus) ||
!compareRelativePositions(anchor, aw.cursor.anchor)
) {
// 選區位置發生變化 設置位置信息
awareness.setLocalStateField("cursor", { focus, anchor });
}
}
// 更新所有游標狀態到本地
awareness.getStates().forEach((aw, clientId) => {
updateCursor(cursors, aw, clientId, doc, type);
});
});
// 初始化更新所有遠程游標狀態到本地
awareness.getStates().forEach((state, clientId) => {
updateCursor(cursors, state, clientId, doc, type);
});
// 監聽遠程狀態變化的回調
awareness.on(
"change",
({ added, removed, updated }: { added: number[]; removed: number[]; updated: number[] }) => {
const states = awareness.getStates();
added.forEach(id => {
const state = states.get(id);
state && updateCursor(cursors, state, id, doc, type);
});
updated.forEach(id => {
const state = states.get(id);
state && updateCursor(cursors, state, id, doc, type);
});
removed.forEach(id => {
cursors.removeCursor(id.toString());
});
}
);
每日一題
https://github.com/WindrunnerMax/EveryDay
參考
https://docs.yjs.dev/
https://github.com/yjs/yjs
https://github.com/automerge/automerge
https://zhuanlan.zhihu.com/p/425265438
https://zhuanlan.zhihu.com/p/452980520
https://josephg.com/blog/crdts-go-brrr/
https://www.npmjs.com/package/quill-delta
https://josephg.com/blog/crdts-are-the-future/
https://github.com/yjs/yjs/blob/main/INTERNALS.md
https://cloud.tencent.com/developer/article/2081651