概念 在Orleans中,Streaming是一組API和功能集,它提供了一種構建、發佈和消費數據流的方式。 這些流可以是任何類型的數據,從簡單的消息到複雜的事件或數據記錄。Streaming API允許你定義、發佈和消費這些流,而無需關心底層的傳輸機制或數據存儲。 每個流都有一個唯一的標識符,稱為 ...
概念
在Orleans中,Streaming是一組API和功能集,它提供了一種構建、發佈和消費數據流的方式。
這些流可以是任何類型的數據,從簡單的消息到複雜的事件或數據記錄。Streaming API允許你定義、發佈和消費這些流,而無需關心底層的傳輸機制或數據存儲。
每個流都有一個唯一的標識符,稱為StreamId,用於區分不同的流。流可以是持久的,也可以是臨時的,具體取決於所使用的流提供者(Stream Provider)。流提供者負責處理流的存儲、傳輸和故障恢復。
作用
Streaming在Orleans中起到了至關重要的作用,主要體現在以下幾個方面:
-
解耦:Streaming允許將數據的產生者和消費者解耦。生產者可以發佈數據到流中,而消費者可以獨立地訂閱這些流並處理數據。這種解耦使得系統更加靈活和可擴展。
-
實時性:通過Streaming,你可以實時地處理和響應數據流。這對於需要實時分析、監控或響應的場景非常有用。
-
故障恢復:Orleans的Streaming機制具有強大的故障恢復能力。即使在出現網路分區或節點故障的情況下,流提供者也能夠確保數據的可靠性和一致性。
應用場景
-
實時日誌分析:你可以將應用程式的日誌消息發佈到流中,並使用專門的消費者來分析這些日誌。這允許你實時地監控和響應應用程式的行為。
-
事件驅動架構:在事件驅動架構中,你可以使用Streaming來發佈事件,並由多個消費者來處理這些事件。這有助於構建松耦合、可擴展和響應式的系統。
-
分散式協作:Streaming也可以用於實現分散式系統中的協作和通信。例如,多個節點可以發佈狀態更新到流中,其他節點可以訂閱這些流以獲取最新的狀態信息。
示例
安裝nuget包
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.0.0" />
配置Streaming
siloHostBuilder.AddMemoryStreams("StreamProvider").AddMemoryGrainStorage("PubSubStore");
定義一個Grain生成事件
public interface ISender : IGrainWithStringKey { Task Send(Guid rid); } public class SenderGrain : Grain, ISender { public Task Send(Guid rid) { var streamProvider = this.GetStreamProvider("StreamProvider"); var streamId = StreamId.Create("RANDOMDATA", rid); var stream = streamProvider.GetStream<int>(streamId); RegisterTimer(_ => { return stream.OnNextAsync(Random.Shared.Next()); }, null, TimeSpan.FromMilliseconds(1_000), TimeSpan.FromMilliseconds(1_000)); return Task.CompletedTask; } }
再定義一個Grain訂閱事件
public interface IRandomReceiver : IGrainWithGuidKey { Task Receive(); } [ImplicitStreamSubscription("RANDOMDATA")] public class ReceiverGrain : Grain, IRandomReceiver { public override async Task OnActivateAsync(CancellationToken cancellationToken) { var streamProvider = this.GetStreamProvider("StreamProvider"); var rid = this.GetPrimaryKey(); var streamId = StreamId.Create("RANDOMDATA", rid); var stream = streamProvider.GetStream<int>(streamId); await stream.SubscribeAsync<int>( async (data, token) => { Console.WriteLine(data); await Task.CompletedTask; }); base.OnActivateAsync(cancellationToken); } public async Task Receive() { } }
然後即可測試
var rid = Guid.NewGuid(); var sender1 = client.GetGrain<ISender>("sender1"); await sender1.Send(rid); var reciver1 = client.GetGrain<IRandomReceiver>(new Guid()); await reciver1.Receive();
流提供程式
提供程式可以通過在nuget種搜索Orleans.Streaming,也可以通過PersistentStreamProvider 與 IQueueAdapter 重寫來自定義Provider