C# Parallel用法

来源:https://www.cnblogs.com/scmail81/archive/2018/08/22/9521096.html
-Advertisement-
Play Games

1、Parallel.Invoke 主要用於任務的並行 這個函數的功能和Task有些相似,就是併發執行一系列任務,然後等待所有完成。和Task比起來,省略了Task.WaitAll這一步,自然也缺少了Task的相關管理功能。它有兩種形式: Parallel.Invoke( params Action ...


1、Parallel.Invoke 主要用於任務的並行
  這個函數的功能和Task有些相似,就是併發執行一系列任務,然後等待所有完成。和Task比起來,省略了Task.WaitAll這一步,自然也缺少了Task的相關管理功能。它有兩種形式:
  Parallel.Invoke( params Action[] actions);
  Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options);

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var actions = new Action[]
            {
                () => ActionTest("test 1"),
                () => ActionTest("test 2"),
                () => ActionTest("test 3"),
                () => ActionTest("test 4")
            };

            Console.WriteLine("Parallel.Invoke 1 Test");
            Parallel.Invoke(actions);

            Console.WriteLine("結束!");
        }

        static void ActionTest(object value)
        {
            Console.WriteLine(">>> thread:{0}, value:{1}",
            Thread.CurrentThread.ManagedThreadId, value);
        }
    }
}
Program

2、For方法,主要用於處理針對數組元素的並行操作(數據的並行) 

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.For(0, nums.Length, (i) =>
            {
                Console.WriteLine("針對數組索引{0}對應的那個元素{1}的一些工作代碼……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }
    }
}
Program

3、Foreach方法,主要用於處理泛型集合元素的並行操作(數據的並行)

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.ForEach(nums, (item) =>
            {
                Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }
    }
}
Program

  數據的並行的方式二(AsParallel()):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evenNumbers = nums.AsParallel().Select(item => Calculate(item));
            //註意這裡是個延遲載入,也就是不用集合的時候 這個Calculate裡面的演算法 是不會去運行 可以屏蔽下麵的代碼看效果;
            Console.WriteLine(evenNumbers.Count());
            //foreach (int item in evenNumbers)
            //    Console.WriteLine(item);
            Console.ReadKey();
        }

        static int Calculate(int number)
        {
            Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
            return number * 2;
        }
    }
}
Program

  .AsOrdered() 對結果進行排序:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{

    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evenNumbers = nums.AsParallel().AsOrdered().Select(item => Calculate(item));
            //註意這裡是個延遲載入,也就是不用集合的時候 這個Calculate裡面的演算法 是不會去運行 可以屏蔽下麵的代碼看效果;
            //Console.WriteLine(evenNumbers.Count());
            foreach (int item in evenNumbers)
                Console.WriteLine(item);
            Console.ReadKey();
        }

        static int Calculate(int number)
        {
            Console.WriteLine("針對集合元素{0}的一些工作代碼……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
            return number * 2;
        }
    }
}
Program

  ForEach的獨到之處就是可以將數據進行分區,每一個小區內實現串列計算,分區採用Partitioner.Create實現

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            for (int j = 1; j < 4; j++)
            {
                ConcurrentBag<int>  bag = new ConcurrentBag<int>();
                var watch = Stopwatch.StartNew();
                watch.Start();
                Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
                {
                    for (int m = i.Item1; m < i.Item2; m++)
                    {
                        bag.Add(m);
                    }
                });
                Console.WriteLine("並行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
                GC.Collect();

            }
        }
    }
}
Program

  ParallelOptions類
  ParallelOptions options = new ParallelOptions();
  //指定使用的硬體線程數為4
  options.MaxDegreeOfParallelism = 4;
  有時候我們的線程可能會跑遍所有的內核,為了提高其他應用程式的穩定性,就要限制參與的內核,正好ParallelOptions提供了MaxDegreeOfParallelism屬性。

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class Student
    {
        public int ID { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
        public DateTime CreateTime { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var dic = LoadData();
            Stopwatch watch = new Stopwatch();
            watch.Start();
            var query2 = (from n in dic.Values.AsParallel()
                          where n.Age > 20 && n.Age < 25
                          select n).ToList();
            watch.Stop();
            Console.WriteLine("並行計算耗費時間:{0}", watch.ElapsedMilliseconds);

            Console.Read();
        }

        public static ConcurrentDictionary<int, Student> LoadData()
        {
            ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
            ParallelOptions options = new ParallelOptions();
            //指定使用的硬體線程數為4
            options.MaxDegreeOfParallelism = 4;
            //預載入1500w條記錄
            Parallel.For(0, 15000000, options, (i) =>
            {
                var single = new Student()
                {
                    ID = i,
                    Name = "hxc" + i,
                    Age = i % 151,
                    CreateTime = DateTime.Now.AddSeconds(i)
                };
                dic.TryAdd(i, single);
            });

            return dic;
        }
    }
}
Program

常見問題的處理

  <1> 如何中途退出並行迴圈?
  是的,在串列代碼中我們break一下就搞定了,但是並行就不是這麼簡單了,不過沒關係,在並行迴圈的委托參數中提供了一個ParallelLoopState,該實例提供了Break和Stop方法來幫我們實現。
  Break: 當然這個是通知並行計算儘快的退出迴圈,比如並行計算正在迭代100,那麼break後程式還會迭代所有小於100的。
  Stop:這個就不一樣了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            ConcurrentBag<int> bag = new ConcurrentBag<int>();

            Parallel.For(0, 20000000, (i, state) =>
            {
                if (bag.Count == 1000)
                {
                    //state.Break();
                    state.Stop();
                    return;
                }
                bag.Add(i);
            });

            Console.WriteLine("當前集合有{0}個元素。", bag.Count);

        }
    }
}
Program

  取消(cancel)

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        public static void Main()
        {

            var cts = new CancellationTokenSource();
            var ct = cts.Token;
            Task.Factory.StartNew(() => fun(ct));
            Console.ReadKey();
            //Thread.Sleep(3000);
            cts.Cancel();
            Console.WriteLine("任務取消了!");

        }

        static void fun(CancellationToken token)
        {
            Parallel.For(0, 100000,
                        new ParallelOptions { CancellationToken = token },
                        (i) =>
                        {
                            Console.WriteLine("針對數組索引{0}的一些工作代碼……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId);
                        });
        }
    }
}
Program

  <2> 並行計算中拋出異常怎麼處理?
  首先任務是並行計算的,處理過程中可能會產生n多的異常,那麼如何來獲取到這些異常呢?普通的Exception並不能獲取到異常,然而為並行誕生的AggregateExcepation就可以獲取到一組異常。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                Parallel.Invoke(Run1, Run2);
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            Console.WriteLine("結束了!");
            //Console.Read();
        }

        static void Run1()
        {
            Thread.Sleep(3000);
            throw new Exception("我是任務1拋出的異常");
        }

        static void Run2()
        {
            Thread.Sleep(5000);
            throw new Exception("我是任務2拋出的異常");
        }
    }
}
Program

  註意Parallel裡面 不建議拋出異常 因為在極端的情況下比如進去的第一批線程先都拋異常了 此時AggregateExcepation就只能捕獲到這一批的錯誤,然後程式就結束了

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class TestClass
    {
        public static List<int> NumberList = null;
        private static readonly object locker = new object();
        public void Test(int Number)
        {
            throw new Exception("1111");
            //lock (locker)
            //{
            //    if (NumberList == null)
            //    {
            //        Console.WriteLine("執行添加");
            //        NumberList = new List<int>();
            //        NumberList.Add(1);
            //        //Thread.Sleep(1000);
            //    }
            //}
            //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
            //Console.WriteLine(Number);
        }
    }

    class Program
    {
        private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            try
            {
                Parallel.For(0, 10, (i) =>
                {
                    try
                    {
                        TestClass a = new TestClass();
                        a.Test(i);
                    }
                    catch (Exception ex)
                    {
                        lock (locker)
                        {
                            errList.Add(ex.Message);
                            throw ex;
                        }
                    }
                });
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的錯誤:{1}", Index++, err);
            }
        }
    }
}
Program

  可以向下麵這樣來處理一下
  不在AggregateExcepation中來處理 而是在Parallel裡面的try catch來記錄錯誤,或處理錯誤

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class TestClass
    {
        public static List<int> NumberList = null;
        private static readonly object locker = new object();
        public void Test(int Number)
        {
            throw new Exception("1111");
            //lock (locker)
            //{
            //    if (NumberList == null)
            //    {
            //        Console.WriteLine("執行添加");
            //        NumberList = new List<int>();
            //        NumberList.Add(1);
            //        //Thread.Sleep(1000);
            //    }
            //}
            //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
            //Console.WriteLine(Number);
        }
    }

    class Program
    {
        private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            Parallel.For(0, 10, (i) =>
            {
                try
                {
                    TestClass a = new TestClass();
                    a.Test(i);
                }
                catch (Exception ex)
                {
                    lock (locker)
                    {
                        errList.Add(ex.Message);
                    }
                    //Console.WriteLine(ex.Message);
                    //註:這裡不再將錯誤拋出.....
                    //throw ex;
                }
            });

            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的錯誤:{1}", Index++, err);
            }
        }
    }
}
Program

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 用到的: import uuid uuid是128位的全局唯一標識符, 通常用32位的一個字元串的形式來表現 uuid.uuid1() 基於MAC地址,時間戳,隨機數來生成唯一的uuid,可以保證全球範圍內的唯一性 uuid.uuid3() 通過計算一個命名空間和名字的md5散列值來給出一個uuid ...
  • 前言 在 "上一篇" 中我們學習了創建型模式的建造者模式和原型模式。本篇則來學習下結構型模式的適配器模式和橋接模式。 適配器模式 簡介 適配器模式是作為兩個不相容的介面之間的橋梁。這種類型的設計模式屬於結構型模式,它結合了兩個獨立介面的功能。 簡單的來說就是通過某個介面將不相容的兩個類進行相容,俗稱 ...
  • jsp 內置對象 轉發與重定向的比較 重定向和轉發有一個重要的不同:當使用轉發時,JSP容器將使用一個內部的方法來調用目標頁面,新的頁面繼續處理同一個請求,而瀏覽器將不會知道這個過程。 與之相反,重定向方式的含義是第一個頁面通知瀏覽器發送一個新的頁面請求。因為,當你使用重定向時,瀏覽器中所顯示的UR ...
  • 所有文章僅是筆記!都是看網上大神或者視頻學習來的。 一、軟體層面機器碼翻譯(為了吹的牛B:write once run everywhere) 二、記憶體管理:java經久不衰的原因之一 ...
  • Python的錯誤異常在大部分IDE編輯器中則可以直接顯示出來,便於開發人員的調試及修改工作,對初學者也比較友好。 Python中包含錯誤和異常兩種情況,錯誤主要是常見的語法錯誤SyntaxError,並且在錯誤提示中會有倒三角箭頭的修改指示位置;python中的另外一種錯誤提醒叫做異常,指的是在語 ...
  • 好,大家好,我是Simon。接下來的時間由我和大家一起學習VC編程。 那麼我們現在的話就是開始去進入實際的一個程式設計階段。 那麼這個的話是說編寫一個全屏截圖工具。那麼這個工具的話就是我們採用一個cimage的一個類。 然後對它進行一個截圖操作。那麼採用這個類的話,主要是方便,然後再說快捷。在這... ...
  • 值類型、引用類型和泛型 多語言 咱們先不說主題,先說說CLR支持多語言。 .net有個非常強大的特點,那就是跨語言,支持很多語言,比如C#、J#等。先來個圖看一看 看到這個圖,每個語言都有自己的編譯器,通過第一次編譯,編譯成中間文件(dll或是exe文件)。在程式運行的時候,再次編譯把中間文件編譯成 ...
  • asp.net core 2.1 部署IIS(win10) 概述 與ASP.NET時代不同,ASP.NET Core不再是由IIS工作進程(w3wp.exe)托管,而是使用自托管Web伺服器(Kestrel)運行,IIS則是作為反向代理的角色轉發請求到Kestrel不同埠的ASP.NET Core ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...