Midjourney,是一個革命性的基於人工智慧的藝術生成器,可以從被稱為提示的簡單文本描述中生成令人驚嘆的圖像。Midjourney已經迅速成為藝術家、設計師和營銷人員的首選工具(包括像我這樣根本不會設計任何東西的無能之輩)。 為了幫助你開始使用這個強大的工具,我們彙編了一份15個資源的清單,可以 ...
最近的項目用到了 RxGo ,因為之前從沒有接觸過,特意去學了學,特此記錄下。文章很多內容是複製了參考資料或者官方文檔。如果涉及侵權,請聯繫刪除,謝謝。
1、RxGo簡介
1.1 基礎介紹
RxGo
是一個基於Go語言的響應式編程庫,它提供了一種簡單而強大的方式來處理非同步事件流和數據流
。RxGo的設計靈感來自於ReactiveX,它提供了類似於ReactiveX的操作符和概念,如Observable、Observer、Subject、Scheduler等。
RxGo的目標是提供一種簡單而強大的方式來處理非同步事件流和數據流,使得開發人員可以更容易地編寫高效、可維護和可擴展的代碼。RxGo的特點包括:
- 響應式編程:
RxGo
提供了Observable和Observer
兩個核心概念,使得開發人員可以更容易地處理非同步事件流和數據流。 - 操作符:
RxGo
提供了類似於ReactiveX的操作符,如map、filter、reduce等,使得開發人員可以更容易地對事件流進行轉換、過濾和聚合等操作。 - 調度器:
RxGo
提供了調度器,使得開發人員可以更容易地控制事件流的執行線程和順序。 - 可組合性:
RxGo
的操作符具有可組合性,使得開發人員可以更容易地組合多個操作符來實現複雜的操作。 - 高效性:
RxGo
的設計和實現都非常高效,可以處理大量的事件流和數據流。
總之,RxGo
是一個非常強大和實用的響應式編程庫,它可以幫助開發人員更容易地處理非同步事件流和數據流,提高代碼的可維護性和可擴展性。
1.2 RxGo 數據流程圖
RxGo的實現基於管道的概念。管道是由通道連接的一系列階段,其中每個階段是運行相同功能的一組goroutine。
- 使用
Just
操作符創建一個基於固定列表的靜態可觀測數據。 - 使用
Map
操作符定義了一個轉換函數(把圓形變成方形)。 - 用
Filter
操作符過濾掉黃色方形。
從上面的例子中可以看出來,最終生成的數據被髮送到一個通道中,消費者讀取數據進行消費。RxGo
中有很多種消費和生成數據的方式,發佈結果到通道中只是其中一種方式。
2、快速入門
2.1 安裝 RxGo v2
go get -u github.com/reactivex/rxgo/v2
2.2 簡單案例
我們先寫一個簡單的案例,來學習RxGo的簡單使用。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)()
ch := observable.Observe()
for item := range ch {
fmt.Println(item.V)
}
}
使用 RxGo 的一般流程如下:
- 使用相關的 Operator 創建 Observable,Operator 就是用來創建 Observable 的。
- 中間各個階段可以使用過濾操作篩選出我們想要的數據,使用轉換操作對數據進行轉換;
- 調用 Observable 的
Observe()
方法,該方法返回一個<- chan rxgo.Item
。然後for range
遍歷即可。
結合上面的這張圖,我們就比較容易理解RxGo的數據處理流程。因為例子比較簡單,沒有用到Map、Filter
操作。
執行結果:
$ go run main.go
1
2
3
4
5
Just
使用到柯里化的編程思想。
柯里化(Currying)是一種函數式編程的技術,它將一個接受多個參數的函數轉換成一系列接受單個參數的函數。這些單參數函數可以被組合起來,以便在後續的計算中使用。
柯里化的主要優點是它可以使函數更加靈活和可復用。通過將函數分解為一系列單參數函數,我們可以更容易地組合和重用這些函數,從而減少代碼的重覆性和冗餘性。
例如:
//柯里化的例子
func addCurried(x int) func(int) int {
return func(y int) int {
return x + y
}
}
func main() {
add5 := addCurried(5)
fmt.Println(add5(10))
}
由於 Go 不支持多個可變參數,Just
通過柯里化迂迴地實現了這個功能:
//Just creates an Observable with the provided items.
func Just(items ...interface{}) func(opts ...Option) Observable {
return func(opts ...Option) Observable {
return &ObservableImpl{
iterable: newJustIterable(items...)(opts...),
}
}
}
Observe()
返回一個 Item 的chan ,Item的結構如下:
// Item is a wrapper having either a value or an error.
type Item struct {
V interface{}
E error
}
所以通過Just生成observable對象時,傳入的數據可以包含錯誤,在使用時通過 item.Error() 來區分。
func main() {
observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
ch := observable.Observe()
for item := range ch {
if item.Error() {
fmt.Println("error:", item.E)
} else {
fmt.Println(item.V)
}
}
}
我們使用item.Error()
檢查是否出現錯誤。然後使用item.V
訪問數據,item.E
訪問錯誤。
除了使用for range
之外,我們還可以調用 Observable 的ForEach()
方法來實現遍歷。ForEach()
接受 3 個回調函數:
NextFunc
:類型為func (v interface {})
,傳入的數據不包含錯誤類型時走此函數處理。ErrFunc
:類型為func (err error)
,當傳入的數據包含錯誤時走此函數;CompletedFunc
:類型為func ()
,Observable 完成時調用。
有點Promise
那味了。使用ForEach()
,可以將上面的示例改寫為:
func main() {
observable := rxgo.Just(1, 2, errors.New("這是一個測試錯誤!"), 4, 5)()
<-observable.ForEach(func(v interface{}) {
fmt.Println("received:", v)
}, func(err error) {
fmt.Println("error:", err)
}, func() {
fmt.Println("completed")
})
}
$ go run main.go
received: 1
received: 2
error: 這是一個測試錯誤!
received: 4
received: 5
completed
ForEach()
返回的是一個 chan,用於當 observable 關閉時會向此chan發送數據。所以在 observable
前面加了 <-
來阻塞等待 ForEach()
處理完數據。
3、RxGo 深入學習
上面的簡單案例,我們是使用Just
來創建observable
。其實還有其他的方式創建observable
。一起來看一看。
3.1 rxgo.Create
傳入一個[]rxgo.Producer
的切片,其中rxgo.Producer
的類型為func(ctx context.Context, next chan<- Item)
。我們可以在代碼中調用rxgo.Of(value)
生成數據,rxgo.Error(err)
生成錯誤,然後發送到next
通道中:
package main
import (
"context"
"errors"
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of("aaa")
next <- rxgo.Of(errors.New("test"))
}})
ch := observable.Observe()
for item := range ch {
if item.Error() {
fmt.Println("err:", item.E)
}else {
fmt.Println(item.V)
}
}
}
因為rxgo.Create中的參數是[]rxgo.Producer
,所以分成兩個rxgo.Producer
也是一樣的效果:
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of(2)
next <- rxgo.Of(3)
next <- rxgo.Error(errors.New("unknown"))
}, func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(4)
next <- rxgo.Of(5)
}})
3.2 rxgo.FromChannel
FromChannel
可以直接從一個已存在的<-chan rxgo.Item
對象中創建 Observable:
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 0; i < 5; i++ {
ch <- rxgo.Of(i)
}
//需要手動關閉 ch 通道
close(ch)
}()
observable := rxgo.FromChannel(ch)
for item := range observable.Observe() {
if item.Error() {
fmt.Println("err:", item.E)
}else {
fmt.Println(item.V)
}
}
}
註意:
通道需要手動調用
close()
關閉,上面Create()
方法內部rxgo
自動幫我們執行了這個步驟。
func newCreateIterable(fs []Producer, opts ...Option) Iterable {
...
go func() {
// Create方法內部自動關閉了 next 通道
defer close(next)
for _, f := range fs {
f(ctx, next)
}
}()
...
}
3.3 rxgo.Interval
Interval
以傳入的時間間隔生成一個無窮的數字序列,從 0 開始:
func main() {
observable := rxgo.Interval(rxgo.WithDuration(time.Second))
for item := range observable.Observe() {
if item.Error() {
fmt.Println("err:", item.E)
}else {
fmt.Println(item.V)
}
}
}
運行後,第一秒輸出 0,第二秒輸出 1,以此類推。
3.4 rxgo.Range
func main() {
observable := rxgo.Range(0, 3)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
Range
可以生成一個範圍內的數字:
上面代碼依次輸出 0,1,2,3。
3.5 Repeat
這個和之前的不太一樣,這個是對已經存在的 observable
對象調用 Repeat
方法,從而實現重覆生成數據。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
func main() {
observable := rxgo.Range(0,3).Repeat(2, rxgo.WithDuration(time.Second))
for item := range observable.Observe() {
if item.Error() {
fmt.Println("err:", item.E)
}else {
fmt.Println(item.V)
}
}
}
輸出:
0
1
2
0
1
2
0
1
2
註意:這裡執行的次數一共是3
次,Repeat中的參數是2,重覆2次,一共3次。
3.6 rxgo.Start
可以給Start
方法傳入[]rxgo.Supplier
作為參數,它可以包含任意數量的rxgo.Supplier
類型。rxgo.Supplier
的底層類型為:
var Supplier func(ctx context.Context) rxgo.Item
Observable 內部會依次調用這些rxgo.Supplier
生成rxgo.Item
:
package main
import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
func Supplier1(ctx context.Context) rxgo.Item {
deadline, ok := ctx.Deadline()
fmt.Println("Supplier1", deadline, ok)
time.Sleep(time.Second)
return rxgo.Of(1)
}
func Supplier2(ctx context.Context) rxgo.Item {
deadline, ok := ctx.Deadline()
fmt.Println("Supplier2", deadline, ok)
time.Sleep(time.Second)
return rxgo.Of(2)
}
func Supplier3(ctx context.Context) rxgo.Item {
deadline, ok := ctx.Deadline()
fmt.Println("Supplier3", deadline, ok)
time.Sleep(time.Second)
return rxgo.Of(3)
}
func main() {
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3}, rxgo.WithContext(ctx))
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
4、Observable 分類
根據數據在何處生成,Observable 被分為 Hot 和 Cold 兩種類型。
- Hot Observable:熱可觀測量,數據由可觀測量外部產生。
- Cold Observable:冷可觀測量,數據由可觀測量內部產生。
通常不想一次性的創建所有的數據,使用 熱可觀測量。
4.1 熱可觀測量示例
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
for item := range observable.Observe() {
fmt.Println(item.V)
}
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
結果:
0
1
2
上面創建的是 Hot Observable。但是有個問題,第一次Observe()
消耗了所有的數據,第二個就沒有數據輸出了。(可以用可連接的觀測量來修改這一行為,後面再說)。
4.2 冷可觀測量示例
Cold Observable 就不會有這個問題,因為它創建的流是獨立於每個觀察者的。即每次調用Observe()
都創建一個新的 channel。我們使用Defer()
方法創建 Cold Observable,它的參數與Create()
方法一樣。
func main() {
observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
}})
for item := range observable.Observe() {
fmt.Println(item.V)
}
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
Defer源碼介紹:
// Defer does not create the Observable until the observer subscribes,
// and creates a fresh Observable for each observer.
func Defer(f []Producer, opts ...Option) Observable {
return &ObservableImpl{
iterable: newDeferIterable(f, opts...),
}
}
執行結果:
$ go run main.go
0
1
2
0
1
2
4.3 可連接的 Observable
可連接的(Connectable)Observable 對普通的 Observable 進行了一層組裝。調用它的Observe()
方法時並不會立刻產生數據。使用它,我們可以等所有的觀察者都準備就緒了(即調用了Observe()
方法)之後,再調用其Connect()
方法開始生成數據。我們通過兩個示例比較使用普通的 Observable 和可連接的 Observable 有何不同。
4.3.1 普通的Observable,並不是可連接的Observable
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 1; i <= 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})
time.Sleep(3 * time.Second)
fmt.Println("before subscribe second observer")
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
time.Sleep(3 * time.Second)
}
上例中我們使用DoOnNext()
方法來註冊觀察者。由於DoOnNext()
方法是非同步執行的,所以為了等待結果輸出,在最後增加了一行time.Sleep
。運行結果:
First observer: 1
First observer: 2
First observer: 3
before subscribe second observer
由輸出可以看出,註冊第一個觀察者之後就開始產生數據了。第二個觀察者並不會得到數據。
4.3.2 可連接的Observable
通過在創建 Observable 的方法中指定rxgo.WithPublishStrategy()
選項就可以創建可連接的 Observable:
- 重點是傳入
rxgo.WithPublishStrategy()
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 1; i <= 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})
time.Sleep(3 * time.Second)
fmt.Println("before subscribe second observer")
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
//需要手動調用 observable.Connect 才會產生數據
observable.Connect(context.Background())
time.Sleep(3 * time.Second)
}
運行輸出:
$ go run main.go
before subscribe second observer
Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
上面是等兩個觀察者都註冊之後,並且手動調用了 Observable 的Connect()
方法才產生數據。而且可連接的 Observable 有一個特性:它是冷啟動的!!!,即每個觀察者都會收到一份相同的拷貝。
5、轉換 Observable
通過 RxGo 數據流程圖
我們知道,我們可以對rxgo.Item
進行轉換。rxgo 提供了很多轉換函數,下麵一起來學一學這些轉換函數。
5.1 Map
Map()
方法簡單修改它收到的rxgo.Item
然後發送到下一個階段(轉換或過濾)。Map()
接受一個類型為func (context.Context, interface{}) (interface{}, error)
的函數。第二個參數就是rxgo.Item
中的數據,返迴轉換後的數據。如果出錯,則返回錯誤。
func main() {
observable := rxgo.Just(1, 2, 3)()
observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int), nil
}).Map(func(_ context.Context, i interface{}) (interface{}, error) {
b := i.(int)
if b % 2 == 0 {
return nil, errors.New("test")
} else {
return i, nil
}
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上例中每個數字經過兩個Map
,第一個Map
邏輯是原樣輸出
,第二個Map
邏輯是判斷i是不是偶數,如果是偶數,就返回錯誤,否則原樣輸出
。運行結果:
1
<nil>
我們將第一個Map中的語句改為下麵的邏輯:
return i.(int) + 1, nil
運行結果:
<nil>
我們可以知道,數據的處理是串列的,第一個數據執行完所有的Map過後,第二個數據才會執行,當其中某一個執行返回的結果包含錯誤,就不會繼續進行轉換了,即不會數據不會進入到 Observe()
中的通道中去。
5.2 Marshal
Marshal
對經過它的數據進行一次Marshal
。這個Marshal
可以是json.Marshal/proto.Marshal
,甚至我們自己寫的Marshal
函數。它接受一個類型為func(interface{}) ([]byte, error)
的函數用於對數據進行處理。
type User struct {
Name string `json:"name"`
Age int `json:"age"`
}
func main() {
observable := rxgo.Just(
User{
Name: "dj",
Age: 18,
},
User{
Name: "jw",
Age: 20,
},
)()
observable = observable.Marshal(json.Marshal)
for item := range observable.Observe() {
fmt.Println(string(item.V.([]byte)))
}
}
執行結果:
{"name":"dj","age":18}
{"name":"jw","age":20}
由於Marshal
操作返回的是[]byte
類型,我們需要進行類型轉換之後再輸出。
5.3 Unmarshal
既然有Marshal
,也就有它的相反操作Unmarshal
。Unmarshal
用於將一個[]byte
類型轉換為相應的結構體或其他類型。與Marshal
不同,Unmarshal
需要知道轉換的目標類型,所以需要提供一個函數用於生成該類型的對象。然後將[]byte
數據Unmarshal
到該對象中。Unmarshal
接受兩個參數,參數一是類型為func([]byte, interface{}) error
的函數,參數二是func () interface{}
用於生成實際類型的對象。我們拿上面的例子中生成的 JSON 字元串作為數據,將它們重新Unmarshal
為User
對象:
type User struct {
Name string `json:"name"`
Age int `json:"age"`
}
func main() {
observable := rxgo.Just(
`{"name":"dj","age":18}`,
`{"name":"jw","age":20}`,
)()
observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
return []byte(i.(string)), nil
}).Unmarshal(json.Unmarshal, func() interface{} {
return &User{}
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
由於Unmarshaller
接受[]byte
類型的參數,我們在Unmarshal
之前加了一個Map
用於將string
轉為[]byte
。運行結果:
&{dj 18}
&{jw 20}
5.4 Buffer
Buffer
按照一定的規則收集接收到的數據,然後一次性發送出去(作為切片),而不是收到一個發送一個。有 3 種類型的Buffer
:
BufferWithCount(n)
:每收到n
個數據發送一次,最後一次可能少於n
個;BufferWithTime(n)
:發送在一個時間間隔n
內收到的數據;BufferWithTimeOrCount(d, n)
:收到n
個數據,或經過d
時間間隔,發送當前收到的數據。
5.4.1 BufferWithCount
func main() {
observable := rxgo.Range(0, 5)
observable = observable.BufferWithCount(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
執行結果:
[0 1]
[2 3]
[4]
最後一組只有一個。
5.4.2 BufferWithTime
unc main() {
ch := make(chan rxgo.Item, 1)
go func() {
i := 0
for range time.Tick(time.Second) {
ch <- rxgo.Of(i)
i++
}
}()
observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(2 * time.Second))
layout := "2006-01-02 13:04:05"
fmt.Println("startTime", time.Now().Format(layout))
for item := range observable.Observe() {
fmt.Println(item.V)
fmt.Println("nextTime", time.Now().Format(layout))
}
}
執行結果是不確定的,這裡需要註意:
startTime 2023-04-22 44:15:49
[0]
nextTime 2023-04-22 44:15:51
[1 2]
nextTime 2023-04-22 44:15:53
[3 4 5]
nextTime 2023-04-22 44:15:55
...
5.4.3 BufferWithTimeOrCount
func main() {
ch := make(chan rxgo.Item, 1)
go func() {
i := 0
for range time.Tick(time.Second) {
ch <- rxgo.Of(i)
i++
}
}()
observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(2*time.Second), 2)
layout := "2006-01-02 13:04:05"
fmt.Println("startTime", time.Now().Format(layout))
for item := range observable.Observe() {
fmt.Println(item.V)
fmt.Println("nextTime", time.Now().Format(layout))
}
}
執行結果:
startTime 2023-04-22 44:18:48
[0]
nextTime 2023-04-22 44:18:50
[1 2]
nextTime 2023-04-22 44:18:51
[3 4]
nextTime 2023-04-22 44:18:53
BufferWithTimeOrCount
是以BufferWithCount、BufferWithTime
誰先滿足條件為準,誰先滿足誰就先執行。
5.5 GroupBy
``GroupBy將一個
Observable分成多個
子Observable,每個
子Observable`包含相同的索引值的元素。
GroupBy
函數定義如下:
GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
即將一個Observable
分成length個子Observable
,根據distribution
函數返回的int作為分組的依據。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
// 創建一個Observable,它發出一些整數值
source := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()
// 使用GroupBy操作符將整數值按照奇偶性進行分組
grouped := source.GroupBy(2, func(item rxgo.Item) int {
return item.V.(int) % 2
}, rxgo.WithBufferedChannel(10))
for subObservable := range grouped.Observe() {
fmt.Println("new subObservable ------ ")
for item := range subObservable.V.(rxgo.Observable).Observe() {
fmt.Printf("%v\n", item.V)
}
}
}
上面根據每個數模 3 的餘數將整個流分為 3 組。運行:
new subObservable ------
2
4
6
8
10
new subObservable ------
1
3
5
7
9
註意rxgo.WithBufferedChannel(10)
的使用,由於我們的數字是連續生成的,依次為 0->1->2->…->9->10。而 Observable 預設是惰性的,即由Observe()
驅動。內層的Observe()
在返回一個 0 之後就等待下一個數,但是下一個數 1 不在此 Observable 中。所以會陷入死鎖。使用rxgo.WithBufferedChannel(10)
,設置它們之間的連接 channel 緩衝區大小為 10,這樣即使我們未取出 channel 裡面的數字,上游還是能發送數字進來。
6、並行操作
預設情況下,這些轉換操作都是串列的,即只有一個 goroutine 負責執行轉換函數。從上面的Map
操作也可以得知預設是串列執行的。可以改變這一預設行為,使用rxgo.WithPool(n)
選項設置運行n
個 goroutine,或者rxgo.WitCPUPool()
選項設置運行與邏輯 CPU 數量相等的 goroutine。
package main
import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
"math/rand"
"time"
)
func main() {
observable := rxgo.Range(1, 10)
observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
time.Sleep(time.Duration(rand.Int31()))
return i.(int) + 1, nil
}, rxgo.WithCPUPool())
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
結果:
8
9
10
6
5
11
2
4
7
3
由於是並行運算,所以結果是不固定的。
我們可以直接看官網的介紹:https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md
7、過濾 Observable
我們可以對Observable 中發送過來的數據進行過濾,過濾掉不需要的數據,有以下方式:
-
Filter
-
ElementAt
-
Debounce
-
Distinct
-
Skip
-
Take
下麵的內容大多來自官方的示例,地址:https://github.com/ReactiveX/RxGo/tree/v2.5.0/doc
7.1 Filter
Filter()
接受一個類型為func (i interface{}) bool
的參數,通過的數據使用這個函數斷言,返回true
的將發送給下一個階段。否則,丟棄。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3)().
Filter(func(i interface{}) bool {
return i != 2
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
結果:
1
3
7.2 ElementAt
ElementAt()
只發送指定索引的數據,如ElementAt(2)
只發送索引為 2 的數據,即第 3 個數據。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
結果:
2
7.3 Debounce
只有當特定的時間跨度已經過去而沒有發出另一個Item
時,才從Observable發出一個Item
。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
func main() {
ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
time.Sleep(2 * time.Second)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
time.Sleep(2 * time.Second)
close(ch)
}()
observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
結果:
1
3
上面示例,先收到 1,然後 2s 內沒收到數據,所以發送 1。接著收到了數據 2,由於馬上又收到了 3,所以 2 不會發送。收到 3 之後 2s 內沒有收到數據,發送了 3。所以最後輸出為 1,3。
7.4 Distinct
Distinct()
會記錄它發送的所有數據,它不會發送重覆的數據。由於數據格式多樣,Distinct()
要求我們提供一個函數,根據原數據返回一個唯一標識碼(有點類似哈希值)。基於這個標識碼去重。
package main
import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().
Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
結果:
1
2
3
4
5
7.5 Skip
Skip
可以跳過前若幹個數據。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
結果:
3
4
5
7.6 Take
Take
只取前若幹個數據。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
結果:
1
2
8、選項
因為golang中不支持預設參數,所以我們經常會用到選項設計模式,rxgo中也大量使用到了此模式。
rxgo.WithBufferedChannel(10)
:設置 channel 的緩存大小;rxgo.WithPool(n)/rxgo.WithCpuPool()
:使用多個 goroutine 執行轉換操作;rxgo.WithPublishStrategy()
:使用發佈策略,即創建可連接的 Observable。
rxgo還有很多其他選項,具體看官方文檔,地址:
https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md
9、簡化的真實案例
假設現在有一個定時處理任務,結構如下:
type ScheduledTask struct {
RecordId int
HandleStartTime time.Time
Status bool
}
在執行具體的任務時,需要去資料庫查詢下是否已經被取消了,如果已經被取消掉的,則不再執行。
完整代碼如下:
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
type ScheduledTask struct {
RecordId int
HandleStartTime string
Status bool
}
func main() {
ch := make(chan rxgo.Item)
go producer(ch)
time.Sleep(time.Second*3)
observable := rxgo.FromChannel(ch)
observable = observable.Filter(func(i interface{}) bool {
st := i.(*ScheduledTask)
return st.Status
}, rxgo.WithBufferedChannel(1))
// 消費可觀測量
for customer := range observable.Observe() {
st := customer.V.(*ScheduledTask)
fmt.Printf("resutl: --> %+v\n", st)
}
}
func producer(ch chan <- rxgo.Item) {
for i := 0; i < 10; i++ {
status := false
if i % 2 == 0 {
status = true
}
st := &ScheduledTask{
RecordId: i,
HandleStartTime: time.Now().Format("2006-01-02 13:04:05"),
Status: status,
}
ch <- rxgo.Of(st)
}
// 這裡千萬不要忘記了
close(ch)
}
結果:
resutl: --> &{RecordId:0 HandleStartTime:2023-04-22 46:04:07 Status:true}
resutl: --> &{RecordId:2 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:4 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:6 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:8 HandleStartTime:2023-04-22 46:04:10 Status:true}
參考鏈接
[官方例子](