Go中響應式編程庫RxGo詳細介紹

来源:https://www.cnblogs.com/huageyiyangdewo/archive/2023/04/22/17343528.html
-Advertisement-
Play Games

Midjourney,是一個革命性的基於人工智慧的藝術生成器,可以從被稱為提示的簡單文本描述中生成令人驚嘆的圖像。Midjourney已經迅速成為藝術家、設計師和營銷人員的首選工具(包括像我這樣根本不會設計任何東西的無能之輩)。 為了幫助你開始使用這個強大的工具,我們彙編了一份15個資源的清單,可以 ...


最近的項目用到了 RxGo ,因為之前從沒有接觸過,特意去學了學,特此記錄下。文章很多內容是複製了參考資料或者官方文檔。如果涉及侵權,請聯繫刪除,謝謝。

1、RxGo簡介

1.1 基礎介紹

RxGo是一個基於Go語言的響應式編程庫,它提供了一種簡單而強大的方式來處理非同步事件流和數據流。RxGo的設計靈感來自於ReactiveX,它提供了類似於ReactiveX的操作符和概念,如Observable、Observer、Subject、Scheduler等。

RxGo的目標是提供一種簡單而強大的方式來處理非同步事件流和數據流,使得開發人員可以更容易地編寫高效、可維護和可擴展的代碼。RxGo的特點包括:

  1. 響應式編程:RxGo提供了Observable和Observer兩個核心概念,使得開發人員可以更容易地處理非同步事件流和數據流。
  2. 操作符:RxGo提供了類似於ReactiveX的操作符,如map、filter、reduce等,使得開發人員可以更容易地對事件流進行轉換、過濾和聚合等操作。
  3. 調度器:RxGo提供了調度器,使得開發人員可以更容易地控制事件流的執行線程和順序。
  4. 可組合性:RxGo的操作符具有可組合性,使得開發人員可以更容易地組合多個操作符來實現複雜的操作。
  5. 高效性:RxGo的設計和實現都非常高效,可以處理大量的事件流和數據流。

總之,RxGo是一個非常強大和實用的響應式編程庫,它可以幫助開發人員更容易地處理非同步事件流和數據流,提高代碼的可維護性和可擴展性。

1.2 RxGo 數據流程圖

RxGo的實現基於管道的概念。管道是由通道連接的一系列階段,其中每個階段是運行相同功能的一組goroutine。

  • 使用Just操作符創建一個基於固定列表的靜態可觀測數據。
  • 使用Map操作符定義了一個轉換函數(把圓形變成方形)。
  • Filter操作符過濾掉黃色方形。

從上面的例子中可以看出來,最終生成的數據被髮送到一個通道中,消費者讀取數據進行消費。RxGo中有很多種消費和生成數據的方式,發佈結果到通道中只是其中一種方式。

2、快速入門

2.1 安裝 RxGo v2

go get -u github.com/reactivex/rxgo/v2

2.2 簡單案例

我們先寫一個簡單的案例,來學習RxGo的簡單使用。

package main

import (
  "fmt"

  "github.com/reactivex/rxgo/v2"
)

func main() {
  observable := rxgo.Just(1, 2, 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    fmt.Println(item.V)
  }
}

使用 RxGo 的一般流程如下:

  • 使用相關的 Operator 創建 ObservableOperator 就是用來創建 Observable 的。
  • 中間各個階段可以使用過濾操作篩選出我們想要的數據,使用轉換操作對數據進行轉換;
  • 調用 ObservableObserve()方法,該方法返回一個<- chan rxgo.Item。然後for range遍歷即可。

結合上面的這張圖,我們就比較容易理解RxGo的數據處理流程。因為例子比較簡單,沒有用到Map、Filter操作。

執行結果:

$ go run main.go 
1
2
3
4
5

Just使用到柯里化的編程思想。
柯里化(Currying)是一種函數式編程的技術,它將一個接受多個參數的函數轉換成一系列接受單個參數的函數。這些單參數函數可以被組合起來,以便在後續的計算中使用。

柯里化的主要優點是它可以使函數更加靈活和可復用。通過將函數分解為一系列單參數函數,我們可以更容易地組合和重用這些函數,從而減少代碼的重覆性和冗餘性。

例如:

//柯里化的例子
func addCurried(x int) func(int) int {
	return func(y int) int {
		return x + y
	}
}

func main()  {
	add5 := addCurried(5)
	fmt.Println(add5(10))
}

由於 Go 不支持多個可變參數,Just通過柯里化迂迴地實現了這個功能:

//Just creates an Observable with the provided items.
func Just(items ...interface{}) func(opts ...Option) Observable {
  return func(opts ...Option) Observable {
    return &ObservableImpl{
      iterable: newJustIterable(items...)(opts...),
    }
  }
}

Observe()返回一個 Item 的chan ,Item的結構如下:

// Item is a wrapper having either a value or an error.
type	Item struct {
		V interface{}
		E error
	}

所以通過Just生成observable對象時,傳入的數據可以包含錯誤,在使用時通過 item.Error() 來區分。

func main() {
  observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    if item.Error() {
      fmt.Println("error:", item.E)
    } else {
      fmt.Println(item.V)
    }
  }
}

我們使用item.Error()檢查是否出現錯誤。然後使用item.V訪問數據,item.E訪問錯誤。

除了使用for range之外,我們還可以調用 ObservableForEach()方法來實現遍歷。ForEach()接受 3 個回調函數:

  • NextFunc:類型為func (v interface {}),傳入的數據不包含錯誤類型時走此函數處理。
  • ErrFunc:類型為func (err error),當傳入的數據包含錯誤時走此函數;
  • CompletedFunc:類型為func ()Observable 完成時調用。

有點Promise那味了。使用ForEach(),可以將上面的示例改寫為:

func main() {
  observable := rxgo.Just(1, 2, errors.New("這是一個測試錯誤!"), 4, 5)()
  <-observable.ForEach(func(v interface{}) {
    fmt.Println("received:", v)
  }, func(err error) {
    fmt.Println("error:", err)
  }, func() {
    fmt.Println("completed")
  })
}
$ go run main.go 
received: 1
received: 2
error: 這是一個測試錯誤!
received: 4
received: 5
completed

ForEach()返回的是一個 chan,用於當 observable 關閉時會向此chan發送數據。所以在 observable前面加了 <-來阻塞等待 ForEach()處理完數據。

3、RxGo 深入學習

上面的簡單案例,我們是使用Just來創建observable。其實還有其他的方式創建observable。一起來看一看。

3.1 rxgo.Create

傳入一個[]rxgo.Producer的切片,其中rxgo.Producer的類型為func(ctx context.Context, next chan<- Item)。我們可以在代碼中調用rxgo.Of(value)生成數據,rxgo.Error(err)生成錯誤,然後發送到next通道中:

package main

import (
	"context"
	"errors"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main()  {
	observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(1)
		next <- rxgo.Of("aaa")
		next <- rxgo.Of(errors.New("test"))
	}})

	ch := observable.Observe()
	for item := range ch {
		if item.Error() {
			fmt.Println("err:", item.E)
		}else {
			fmt.Println(item.V)
		}
	}
}

因為rxgo.Create中的參數是[]rxgo.Producer,所以分成兩個rxgo.Producer也是一樣的效果:

observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
  next <- rxgo.Of(1)
  next <- rxgo.Of(2)
  next <- rxgo.Of(3)
  next <- rxgo.Error(errors.New("unknown"))
  }, func(ctx context.Context, next chan<- rxgo.Item) {
  next <- rxgo.Of(4)
  next <- rxgo.Of(5)
}})

3.2 rxgo.FromChannel

FromChannel可以直接從一個已存在的<-chan rxgo.Item對象中創建 Observable

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)



func main()  {

	ch := make(chan rxgo.Item)
	go func() {
		for i := 0; i < 5; i++ {
			ch <- rxgo.Of(i)
		}

		//需要手動關閉 ch 通道
		close(ch)
	}()

	observable := rxgo.FromChannel(ch)
	for item := range observable.Observe() {
		if item.Error() {
			fmt.Println("err:", item.E)
		}else {
			fmt.Println(item.V)
		}
	}
}

註意:

通道需要手動調用close()關閉,上面Create()方法內部rxgo自動幫我們執行了這個步驟。

func newCreateIterable(fs []Producer, opts ...Option) Iterable {
	...

	go func() {
		// Create方法內部自動關閉了 next 通道
		defer close(next)
		for _, f := range fs {
			f(ctx, next)
		}
	}()

	...
}

3.3 rxgo.Interval

Interval以傳入的時間間隔生成一個無窮的數字序列,從 0 開始:

func main()  {
	
	observable := rxgo.Interval(rxgo.WithDuration(time.Second))
	for item := range observable.Observe() {
		if item.Error() {
			fmt.Println("err:", item.E)
		}else {
			fmt.Println(item.V)
		}
	}
}

運行後,第一秒輸出 0,第二秒輸出 1,以此類推。

3.4 rxgo.Range

func main() {
  observable := rxgo.Range(0, 3)
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Range可以生成一個範圍內的數字:

上面代碼依次輸出 0,1,2,3。

3.5 Repeat

這個和之前的不太一樣,這個是對已經存在的 observable對象調用 Repeat方法,從而實現重覆生成數據。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

func main()  {

	observable := rxgo.Range(0,3).Repeat(2, rxgo.WithDuration(time.Second))
	for item := range observable.Observe() {
		if item.Error() {
			fmt.Println("err:", item.E)
		}else {
			fmt.Println(item.V)
		}
	}
}

輸出:

0
1
2
0
1
2
0
1
2

註意:這裡執行的次數一共是3次,Repeat中的參數是2,重覆2次,一共3次。

3.6 rxgo.Start

可以給Start方法傳入[]rxgo.Supplier作為參數,它可以包含任意數量的rxgo.Supplier類型。rxgo.Supplier的底層類型為:

var Supplier func(ctx context.Context) rxgo.Item

Observable 內部會依次調用這些rxgo.Supplier生成rxgo.Item

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)



func Supplier1(ctx context.Context) rxgo.Item {
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier1", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(1)
}

func Supplier2(ctx context.Context) rxgo.Item {
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier2", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(2)
}

func Supplier3(ctx context.Context) rxgo.Item {
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier3", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(3)
}

func main() {
	ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
	observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3}, rxgo.WithContext(ctx))
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

4、Observable 分類

根據數據在何處生成,Observable 被分為 HotCold 兩種類型。

  • Hot Observable:熱可觀測量,數據由可觀測量外部產生。
  • Cold Observable:冷可觀測量,數據由可觀測量內部產生。

通常不想一次性的創建所有的數據,使用 熱可觀測量。

4.1 熱可觀測量示例

func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

結果:

0
1
2

上面創建的是 Hot Observable。但是有個問題,第一次Observe()消耗了所有的數據,第二個就沒有數據輸出了。(可以用可連接的觀測量來修改這一行為,後面再說)。

4.2 冷可觀測量示例

Cold Observable 就不會有這個問題,因為它創建的流是獨立於每個觀察者的。即每次調用Observe()都創建一個新的 channel。我們使用Defer()方法創建 Cold Observable,它的參數與Create()方法一樣。

func main() {
  observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
  }})

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Defer源碼介紹:

// Defer does not create the Observable until the observer subscribes,
// and creates a fresh Observable for each observer.
func Defer(f []Producer, opts ...Option) Observable {
	return &ObservableImpl{
		iterable: newDeferIterable(f, opts...),
	}
}

執行結果:

$ go run main.go
0
1
2
0
1
2

4.3 可連接的 Observable

可連接的(Connectable)Observable 對普通的 Observable 進行了一層組裝。調用它的Observe()方法時並不會立刻產生數據。使用它,我們可以等所有的觀察者都準備就緒了(即調用了Observe()方法)之後,再調用其Connect()方法開始生成數據。我們通過兩個示例比較使用普通的 Observable 和可連接的 Observable 有何不同。

4.3.1 普通的Observable,並不是可連接的Observable
func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
}

上例中我們使用DoOnNext()方法來註冊觀察者。由於DoOnNext()方法是非同步執行的,所以為了等待結果輸出,在最後增加了一行time.Sleep。運行結果:

First observer: 1
First observer: 2
First observer: 3
before subscribe second observer

由輸出可以看出,註冊第一個觀察者之後就開始產生數據了。第二個觀察者並不會得到數據。

4.3.2 可連接的Observable

通過在創建 Observable 的方法中指定rxgo.WithPublishStrategy()選項就可以創建可連接的 Observable

  • 重點是傳入rxgo.WithPublishStrategy()
func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })
	
  //需要手動調用 observable.Connect 才會產生數據
  observable.Connect(context.Background())
  time.Sleep(3 * time.Second)
}

運行輸出:

$ go run main.go
before subscribe second observer
Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3

上面是等兩個觀察者都註冊之後,並且手動調用了 Observable 的Connect()方法才產生數據。而且可連接的 Observable 有一個特性:它是冷啟動的!!!,即每個觀察者都會收到一份相同的拷貝。

5、轉換 Observable

通過 RxGo 數據流程圖我們知道,我們可以對rxgo.Item進行轉換。rxgo 提供了很多轉換函數,下麵一起來學一學這些轉換函數。

5.1 Map

Map()方法簡單修改它收到的rxgo.Item然後發送到下一個階段(轉換或過濾)。Map()接受一個類型為func (context.Context, interface{}) (interface{}, error)的函數。第二個參數就是rxgo.Item中的數據,返迴轉換後的數據。如果出錯,則返回錯誤。

func main() {
	observable := rxgo.Just(1, 2, 3)()

	observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
		return i.(int), nil
	}).Map(func(_ context.Context, i interface{}) (interface{}, error) {
		b := i.(int)
		if b % 2 == 0 {
			return nil, errors.New("test")
		} else {
			return i, nil
		}
	})

	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

上例中每個數字經過兩個Map,第一個Map邏輯是原樣輸出,第二個Map邏輯是判斷i是不是偶數,如果是偶數,就返回錯誤,否則原樣輸出。運行結果:

1
<nil>

我們將第一個Map中的語句改為下麵的邏輯:

return i.(int) + 1, nil

運行結果:

<nil>

我們可以知道,數據的處理是串列的,第一個數據執行完所有的Map過後,第二個數據才會執行,當其中某一個執行返回的結果包含錯誤,就不會繼續進行轉換了,即不會數據不會進入到 Observe() 中的通道中去。

5.2 Marshal

Marshal對經過它的數據進行一次Marshal。這個Marshal可以是json.Marshal/proto.Marshal,甚至我們自己寫的Marshal函數。它接受一個類型為func(interface{}) ([]byte, error)的函數用於對數據進行處理。

type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(
    User{
      Name: "dj",
      Age:  18,
    },
    User{
      Name: "jw",
      Age:  20,
    },
  )()

  observable = observable.Marshal(json.Marshal)

  for item := range observable.Observe() {
    fmt.Println(string(item.V.([]byte)))
  }
}

執行結果:

{"name":"dj","age":18}
{"name":"jw","age":20}

由於Marshal操作返回的是[]byte類型,我們需要進行類型轉換之後再輸出。

5.3 Unmarshal

既然有Marshal,也就有它的相反操作UnmarshalUnmarshal用於將一個[]byte類型轉換為相應的結構體或其他類型。與Marshal不同,Unmarshal需要知道轉換的目標類型,所以需要提供一個函數用於生成該類型的對象。然後將[]byte數據Unmarshal到該對象中。Unmarshal接受兩個參數,參數一是類型為func([]byte, interface{}) error的函數,參數二是func () interface{}用於生成實際類型的對象。我們拿上面的例子中生成的 JSON 字元串作為數據,將它們重新UnmarshalUser對象:

type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(
    `{"name":"dj","age":18}`,
    `{"name":"jw","age":20}`,
  )()

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    return []byte(i.(string)), nil
  }).Unmarshal(json.Unmarshal, func() interface{} {
    return &User{}
  })

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

由於Unmarshaller接受[]byte類型的參數,我們在Unmarshal之前加了一個Map用於將string轉為[]byte。運行結果:

&{dj 18}
&{jw 20}

5.4 Buffer

Buffer按照一定的規則收集接收到的數據,然後一次性發送出去(作為切片),而不是收到一個發送一個。有 3 種類型的Buffer

  • BufferWithCount(n):每收到n個數據發送一次,最後一次可能少於n個;
  • BufferWithTime(n):發送在一個時間間隔n內收到的數據;
  • BufferWithTimeOrCount(d, n):收到n個數據,或經過d時間間隔,發送當前收到的數據。
5.4.1 BufferWithCount
func main() {
	observable := rxgo.Range(0, 5)

	observable = observable.BufferWithCount(2)

	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

執行結果:

[0 1]
[2 3]
[4]

最後一組只有一個。

5.4.2 BufferWithTime
unc main() {
	ch := make(chan rxgo.Item, 1)

	go func() {
		i := 0
		for range time.Tick(time.Second) {
			ch <- rxgo.Of(i)
			i++
		}
	}()

	observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(2 * time.Second))

	layout := "2006-01-02 13:04:05"
	fmt.Println("startTime", time.Now().Format(layout))
	for item := range observable.Observe() {
		fmt.Println(item.V)
		fmt.Println("nextTime", time.Now().Format(layout))

	}
}

執行結果是不確定的,這裡需要註意:

startTime 2023-04-22 44:15:49
[0]
nextTime 2023-04-22 44:15:51
[1 2]
nextTime 2023-04-22 44:15:53
[3 4 5]
nextTime 2023-04-22 44:15:55
...
5.4.3 BufferWithTimeOrCount
func main() {
	ch := make(chan rxgo.Item, 1)

	go func() {
		i := 0
		for range time.Tick(time.Second) {
			ch <- rxgo.Of(i)
			i++
		}
	}()

	observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(2*time.Second), 2)

	layout := "2006-01-02 13:04:05"
	fmt.Println("startTime", time.Now().Format(layout))
	for item := range observable.Observe() {
		fmt.Println(item.V)
		fmt.Println("nextTime", time.Now().Format(layout))
	}
}

執行結果:

startTime 2023-04-22 44:18:48
[0]
nextTime 2023-04-22 44:18:50
[1 2]
nextTime 2023-04-22 44:18:51
[3 4]
nextTime 2023-04-22 44:18:53

BufferWithTimeOrCount是以BufferWithCount、BufferWithTime誰先滿足條件為準,誰先滿足誰就先執行。

5.5 GroupBy

``GroupBy將一個Observable分成多個子Observable,每個子Observable`包含相同的索引值的元素。

GroupBy函數定義如下:

GroupBy(length int, distribution func(Item) int, opts ...Option) Observable

即將一個Observable分成length個子Observable,根據distribution函數返回的int作為分組的依據。

package main

import (
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	// 創建一個Observable,它發出一些整數值
	source := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()

	// 使用GroupBy操作符將整數值按照奇偶性進行分組
	grouped := source.GroupBy(2, func(item rxgo.Item) int {
		return item.V.(int) % 2
	}, rxgo.WithBufferedChannel(10))

	for subObservable := range grouped.Observe() {
		fmt.Println("new subObservable ------ ")
		for item := range subObservable.V.(rxgo.Observable).Observe() {
			fmt.Printf("%v\n", item.V)
		}
	}

}

上面根據每個數模 3 的餘數將整個流分為 3 組。運行:

new subObservable ------ 
2
4
6
8
10
new subObservable ------ 
1
3
5
7
9

註意rxgo.WithBufferedChannel(10)的使用,由於我們的數字是連續生成的,依次為 0->1->2->…->9->10。而 Observable 預設是惰性的,即由Observe()驅動。內層的Observe()在返回一個 0 之後就等待下一個數,但是下一個數 1 不在此 Observable 中。所以會陷入死鎖。使用rxgo.WithBufferedChannel(10),設置它們之間的連接 channel 緩衝區大小為 10,這樣即使我們未取出 channel 裡面的數字,上游還是能發送數字進來。

6、並行操作

預設情況下,這些轉換操作都是串列的,即只有一個 goroutine 負責執行轉換函數。從上面的Map操作也可以得知預設是串列執行的。可以改變這一預設行為,使用rxgo.WithPool(n)選項設置運行n個 goroutine,或者rxgo.WitCPUPool()選項設置運行與邏輯 CPU 數量相等的 goroutine。

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"math/rand"
	"time"
)

func main() {
	observable := rxgo.Range(1, 10)

	observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
		time.Sleep(time.Duration(rand.Int31()))
		return i.(int) + 1, nil
	}, rxgo.WithCPUPool())

	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

8
9
10
6
5
11
2
4
7
3

由於是並行運算,所以結果是不固定的。

我們可以直接看官網的介紹:https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

7、過濾 Observable

我們可以對Observable 中發送過來的數據進行過濾,過濾掉不需要的數據,有以下方式:

  • Filter

  • ElementAt

  • Debounce

  • Distinct

  • Skip

  • Take

下麵的內容大多來自官方的示例,地址:https://github.com/ReactiveX/RxGo/tree/v2.5.0/doc

7.1 Filter

Filter()接受一個類型為func (i interface{}) bool的參數,通過的數據使用這個函數斷言,返回true的將發送給下一個階段。否則,丟棄。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3)().
		Filter(func(i interface{}) bool {
			return i != 2
		})
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

1
3

7.2 ElementAt

ElementAt()只發送指定索引的數據,如ElementAt(2)只發送索引為 2 的數據,即第 3 個數據。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

2

7.3 Debounce

只有當特定的時間跨度已經過去而沒有發出另一個Item時,才從Observable發出一個Item

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

func main() {
	ch := make(chan rxgo.Item)

	go func() {
		ch <- rxgo.Of(1)
		time.Sleep(2 * time.Second)
		ch <- rxgo.Of(2)
		ch <- rxgo.Of(3)
		time.Sleep(2 * time.Second)
		close(ch)
	}()

	observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

1
3

上面示例,先收到 1,然後 2s 內沒收到數據,所以發送 1。接著收到了數據 2,由於馬上又收到了 3,所以 2 不會發送。收到 3 之後 2s 內沒有收到數據,發送了 3。所以最後輸出為 1,3。

7.4 Distinct

Distinct()會記錄它發送的所有數據,它不會發送重覆的數據。由於數據格式多樣,Distinct()要求我們提供一個函數,根據原數據返回一個唯一標識碼(有點類似哈希值)。基於這個標識碼去重。

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().
		Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
			return i, nil
		})
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

1
2
3
4
5

7.5 Skip

Skip可以跳過前若幹個數據。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

3
4
5

7.6 Take

Take只取前若幹個數據。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

1
2

8、選項

因為golang中不支持預設參數,所以我們經常會用到選項設計模式,rxgo中也大量使用到了此模式。

  • rxgo.WithBufferedChannel(10):設置 channel 的緩存大小;
  • rxgo.WithPool(n)/rxgo.WithCpuPool():使用多個 goroutine 執行轉換操作;
  • rxgo.WithPublishStrategy():使用發佈策略,即創建可連接的 Observable

rxgo還有很多其他選項,具體看官方文檔,地址:

https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

9、簡化的真實案例

假設現在有一個定時處理任務,結構如下:

type ScheduledTask struct {
	RecordId int
	HandleStartTime time.Time
	Status bool
}

在執行具體的任務時,需要去資料庫查詢下是否已經被取消了,如果已經被取消掉的,則不再執行。

完整代碼如下:

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

type ScheduledTask struct {
	RecordId int
	HandleStartTime string
	Status bool
}

func main() {
	ch := make(chan rxgo.Item)
	go producer(ch)

	time.Sleep(time.Second*3)
	observable := rxgo.FromChannel(ch)
	observable = observable.Filter(func(i interface{}) bool {
		st := i.(*ScheduledTask)
		return st.Status
	}, rxgo.WithBufferedChannel(1))

	// 消費可觀測量
	for customer := range observable.Observe() {
		st := customer.V.(*ScheduledTask)
		fmt.Printf("resutl: --> %+v\n", st)
	}
}

func producer(ch chan <- rxgo.Item)  {
	for i := 0; i < 10; i++ {
		status := false
		if i % 2 == 0 {
			status = true
		}
		st := &ScheduledTask{
			RecordId: i,
			HandleStartTime: time.Now().Format("2006-01-02 13:04:05"),
			Status: status,
		}
		ch <- rxgo.Of(st)
	}
	
  // 這裡千萬不要忘記了
	close(ch)
}

結果:

resutl: --> &{RecordId:0 HandleStartTime:2023-04-22 46:04:07 Status:true}
resutl: --> &{RecordId:2 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:4 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:6 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:8 HandleStartTime:2023-04-22 46:04:10 Status:true}

參考鏈接

Go 每日一庫之 rxgo

[官方例子](


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 8.1 線程簡介 1 、多任務 現實生活中多件事一起作。 在程式中是指在一個系統中可以同時進行多個進程,即有多個單獨運行的任務,每一個任務對應一個進程。 每一個進程都有一段專用的記憶體區域,即使是多次啟動同一段程式產生不同的進程也是如此。 2、多線程 Java 給多線程編程提供了內置的支持。 一條線程 ...
  • Canny檢測的流程 Canny檢測主要是用於邊緣檢測 1)使用高斯濾波器,以平滑圖像,濾除雜訊。 2)計算圖像中每個像素點的梯度強度和方向。 3)應用非極大值(Non-Maximum Suppression)抑制,以消除邊緣檢測帶來的雜散響應 4)應用雙閾值(Double-Threshold)檢測 ...
  • 題目描述 輸入年份和月份,輸出這一年的這一月有多少天。需要考慮閏年。 輸入格式 輸入兩個正整數,分別表示年份 $y$ 和月數 $m$,以空格隔開。 輸出格式 輸出一行一個正整數,表示這個月有多少天。 樣例 #1 樣例輸入 #1 1926 8 樣例輸出 #1 31 樣例輸入 #2 2000 2 樣例輸 ...
  • 說明 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇對 VLD 源碼包中的各文件用途做個概述。同系列文章目錄可見 《記憶體泄漏檢測工具》目錄 1. 整體概覽 以 vld2.5.1 版本為例,下載源碼 後,根目錄下一共 5 個文件夾:.teamcity、lib、mfc_detect、set ...
  • 創建一個成績單文件score.xlsx,將平時成績單.xlsx文件中對應班級工作表中學號和姓名列的內容寫入到score.xlsx中,並添加成績列,每個學生的成績採用隨機生成的一個分數填寫進去,最後統計所有學生的平均成績計算出來後,寫入到score.xlsx的最後一行最後一列之後的單元格中去。預想的步 ...
  • 本文首發於公眾號:Hunter後端 原文鏈接:Django筆記二十八之資料庫查詢優化彙總 這一篇筆記將從以下幾個方面來介紹 Django 在查詢過程中的一些優化操作,有一些是介紹如何獲取 Django 查詢轉化的 sql 語句,有一些是理解 QuerySet 是如何獲取數據的。 以下是本篇筆記目錄: ...
  • #一:什麼是多線程 線程是操作系統能夠進行運算調度的最小單位;它被包含在進程之中,是進程中的實際運作單位。 多線程,是指從軟體或者硬體上實現多個線程併發執行的技術。具有多線程能力的電腦因有硬體支持而能夠在同一時間執行多於一個線程,進而提升整體處理性能。 簡單來說:線程是程式中一個單一的順序控制流程 ...
  • Springboot 多實例負載均衡部署 一、測試代碼: 控制層測試代碼: import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; @Controller @Re ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...