一.並行LINQ System.Linq名稱空間中包含的類ParallelEnumerable可以分解查詢的工作,使其分佈在多個線程上。 儘管Enumerable類給IEnumerable<T>介面定義了擴展方法,但ParallelEnumerable類的大多數擴展方法是ParallerQuery< ...
一.並行LINQ
System.Linq名稱空間中包含的類ParallelEnumerable可以分解查詢的工作,使其分佈在多個線程上。
儘管Enumerable類給IEnumerable<T>介面定義了擴展方法,但ParallelEnumerable類的大多數擴展方法是ParallerQuery<TSource>類的擴展。例如,AsParallel()方法,它擴展了IEnumerable<T>介面,返回ParallelQuery<T>類,所以正常的集合類可以以平行方式查詢。
1.並行查詢
下麵演示並行LINQ(Parallel LINQ,PLINQ):
//用隨機值填充一個大型的int集合 // Enumerable.Range(0, arraySize),生成指定範圍內的整數的空序列。 //Select(x => r.Next(140)),用小於140的數填充集合 static IEnumerable<int> SampleData() { const int arraySize = 100000000; var r = new Random(); return Enumerable.Range(0, arraySize).Select(x => r.Next(140)).ToList(); } static void IntroParallel() { var data = SampleData(); var watch = new Stopwatch(); //非並行LINQ watch.Start(); var q1 = (from x in data where Math.Log(x) < 4 select x).Average(); watch.Stop(); Console.WriteLine("sync {0}, result: {1}", watch.ElapsedMilliseconds, q1); watch.Reset(); //使用data.AsParallel()進行並行LINQ watch.Start(); var q2 = (from x in data.AsParallel() where Math.Log(x) < 4 select x).Average(); watch.Stop(); Console.WriteLine("async {0}, result: {1}", watch.ElapsedMilliseconds, q2); }
輸出;
發現並行查詢時間用的少,在並行查詢時CPU利用率達到100%
與LINQ基礎(二)(http://www.cnblogs.com/afei-24/p/6845551.html)中的LINQ查詢一樣,編譯器會修改語法,以調用AsParallel,Where(),Select(),Average()方法:
var q2 = data.AsParallel().Where(x => Math.Log(x)<4).Select(x => x).Average();
AsParallel()方法用ParallerEnumerable類定義,以擴展IEnumerable<T>介面,所以可以對簡單的數組調用它。AsParallel()方法返回ParallerQuery<T>。因為返回的類型,所以編譯器選擇的Where()方法是ParallerEnumerable.Where(),而不是Enumerable.Where()。
對於PrarllelEnumerable類,查詢是分區的,以便多個線程可以同時處理該查詢。集合可以分為多個部分,其中每個部分由不同的線程處理。完成分區的工作後,就需要合併,獲得所有部分的總和。
2.分區器
AsParallel()方法不僅擴展了IEnumerable<T>介面,還擴展了Partitioner類。通過它可以影響要創建的分區。
Partitioner類用System,Collection.Concurrent名稱空間定義,並且有不同的變體。Create()方法接受實現了IList<T>類的數組或對象,以及Boolean類型的參數,返回一個不同的Partitioner類型。Create()方法有多個重載版本。
var q2 = (from x in Partitioner.Create(data).AsParallel()
where Math.Log(x) < 4
select x).Average();
也可以對AsParallel()方法接著調用WithExecutionMode()和WithDegreeOfParallelism()方法,來影響並行機制。WithExecutionMode()方法可以傳遞ParallelExecutionMode的一個Default值或者ForceParallelism值。預設情況下,並行LINQ避免使用系統開銷很高的並行機制。WithDegreeOfParallelism()方法,可以傳遞一個整數值,以指定應並行運行的最大任務數。如果查詢不應使用全部CPU,這個方法很有用。
3.取消
要取消長時間運行的查詢,可以給查詢添加WithCancellation()方法,並傳遞一個CancellationToken令牌作為參數。CancellationToken令牌從CancellationTokenSource類中創建。
舉個例子,下麵的查詢在單獨的線程中運行,如果取消了查詢,在該線程中捕獲一個OperationCanceledException類型的異常。在主線程中,可以調用CancellationTokenSource類的Cancle()方法取消任務。
var data = SampleData(); var watch = new Stopwatch(); watch.Start(); Console.WriteLine("filled array"); var sum1 = (from x in data where Math.Log(x) < 4 select x).Average(); Console.WriteLine("sync result {0}", sum1); var cts = new CancellationTokenSource(); Task.Factory.StartNew(() => { try { var res = (from x in data.AsParallel().WithCancellation(cts.Token) where Math.Log(x) < 4 select x).Average(); Console.WriteLine("query finished, result: {0}", res); } catch (OperationCanceledException ex) { Console.WriteLine(ex.Message); } }); watch.Stop(); Console.WriteLine("async {0}, result: {1}", watch.ElapsedMilliseconds, "res"); Console.WriteLine("query started"); Console.Write("cancel? "); string input = Console.ReadLine(); if (input.ToLower().Equals("y")) { cts.Cancel(); Console.WriteLine("sent a cancel"); } Console.WriteLine("press return to exit"); Console.ReadLine();
二.表達式樹
在LINQ To Object 中,擴展方法需要將一個委托類型作為參數,這樣就可以將lambda表達式賦予參數。lambda表達式也可以賦予Expression<T>類型的參數,C#編譯器根據類型給lambda表達式定義不同的行為。如果類型是Expression<T>,編譯器就從lambda表達式中創建一個表達式樹,並存儲在程式集中。這樣就可以在運行期間分析表達式樹,併進行優化,以便查詢數據源。
var racers = from r in Formula1.GetChampions()
where r.Wins > 15 && (r.Country == "Brazil" || r.Country == "Austria")
select r;
這個查詢表達式使用了擴展方法Where(),Select()方法。Enumerable類定義了Where()方法,並將委托類型Func<T,bool>作為參數謂詞:
public static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source,Func<TSource,bool> predicate);
這樣,就可以把lambda表達式賦予委托predicate。
除了使用委托之外,編譯器還會把表達式樹放在程式集中。表達式樹可以在運行期間讀取。表達式樹從派生自抽象基類Expression的類中構建。Expression和Expression<T>不同。繼承自Expression類的表達式類有BinaryExpression,ConstantExpression,InvocationExpression等。編譯器會從lambda表達式中創建表達式樹。
例如,lambda表達式r.Country == "Brazil"使用了ParameterExpression,MemberExpression,ConstantExpression,MethodCallExpression,來創建一個表達式樹,並將該樹存儲在程式集中,之後在運行期間使用這個樹,創建一個用於底層數據源的優化查詢:
//DisplayTree方法在控制臺上圖形化的顯示表達式樹。其中傳遞一個Expression對象,並根據表達式的類型,把表達式的一些信息寫到控制臺上 private static void DisplayTree(int indent, string message, Expression expression) { string output = String.Format("{0} {1} ! NodeType: {2}; Expr: {3} ", "".PadLeft(indent, '>'), message, expression.NodeType, expression); indent++; switch (expression.NodeType) { case ExpressionType.Lambda: Console.WriteLine(output); LambdaExpression lambdaExpr = (LambdaExpression)expression; foreach (var parameter in lambdaExpr.Parameters) { DisplayTree(indent, "Parameter", parameter); } DisplayTree(indent, "Body", lambdaExpr.Body); break; case ExpressionType.Constant: ConstantExpression constExpr = (ConstantExpression)expression; Console.WriteLine("{0} Const Value: {1}", output, constExpr.Value); break; case ExpressionType.Parameter: ParameterExpression paramExpr = (ParameterExpression)expression; Console.WriteLine("{0} Param Type: {1}", output, paramExpr.Type.Name); break; case ExpressionType.Equal: case ExpressionType.AndAlso: case ExpressionType.GreaterThan: BinaryExpression binExpr = (BinaryExpression)expression; if (binExpr.Method != null) { Console.WriteLine("{0} Method: {1}", output, binExpr.Method.Name); } else { Console.WriteLine(output); } DisplayTree(indent, "Left", binExpr.Left); DisplayTree(indent, "Right", binExpr.Right); break; case ExpressionType.MemberAccess: MemberExpression memberExpr = (MemberExpression)expression; Console.WriteLine("{0} Member Name: {1}, Type: {2}", output, memberExpr.Member.Name, memberExpr.Type.Name); DisplayTree(indent, "Member Expr", memberExpr.Expression); break; default: Console.WriteLine(); Console.WriteLine("{0} {1}", expression.NodeType, expression.Type.Name); break; } } static void Main() { Expression<Func<Racer, bool>> expression = r => r.Country == "Brazil" && r.Wins > 6; DisplayTree(0, "Lambda", expression); }
輸出:
使用Expression<T>類型的一個例子是ADO.NET EF 和WCF數據服務的客戶端提供程式。這些技術用Expression<T>參數定義了擴展方法。這樣,訪問資料庫的LINQ提供程式就可以讀取表達式,創建一個運行期間優化的查詢,從資料庫中獲取數據。
後面會單獨介紹表達式樹的使用。
三.LINQ提供程式
.NET包含幾個LINQ提供程式。LINQ提供程式為特定的數據源實現了標準的查詢操作符。LINQ提供程式也許會實現比LINQ定義的更多擴展方法,但至少要實現標準操作符。LINQ To XML實現了一些專門用於XML的方法,後面會詳細介紹。
LINQ提供程式的實現方案是根據名稱空間和第一個參數的類型來選擇的。實現擴展方法的類的名稱空間必須是開放的,否則擴展方法就不在作用域內。在LINQ to Objects中定義的Where()方法的參數和LINQ To Entities中定義的Where()方法的參數不同:
LINQ to Objects中定義的Where()方法:
public static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source,Func<TSource,bool> predicate);
LINQ To Entities中定義的Where()方法:
public static IQueryable<TSource> Where<TSource>(this IQueryable<TSource> source,Expression<Func<TSource,bool>> predicate);
這兩個類都在System.Linq的Syste,.Core程式集中實現。無論是用 Func<TSource,bool>傳遞參數,還是用Expression<Func<TSource,bool>>參數傳遞,lambda表達式都相同。只是編譯器的行為不同,它根據source參數來選擇。編譯器根據其參數選擇最匹配的方法。在ADO.NET EF中定義的ObjectContext類CreateQuery<T>()方法返回一個實現了IQueryable<TSource>介面的ObjectQuery<T>對象,因此EF使用Querable類的Where()方法。