MongoDB Change Stream:簡介、嘗試與應用

来源:https://www.cnblogs.com/xybaby/archive/2018/08/13/9464328.html
-Advertisement-
Play Games

在MongoDB3.6引入的新feature中,change stream無疑是非常吸引人的。 Change streams allow applications to access real-time data changes without the complexity and risk of ...


  在MongoDB3.6引入的新feature中,change stream無疑是非常吸引人的。

  Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog.

  Change stream允許應用實時獲取mongodb數據的變更,這是個呼聲很高的一個的需求,可以用於ETL、跨平臺數據同步、通知服務等。以前沒有change stream的時候,也可以通過tail oplog來追蹤修改,但這是複雜、危險的野路子。

  本文地址:https://www.cnblogs.com/xybaby/p/9464328.html

Change Stream特點

an-introduction-to-change-streams一文中,總結了change stream的幾個特點

Targeted changes

  Changes can be filtered to provide relevant and targeted changes to listening applications.

Resumablility

  Resumability was top of mind when building change streams to ensure that applications can see every change in a collection. resume token

Total ordering

  MongoDB 3.6 has a global logical clock that enables the server to order all changes across a sharded cluster.

Durability

  Change streams only include majority-committed changes.

Security

  Change streams are secure – users are only able to create change streams on collections to which they have been granted read access.

Ease of use

  Change streams are familiar – the API syntax takes advantage of the established MongoDB drivers and query language, and are independent of the underlying oplog format.

Idempotence

  All changes are transformed into a format that’s safe to apply multiple times. Listening applications can use a resume token from any prior change stream event, not just the most recent one, because reapplying operations is safe and will reach the same consistent state.

 

  相比自動tail oplog,change stream 有以下優點:

  • 如果只有單個節點持久化,那麼oplog對應的操作是可能被回滾的,而change stream有Durability特性
  • 在sharded cluster環境,change stream跨shards,可以通過mongos tail oplog,而不用去每一個replica set上分別tail
 

  Change stream對MongoDB的部署有一些需求:

  • 只對replica sets 或者sharded cluster(MongoDB3.6中shard必須是replica set)有用,這個不難理解,因為change stream也是利用了oplog。如果是sharded cluster,必須都過mongos連接。
  • 必須使用WiredTiger 引擎,使用replica set protocol version 1
 

Change Stream試用

  在文章免費試用MongoDB雲資料庫 (MongoDB Atlas)教程中,介紹瞭如何使用MongoDB Atlas提供的雲資料庫服務,免費提供的集群剛好是使用WiredTiger 引擎的Replica set,因此本文基於這個環境來測試。主要測試Change Stream所支持的所有事件(change event)、fullDocument特性、resume特性。

  change event包括:

  • insert
  • delete
  • replace
  • update
  • invalidate

  有意思的是,相比CRUD,多了一個replace事件。update 與 replace的區別在於

  A replace operation uses the update command, and consists of two stages:
  • Delete the original document with the documentKey and
  • Insert the new document using the same documentkey

 

  測試方法:啟動兩個Mongo shell,一個操作資料庫,一個watch。為了方便區分,淺綠色背景為Operate,灰色背景為Watch

準備環境

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> use engineering
switched to db engineering

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> use engineering
switched to db engineering
order 2
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
assert: command failed: {
        "operationTime" : Timestamp(1533888296, 2),
        "ok" : 0,
        "errmsg" : "cannot open $changeStream for non-existent database: engineering",
        "code" : 26,
        "codeName" : "NamespaceNotFound",
        "$clusterTime" : {
                "clusterTime" : Timestamp(1533888296, 2),
                "signature" : {
                        "hash" : BinData(0,"fWTN4Kuv7cq9xCcC0vCF4AkTxuU="),
                        "keyId" : NumberLong("6563302068054917121")
                }
        }
} : aggregate failed

  從watch報錯可以看出,只能對已經存在的db watch,因此可以先插入一條數據,創建對應的DB、Collection

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'[email protected]'})
WriteResult({ "nInserted" : 1 })

 

  Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
2018-08-10T16:08:49.200+0800 E QUERY    [thread1] Error: error hasNext: false :
DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
@(shell):1:1

  此時已經創建好用於監聽的cursor,此時還沒有change event。

Insert 

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test2', age: 19, 'email':'[email protected]'})
WriteResult({ "nInserted" : 1 })

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSC0AAAADRmRfaWQAZFttSCb45nBxa/FSsABaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "insert",
        "fullDocument" : {
                "_id" : ObjectId("5b6d4826f8e670716bf152b0"),
                "username" : "test2",
                "age" : 19,
                "email" : "[email protected]"
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d4826f8e670716bf152b0")
        }
}

 

replace

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {age: 19})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSSMAAAACRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "replace",
        "fullDocument" : {
                "_id" : ObjectId("5b6d47eaf8e670716bf152af"),
                "age" : 19
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d47eaf8e670716bf152af")
        }
}

 

  可以看到,操作的時候使用的是db.collection.update,但change event 卻是replace,原因在eplace-a-document-entirely中有介紹

If the <update> document contains only field:value expressions, then:

delete

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({ "_id" : ObjectId("5b6d47eaf8e670716bf152af")})
WriteResult({ "nRemoved" : 1 })

 

  watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSfAAAAAFRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "delete",
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d47eaf8e670716bf152af")
        }
}

 

update

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'[email protected]'})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 19}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSmQAAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "insert",
        "fullDocument" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"),
                "username" : "test1",
                "age" : 18,
                "email" : "[email protected]"
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2")
        }
}

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSn0AAAABRmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "update",
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2")
        },
        "updateDescription" : {
                "updatedFields" : {
                        "age" : 19
                },
                "removedFields" : [ ]
        }
}

 

update fullDocument

  db.collection.watch 可以設置選項fullDocument參數,這個在change event:update的時候就可以返回對用documents的完整信息。

MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch([], {fullDocument:'updateLookup'} )

 

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 29}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttS88AAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "update",
        "fullDocument" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"),
                "username" : "test1",
                "age" : 29,
                "email" : "[email protected]"
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2")
        },
        "updateDescription" : {
                "updatedFields" : {
                        "age" : 29
                },
                "removedFields" : [ ]
        }
}

 

resume change stream

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({"username": "test3"})
WriteResult({ "nRemoved" : 2 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> ret = cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"gltusJ4AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "insert",
        "fullDocument" : {
                "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c"),
                "username" : "test3",
                "age" : 14
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c")
        }
}
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "insert",
        "fullDocument" : {
                "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"),
                "username" : "test3",
                "age" : 14
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d")
        }
}
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "delete",
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c")
        }
}
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "delete",
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d")
        }
}
Mongo

 

  Resume Watch

MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor = db.users.watch([], {"resumeAfter": ret['_id']})
{ "_id" : { "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"), "username" : "test3", "age" : 14 }, "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
{ "_id" : { "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") } }
{ "_id" : { "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor.next()
2018-08-11T17:49:13.127+0800 E QUERY    [thread1] Error: error hasNext: false :
DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
@(shell):1:1

 

  在resume_cursor中,resumeAfter的參數設置為了之前的watch document,在watch的時候會一次性返回已經被消費過的change event

Change Stream應用

DDIA cdc

  在Designing Data-Intensive Applications一書中,有一節Change Data Capture(cdc),講述得就是複製集(replica set)中replication log的使用,對於MongoDB, replication log其實就是oplog。書中提到:

The problem with most databases’ replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API.

  也就是說,應用(client)只能按照db的約束來使用db,而不是直接讀取、解析replication log。但直接使用replic log直接用來創建serach index,cache,data warehouse。如下圖所示:

   

  change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.

  CDC使得Search index, Data warehouse成為了派生數據系統(derived data systems),也可以理解為是DB數據的視圖。另外,有意思的是,上圖db、replication log、derived data system組成的系統看起來很像一個中心化複製集(single leader):DB是leader(Primary),derived data system(cache, data warehouse)是follower(Secondary)。

  Change stream應用前景非常廣泛,在 完美數據遷移-MongoDB Stream的應用 一文中,介紹了使用change stream來在服務化改造的時候做數據遷移,且給出了一個完整的示範。在USING MONGODB AS A REALTIME DATABASE WITH CHANGE STREAMS一文中,也結合NodeJs給出了一個簡單的使用案列。

Change Stream實現與問題

官方對在Sharded Cluster上使用change stream有一些說明,可以參考文檔,有以下幾點值得註意:

(1)

To guarantee total ordering of changes, for each change notification the mongos checks with each shard to see if the shard has seen more recent changes. 

  不管有沒有數據變更,mongos都需要在所有shards上check,影響了change steam的響應時間。如果網路延時大,如geographically distributed shard,問題會更明顯。如果數據變更特別頻繁,那麼Change stream可能跟不上變化

(2)  

For sharded collections, update operations with multi : true may cause any change streams opened against that collection to send notifications for orphaned documents.

  對於update操作,如果設置 multi:True,那麼操作也可能在 orphaned documents.上執行,這樣也會產生多餘的change stream,應用可能需要處理這種情侶。BTW,ofphaned document是很令人頭疼的問題。

   

  另外,MongoDB3.6只能針對單個collection進行watch,這樣如果要關註多個collection或者多個db的write event時,需要分別建立連接進行watch,在 MongoDB 3.6 Change Streams: A Nest Temperature and Fan Control Use Case一文中提到這可能帶來性能問題

  It’s estimated that after 1000 streams you will start to see very measurable performance drops

  不過,在MongoDB4.0中,可以在db,甚至cluster這個級別watch stream,對應用來說方便了很多,也避免了性能問題。

總結

  本文介紹了MongoDB Change Stream這一新特性,以及其在具體應用中需要註意到的一些問題,並基於MongoDB atlas進行了簡單的嘗試。毫無疑問,Change Stream是非常有前途的特性,能解決很多現在實現起來很彆扭的問題。但是如果要用於線上業務,還需要大量的測試,尤其是容錯性與性能。

References

MongoDB Change Stream

an-introduction-to-change-streams

免費試用MongoDB雲資料庫 (MongoDB Atlas)教程

Designing Data-Intensive Applications

 完美數據遷移-MongoDB Stream

USING MONGODB AS A REALTIME DATABASE WITH CHANGE STREAMS