在.Net Framework環境下,我們使用Windows Workflow Foundation(WF)作為項目的工作流引擎,可是.Net Core已經不支持WF了,需要為基於.Net Core的項目選擇新的工作流引擎。基本要求如下: 輕量級,部署和使用都很簡單。 有相當數量的用戶,往往使用的人 ...
在.Net Framework環境下,我們使用Windows Workflow Foundation(WF)作為項目的工作流引擎,可是.Net Core已經不支持WF了,需要為基於.Net Core的項目選擇新的工作流引擎。基本要求如下:
- 輕量級,部署和使用都很簡單。
- 有相當數量的用戶,往往使用的人越多,產品也就越可靠,遇到問題也容易找到解決辦法。
- 支持使用配置文件定義工作流,而不僅僅是使用代碼定義。
符合上述要求的開源項目有幾個,這裡介紹開源項目WorkflowCore,項目地址:https://github.com/danielgerlag/workflow-core。
本文的示例可以從github下載:https://github.com/zhenl/ZL.WorflowCoreDemo 。
簡單的控制台項目
首先,使用Visual Studio創建一個.Net Core的控制台項目,在NuGet管理器中引入下麵程式包:
- WorkflowCore
- Microsoft.Extensions.DependencyInjection
- Microsoft.Extensions.Logging
然後,創建兩個工作流的步驟:
using WorkflowCore.Interface;
using WorkflowCore.Models;
namespace WorkflowCoreTest
{
public class HelloWorld : StepBody
{
public override ExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine("你好");
return ExecutionResult.Next();
}
}
}
using WorkflowCore.Interface;
using WorkflowCore.Models;
namespace WorkflowCoreTest
{
public class GoodbyeWorld : StepBody
{
public override ExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine("再見");
return ExecutionResult.Next();
}
}
}
接下來使用這兩個步驟定義一個工作流:
using WorkflowCore.Interface;
namespace WorkflowCoreTest
{
public class HelloWorldWorkflow : IWorkflow
{
public string Id => "HelloWorld";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith<HelloWorld>()
.Then<GoodbyeWorld>();
}
}
}
最後,在主程式中,創建WorkflowHost,註冊並運行工作流,代碼如下:
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading;
using WorkflowCore.Interface;
namespace WorkflowCoreTest
{
class Program
{
static void Main(string[] args)
{
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWorldWorkflow>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
Console.ReadLine();
host.Stop();
}
private static IServiceProvider ConfigureServices()
{
//setup dependency injection
IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();
return serviceProvider;
}
}
}
簡單的工作流就完成了。
WorkflowHost
上一節通過一個簡單的控制台例子介紹了WorkflowCore工作流的定義和運行過程,從例子中可以看到,工作流是運行在WorkflowHost實例中的,再看一下代碼:
static void Main(string[] args)
{
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWorldWorkflow>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
Console.ReadLine();
host.Stop();
}
WorkflowHost的工作過程是這樣的,首先需要獲取WorkflowHost的實例,然後註冊工作流,這裡可以註冊多個工作流,接下來,啟動host,然後可以啟動工作流,這裡可以啟動多個工作流實例,最後,關閉host。
我們需要對WorkflowHost有進一步的瞭解,第一個問題,每次使用serviceProvider.GetService
var host = serviceProvider.GetService<IWorkflowHost>();
var host1 = serviceProvider.GetService<IWorkflowHost>();
Console.WriteLine(host == host1);
我們獲取兩個host變數比較一下看是否指向相同的對象,結果是True,也就是使用serviceProvider.GetService<IWorkflowHost()獲得的是相同的對象。
第二個問題,調用host.Stop是否會影響正在執行的流程?
我們修改一下代碼,啟動流程實例後,馬上執行host.Stop():
host.RegisterWorkflow<HelloWorldWorkflow>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
host.Stop();
Console.ReadLine();
我們發現,沒有輸出結果,也就是host.Stop()終止了所有流程。
第三個問題,host中啟動的流程是否在同一線程運行?
我們啟動多個流程,看一下輸出結果:
host.RegisterWorkflow<HelloWorldWorkflow>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
host.StartWorkflow("HelloWorld", 1, null);
host.StartWorkflow("HelloWorld", 1, null);
host.Stop();
Console.ReadLine();
說明每個流程是一個獨立的線程,並行執行。
下一步我們需要瞭解流程的參數傳遞。
流程的數據對象和數據傳遞
我們已經知道瞭如何使用Fluent API定義流程和如何註冊流程,現在我們需要瞭解如何定義流程需要處理的數據,和如何進行數據傳遞。這裡舉一個最簡單的例子來說明。在前面的例子中,我們輸出“你好”和“再見”,現在擴展這個需求,流程啟動後,等待用戶輸入名字,然後輸出“你好,<輸入的名字>”和“<輸入的名字>,再見”。為了完成這個需求,需要:
- 定義一個數據結構用來保存輸入的名字
- 將這個數據結構與流程關聯起來
- 修改流程,讓流程等待用戶輸入
- 將用戶輸入的變數傳遞給流程
首先我們定義一個簡單的類,用來保存輸入的名字:
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class MyNameClass
{
public string MyName { get; set; }
}
}
然後,修改流程的定義:
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
{
public string Id => "HelloWithNameWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
.Output(data => data.MyName, step => step.EventData)
.Then<HelloWithName>()
.Input(step => step.Name, data => data.MyName)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
這裡,流程聲明為 IWorkflow
這段代碼中還使用WaitFor定義了一個事件,這個事件的輸出是將事件接收的外部參數(step.EventData)傳遞給流程的MyName屬性。
還需要修改兩個步驟,增加名稱欄位:
using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;
namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
public class HelloWithName : StepBody
{
public string Name { get; set; }
public override ExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine("你好," + Name);
return ExecutionResult.Next();
}
}
}
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;
namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
public class GoodbyeWithName : StepBody
{
public string Name { get; set; }
public override ExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine(Name + ",再見");
return ExecutionResult.Next();
}
}
}
下麵是流程註冊和運行的代碼:
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
{
public string Id => "HelloWithNameWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
.Output(data => data.MyName, step => step.EventData)
.Then<HelloWithName>()
.Input(step => step.Name, data => data.MyName)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class FlowRun
{
public static void Run()
{
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWithNameWorkflow, MyNameClass>();
host.Start();
var initialData = new MyNameClass();
var workflowId = host.StartWorkflow("HelloWithNameWorkflow", 1, initialData).Result;
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.PublishEvent("MyEvent", workflowId, value);
Console.ReadLine();
host.Stop();
}
private static IServiceProvider ConfigureServices()
{
//setup dependency injection
IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();
return serviceProvider;
}
}
}
我們也可以使用字典作為數據對象,流程的定義如下:
using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class HelloWithNameWorkflowDynamic : IWorkflow<Dictionary<string,string>>
{
public string Id => "HelloWithNameWorkflowDynamic";
public int Version => 1;
public void Build(IWorkflowBuilder<Dictionary<string, string>> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
.Output((step,data)=>data.Add("Name",(string)step.EventData))
.Then<HelloWithName>()
.Input(step => step.Name, data => data["Name"])
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data["Name"]);
}
}
}
這裡沒有使用自定義的類,而是使用了字典Dictionary<string, string>,流程的運行代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
host.Start();
var initialData = new Dictionary<string,string>();
var workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.PublishEvent("MyEvent", workflowId, value);
Console.ReadLine();
foreach (var key in initialData.Keys)
{
Console.WriteLine(key + ":" + initialData[key]);
}
Console.ReadLine();
host.Stop();
採用JSON格式定義流程
WorkflowCore 支持採用JSON或者YAML格式定義流程,使用時通過使用IDefintionLoader載入流程來替代RegisterWorkflow。我們仍然通過簡單的例子來說明。在我們現有的工程中已經定義了幾個簡單的流程步驟,我們用JSON格式將這幾個步驟組成簡單的工作流。
首先,在現有的解決方案中增加一個.Net Core的控制台項目,名稱為ZL.WorkflowCoreDemo.Json,使用NuGet引入WorkflowCore,Microsoft.Extensions.Logging,還有WorkflowCore.DSL,然後,我們在項目中增加一個json文件,將文件的屬性“複製到輸出目錄”修改為“始終複製”:
在json文件中定義流程:
{
"Id": "HelloWorld",
"Version": 1,
"Steps": [
{
"Id": "Hello",
"StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
"NextStepId": "Bye"
},
{
"Id": "Bye",
"StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
}
]
}
Json定義格式符合WorkflowCore的DSL,這裡不進行DSL的詳細介紹,我們重點關註流程如何定義,載入和運行。
我們可以將前面項目中的代碼拷貝過來進行修改,首先修改下麵的函數:
private static IServiceProvider ConfigureServices()
{
//setup dependency injection
IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
//這是新增加的服務
services.AddWorkflowDSL();
var serviceProvider = services.BuildServiceProvider();
return serviceProvider;
}
ConfigureServices新增加了services.AddWorkflowDSL();
在主函數中,使用IDefintionLoader載入JSON格式的流程定義:
static void Main(string[] args)
{
IServiceProvider serviceProvider = ConfigureServices();
var loader = serviceProvider.GetService<IDefinitionLoader>();
var json = System.IO.File.ReadAllText("myflow.json");
loader.LoadDefinition(json, Deserializers.Json);
var host = serviceProvider.GetService<IWorkflowHost>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
Console.ReadLine();
host.Stop();
}
現在,流程可以運行了。
在研究過程中發現了一個坑,可能需要註意。在這個例子中我們使用了前面項目定義的流程步驟,如果在本項目中定義流程步驟,會出現找不到相應動態庫的錯誤,不知道是否是一個缺陷。
JSON格式(DSL)定義流程與使用Fluent API定義流程的比較
前面我們分別討論了使用Fluent API定義流程和使用JSON格式定義流程,按照以前的使用經驗,感覺這兩種定義方式應該可以互相轉換,互相代替,但在實際應用中發現並不是如此,兩種方式都有不能被替代的功能。
使用Fluent API可以使用Lambda 表達式定義步驟
我們可以在流程中直接使用Lambda表達式定義步驟,而不需要定義類,比如:
public class HelloWorldWorkflow : IWorkflow
{
public string Id => "HelloWorld";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context =>
{
Console.WriteLine("你好");
return ExecutionResult.Next();
})
.Then(context =>
{
Console.WriteLine("再見");
return ExecutionResult.Next();
});
}
}
這種方式無法使用JSON等格式實現。
採用JSON等DSL格式可以方便地定義步驟間的跳轉
採用JSON等DSL格式時,每個步驟有明示的ID,步驟轉移通過ID標識進行,這樣可以很方便地進行步驟間的跳轉。而採用Fluent API則沒有這麼靈活,我們看以下的定義:
{
"Id": "HelloWorld",
"Version": 1,
"Steps": [
{
"Id": "Hello",
"StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
"NextStepId": "Bye"
},
{
"Id": "Bye",
"StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
"NextStepId": "Hello"
}
]
}
步驟“Hello”執行完成後,執行"Bye",“Bye”執行完又回到“Hello”,如此迴圈。但在Fluent API中就沒有這麼方便,必須使用迴圈或者其它的方式。而這種跳轉方式在實際應用中非常常見,最常見的場景就是審批流程中的提交/駁回,提交-駁回過程可以形成多次迴圈,這種流程模式,採用帶有步驟標記的跳轉很容易實現。
流程數據類的局限性
流程相關的數據類和流程步驟中的屬性在理論上是沒有限制的,我們可以使用複雜的數據類型,比如Dictionary<string,string>或者具有複雜層次的數據類,但在研究中我們發現由於JSON DSL定義的限制,我們無法實現複雜數據結構的數據傳遞。使用Fluent API定義的流程中,可以使用Lamdba 表達式,但在JSON DSL中沒找到更好的方法。
下麵的代碼展示通過Lamdba表達式實現兩個Dictionary<string,string>之間的數據傳遞,但在DSL中沒有對應的方式:
.Output((step, data)=> {
var dic = step.EventData as Dictionary<string, object>;
foreach (var key in dic.Keys)
{
if (data.MyDic.ContainsKey(key)) data.MyDic[key] = dic[key];
else data.MyDic.Add(key, dic[key]);
}
而在實際應用中,我們需要使用流程定義文件而不是寫死的代碼來定義流程,這樣在流程修改時,就不需要修改代碼和重新編譯部署。這個限制是WorkflowCore在實際項目中落地的一個主要障礙。
工作流持久化與恢復
WorkflowCore提供了幾乎針對流行資料庫的各種持久化方式,支持SqlServer、Sqlite等關係資料庫,也支持MongoDb、Redis等非關係資料庫。預設使用的是在記憶體中保存流程數據,但在實際應用中,必須將流程數據持久化以保證系統的可靠性。當系統因為計劃內或者意外原因出現異常後,正在執行的流程應該能夠在斷點處恢復並繼續執行。我們改造一下第一部分的例子,增加持久化設置,並模擬流程中斷和恢復過程。
首先,我們需要使用NuGet引入SqlServer持久化Provider:WorkflowCore.Persistence.SqlServer,當然也可以使用其它類型的數據存儲。
然後,修改ConfigureServices,將services.AddWorkflow()修改為:
services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true));
最後修改一下執行代碼,增加流程Id輸入和恢復代碼:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
host.Start();
var initialData = new Dictionary<string,string>();
Console.WriteLine("請輸入需要恢復的流程編號,如執行新流程直接回車:");
string workflowId = Console.ReadLine();
if (string.IsNullOrEmpty(workflowId))
{
workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
Console.WriteLine(workflowId);
}
else
{
host.ResumeWorkflow(workflowId);
}
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.PublishEvent("MyEvent", workflowId, value);
下麵,我們模擬中斷-恢復過程。首先,運行程式,不輸入流程id,直接按回車,會生成新的流程,並輸出流程Id,拷貝這個流程ID,並退出程式:
再次執行程式,輸入或粘貼上一次生成的流程編號,可以繼續執行流程:
單元測試
我們已經創建簡單的工作流,並可以在控制台環境運行,現在我們可以為工作流創建簡單的單元測試,這裡我們使用xUnit作為測試框架。
在ZL.WorkflowCoreDemo解決方案中增加一個xUnit測試項目,命名為ZL.WorkflowCoreDemo.Test,創建好的項目中已經包含xunit和xunit.runner.visualstudio。我們還需要使用NuGet引入其它的框架,首先要引入FluentAssertions,這個框架結合xUnit,可以讓 我們在測試中使用Should斷言。還需要引入WorkflowCore和WorkflowCore.Testing以及我們需要測試的項目。這裡我們測試最簡單的HelloWorldWorkflow。
接下來編寫測試代碼,測試類需要繼承WorkflowTest<流程類,流程相關的數據類>,由於HelloWorldWorkflow沒有相關的數據類,我們使用dynamic代替,類的定義如下:
using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;
namespace ZL.WorkflowCoreDemo.Test
{
public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
{
public DemoUnitTest()
{
Setup();
}
[Fact]
public void Test1()
{
dynamic data = new { };
var workflowId = StartWorkflow(data);
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
WorkflowStatus status = GetStatus(workflowId);
status.Should().Be(WorkflowStatus.Complete);
UnhandledStepErrors.Count.Should().Be(0);
}
}
}
需要註意的是在測試類的構造函數中調用Setup(),用來初始化流程引擎。
現在我們可以在測試資源管理器中運行測試項目,如果一切順利的化,結果是這樣的:
但有時候理想和現實總是有些差距,我在執行時遇到瞭如下的異常:
通過研究發現我引用的WorkflowCore是最新的3.1.2版本,而WorkflowCore.Testing的版本是2.2,應該是版本不一致造成的問題,WorkflowCore和WorkflowCore.Testing的更新不同步。這時,開源項目的好處就體現出來了,通過查看代碼,改寫測試類如下:
using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;
namespace ZL.WorkflowCoreDemo.Test
{
public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
{
public DemoUnitTest()
{
Setup();
}
[Fact]
public void Test1()
{
dynamic data = new { };
var workflowId = StartWorkflow(data);
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
WorkflowStatus status = GetStatus(workflowId);
status.Should().Be(WorkflowStatus.Complete);
UnhandledStepErrors.Count.Should().Be(0);
}
protected new WorkflowStatus GetStatus(string workflowId)
{
var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result;
return instance.Status;
}
protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut)
{
var status = GetStatus(workflowId);
var counter = 0;
while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100)))
{
Thread.Sleep(100);
counter++;
status = GetStatus(workflowId);
}
}
}
}
再次運行,測試通過了。
Activity Workers
前面提到了使用WaitFor暫停工作流,等待人工輸入後發佈事件重新激活流程,今天介紹另一種方式,使用WorkflowCore的Activity,它的作用就是等待數據輸入,數據輸入完成後,工作流繼續執行。下麵是簡單的例子:
using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ActivityWorker
{
public class MyActivityWorkflow : IWorkflow<MyNameClass>
{
public string Id => "MyActivityWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith<HelloWithName>().Input(data => data.Name, step => step.MyName)
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName)
.Activity("activity-2", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result)
.Then<HelloWithName>().Input(step => step.Name, data => data.MyName)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
這個例子很簡單,使用了我們前面定義的兩個步驟,HelloWithName和GoodbyeWithName,Activity在這裡就是接收外部輸入的Name。流程的運行代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<MyActivityWorkflow, MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
host.StartWorkflow("MyActivityWorkflow", 1, myClass);
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
activity = host.GetPendingActivity("activity-2", "worker2", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
Console.ReadLine();
host.Stop();
工作流啟動後,需要通過host.GetPendingActivity獲取Activity,獲取成功,就從外部獲取數據,然後使用host.SubmitActivitySuccess提交數據。
WaitFor vs Activity
使用WorkflowCore獲取外部數據時,有兩種方法可以讓流程等待外部數據,一是使用WaitFor註冊一個事件,外部數據輸入完成後,通過PublishEvent返迴流程;另一種是使用Activity,註冊一個人工活動,執行到這個活動時,工作流等待,外部代碼通過GetPendingActivity獲取相應的Activity,通過SubmitActivitySuccess提交數據。看起來兩種都可以完成外部數據輸入的任務,但實際中發現GetPendingActivity無法獲取是哪一個工作流實例的活動,如果有兩個實例同時運行,就沒有辦法分清除向哪個流程提報數據:
var id1=host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;
var id2 = host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;
//上面兩個實例中有相同的activity-1,無法知道這裡獲取的是哪一個實例的活動,
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
WairFor事件發佈時有工作流實例ID傳入:
host.PublishEvent("MyEvent", workflowId, value);
沒有上面的缺陷。
使用ForEach並行執行多個流程
如果需要同時執行多個過程相同的而輸入不同的流程,可以使用ForEach控制語句,一定要註意,這裡的ForEach不是迴圈,不是一個流程執行完再執行另一個流程,我們仍然使用前面定義的簡單的步驟來組織ForEach示例流程,代碼如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.Paralle
{
public class ParalleWorkflow : IWorkflow
{
public string Id => "ParalleWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context => { Console.WriteLine("開始"); ExecutionResult.Next(); })
.ForEach(data => new List<string>() { "張三", "李四", "王五", "趙六" })
.Do(x => x
.StartWith<HelloWithName>()
.Input(step => step.Name, (data, context) => context.Item as string)
.Then<GoodbyeWithName>()
.Input(step => step.Name, (data, context) => context.Item as string)
)
.Then(context => { Console.WriteLine("結束"); ExecutionResult.Next(); });
}
}
}
在這個例子里,我們沒有定義相關的數據類,需要輸入的人名作為ForEach中的迴圈變數,這些變數保存在context中,輸入到相應的環節中。執行代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ParalleWorkflow>();
host.Start();
host.StartWorkflow("ParalleWorkflow", 1, null);
Console.ReadLine();
host.Stop();
Parallel並行執行多個流程
前面我們提到了使用ForEach執行並行流程,這些流程的執行過程相同,不同的只是輸入的參數。如果需要並行執行多個不同的流程,需要使用Parallel,示例代碼如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
namespace ZL.WorflowCoreDemo.Paralle
{
public class ParallePathWorkflow : IWorkflow
{
public string Id => "ParallePathWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context => { Console.WriteLine("開始"); ExecutionResult.Next(); })
.Parallel()
.Do(then =>
then.StartWith(context=>{ Console.WriteLine("分支一開始"); ExecutionResult.Next(); })
.Then(context => { Console.WriteLine("分支一結束"); ExecutionResult.Next(); }))
.Do(then =>
then.StartWith(context => { Console.WriteLine("分支二開始"); ExecutionResult.Next(); })
.Then(context => { Console.WriteLine("分支二結束"); ExecutionResult.Next(); }))
.Do(then =>
then.StartWith(context => { Console.WriteLine("分支二開始"); ExecutionResult.Next(); })
.Then(context => { Console.WriteLine("分支二結束"); ExecutionResult.Next(); }))
.Join()
.Then(context => { Console.WriteLine("結束"); ExecutionResult.Next(); });
}
}
}
為了說明分支語句的構成,這個流程沒有使用關聯的數據類,也沒有使用類定義步驟,全部使用Lambda表達式。Parallel的結構是分支的開始是Parallel(),結束是Join(),每個分支在Do語句中表示。流程的運行代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ParallePathWorkflow>();
host.Start();
host.StartWorkflow("ParallePathWorkflow", 1, null);
Console.ReadLine();
host.Stop();
While迴圈
While迴圈會重覆執行某些步驟,直到條件得到滿足再繼續執行下麵的流程。使用While迴圈可以實現審批流程中的“提交/駁回”,如果審批沒有通過,駁回重新輸入,直到審批通過或者駁回次數到達上限。這裡舉一個簡單的例子說明使用方法,結合前面提到的Activity,可以實現對輸入進行判斷,如果輸入不滿足要求,就重新輸入。流程定義如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class WhileWorkflow : IWorkflow<MyNameClass>
{
public string Id => "WhileWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith<HelloWithName>()
.Input(step => step.Name, data => data.MyName)
.While(data => data.MyName.Length < 3)
.Do(x => x
.StartWith(context=> { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); })
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result))
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
流程運行的代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<WhileWorkflow, MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
host.StartWorkflow("WhileWorkflow", 1, myClass);
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
while (activity != null)
{
Console.WriteLine("輸入大於3個字元的名字結束,小於3個字元的名字繼續");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
}
Console.ReadLine();
host.Stop();
If判斷
If判斷比較簡單,根據流程關聯的數據對象中的值進行判斷,如果條件滿足執行相應的分支。需要註意的是沒有else相關語句,如果需要實現相關邏輯,需要再次進行一次條件相反的判斷。下麵是簡單的例子,仍然使用前面定義的數據類和步驟,輸入採用Activity:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class IfWorkflow : IWorkflow<MyNameClass>
{
public string Id => "IfWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith(context=> ExecutionResult.Next())
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result)
.If(data => data.MyName.Length < 3)
.Do(then=>then
.StartWith(context => { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); }))
.If(data => data.MyName.Length >= 3)
.Do(then => then
.StartWith(context => { Console.WriteLine("輸入大於等於3個字元"); ExecutionResult.Next(); }))
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
流程的運行代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<IfWorkflow, MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
host.StartWorkflow("IfWorkflow", 1, myClass);
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
Console.ReadLine();
host.Stop();
條件分支Decision Branches
Decision Branches有點類似於switch語句,可以為每個條件創建一個分支,這些分支相對獨立,根據不同的條件選擇執行。如果使用Fluent API,可以使用CreateBranch方法創建分支,然後在流程中使用分支。為了說明問題,我們改造前面的If流程,使用Decision Branches實現相同的功能,流程定義的代碼如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class DecisionWorkflow : IWorkflow<MyNameClass>
{
public string Id => "DecisionWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
var branch1 = builder.CreateBranch()
.StartWith(context => { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); });
var branch2 = builder.CreateBranch()
.StartWith(context => { Console.WriteLine("輸入大於等於3個字元"); ExecutionResult.Next(); });
builder
.StartWith(context => ExecutionResult.Next())
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result)
.Decide(data => data.MyName.Length)
.Branch((data, outcome) => data.MyName.Length<3, branch1)
.Branch((data, outcome) => data.MyName.Length >= 3, branch2)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
流程執行定義的代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<DecisionWorkflow, MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
host.StartWorkflow("DecisionWorkflow", 1, myClass);
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
Console.ReadLine();
host.Stop();
使用Schedule執行定時任務
WorkflowCore 提供了定時執行後臺任務的功能,使用Schedule可以定義非同步執行的任務,在工作流的後臺執行。示例代碼如下:
using System;
using WorkflowCore.Interface;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class ScheduleWorkflow : IWorkflow
{
public string Id => "ScheduleWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context => Console.WriteLine("開始"))
.Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
.StartWith(context => Console.WriteLine("後臺工作")))
.Then(context => Console.WriteLine("前臺工作"));
}
}
}
在上面的代碼中,工作流開始後,定義了一個Schedule,這個任務在延時5秒後,啟動一個後臺流程。流程的執行代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ScheduleWorkflow>();
host.Start();
var workflowId = host.StartWorkflow("ScheduleWorkflow", 1, null).Result;
Console.ReadLine();
host.Stop();
流程的執行代碼與前面的例子基本類似,執行結果如下:
執行時,前臺任務完成5秒後,後臺工作才執行。
使用Recur執行重覆的後臺任務
前面介紹的Schedule可以啟動一個後臺的定時任務,這個任務只執行一次。如果需要執行多次固定間隔的任務,可以使用Recur,當條件滿足時任務不再執行。Recur的定義與Schedule類似,只是多了條件判斷輸入,流程定義的代碼如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class RecurWorkflow : IWorkflow<MyNameClass>
{
public string Id => "RecurWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith(context => Console.WriteLine("開始"))
.Recur(data => TimeSpan.FromSeconds(5),data=>data.MyName.Length>5).Do(recur => recur
.StartWith<HelloWithName>()
.Input(step => step.Name, data => data.MyName))
.Then(context => Console.WriteLine("前臺工作"))
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result);
}
}
}
這流程稍微複雜一點,我們增加了使用Activity的輸入,目的是看一下前臺的輸入等待是否會影響後臺的進程運行,還有就是前臺輸入的數據,能否正確傳遞到後臺,流程的運行代碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<RecurWorkflow,MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
var workflowId = host.StartWorkflow("RecurWorkflow", 1, myClass).Result;
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
Console.ReadLine();
host.Stop();
運行效果如下:
可以看出,前臺需要的輸入等待並沒有影響後臺的執行,我們輸入一個新名字後:
集成Elasticsearch
WorkflowCore 自身的查詢功能很弱,不過它提供了Elasticsearch的plugin,可以使用Elasticsearch對流程進行索引和查詢。不太方便的地方是必須要安裝Elasticsearch。這裡先簡單介紹一下Elasticsearch,它是基於Lucene的搜索伺服器,提供了分散式多用戶的全文檢索引擎,基於RESTful web介面。網上關於Elasticsearch的資料很多,可以自行搜索。
如果希望使用Elasticsearch索引工作流,需要在項目中安裝WorkflowCore.Providers.Elasticsearch,使用NuGet安裝這個插件,然後在services中進行設置:
using Nest;
...
services.AddWorkflow(cfg =>
{
...
cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://localhost:9200")), "index_name");
});
在代碼中,通過依賴註入引入ISearchIndex,使用Search方法進行搜索:
Search(string terms, int skip, int take, params SearchFilter[] filters)
檢索的範圍包括流程的定義、描述、狀態等。如果流程相關的自定義數據類需要檢索,數據類需要實現ISearchable介面。
異常處理
WorkflowCore啟動的流程多線程的方式運行,如果流程中出現的異常不會拋出到主程式,很多情況下感覺流程莫名奇妙地結束了。為了避免這種情況,需要顯示地聲明流程步驟的異常處理。如果使用Fluent API定義流程,可以在流程後附加OnError處理異常,但我們更希望對異常進行集中處理和記錄,這時可以使用WorkflowHost服務的OnStepError事件。定義如下:
var host = serviceProvider.GetService<IWorkflowHost>();
host.OnStepError += Host_OnStepError;
異常處理代碼可以寫在Host_OnStepError中:
private static void Host_OnStepError(WorkflowCore.Models.WorkflowInstance workflow, WorkflowCore.Models.WorkflowStep step, Exception exception)
{
}
實際使用中的問題
到這裡,我們介紹了WorkflowCore的使用,下麵談一下這個項目在實際使用時遇到一些問題。
- 輕量級,部署和使用都很簡單。項目本身滿足這個條件,但對流程相關的查詢功能很弱,如果需要增強,需要Elasticsearch的支持。部署和使用Elasticsearch帶來了額外的工作量。
- WorkflowCore支持使用JSON格式定義工作流,然而從功能上要弱於使用Fluent API定義的工作流,因為不具備解析Lambda表達式的能力
- 參數傳遞功能相對較弱,無法傳遞複雜對象。
上述問題是我們在實際中遇到的,希望對大家有所幫助。
本文來自博客園,作者:尋找無名的特質,轉載請註明原文鏈接:https://www.cnblogs.com/zhenl/p/16495977.html