1、IProducerConsumerCollection (線程安全介面) 此介面的所有實現必須都啟用此介面的所有成員,若要從多個線程同時使用。 using System; using System.Collections; using System.Collections.Concurrent; ...
1、IProducerConsumerCollection (線程安全介面)
此介面的所有實現必須都啟用此介面的所有成員,若要從多個線程同時使用。
using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; namespace ConsoleApp1 { public class SafeStack<T> : IProducerConsumerCollection<T> { // Used for enforcing thread-safety private object m_lockObject = new object(); // We'll use a regular old Stack for our core operations private Stack<T> m_sequentialStack = null; // // Constructors // public SafeStack() { m_sequentialStack = new Stack<T>(); } public SafeStack(IEnumerable<T> collection) { m_sequentialStack = new Stack<T>(collection); } // // Safe Push/Pop support // public void Push(T item) { lock (m_lockObject) m_sequentialStack.Push(item); } public bool TryPop(out T item) { bool rval = true; lock (m_lockObject) { if (m_sequentialStack.Count == 0) { item = default(T); rval = false; } else item = m_sequentialStack.Pop(); } return rval; } // // IProducerConsumerCollection(T) support // public bool TryTake(out T item) { return TryPop(out item); } public bool TryAdd(T item) { Push(item); return true; // Push doesn't fail } public T[] ToArray() { T[] rval = null; lock (m_lockObject) rval = m_sequentialStack.ToArray(); return rval; } public void CopyTo(T[] array, int index) { lock (m_lockObject) m_sequentialStack.CopyTo(array, index); } // // Support for IEnumerable(T) // public IEnumerator<T> GetEnumerator() { // The performance here will be unfortunate for large stacks, // but thread-safety is effectively implemented. Stack<T> stackCopy = null; lock (m_lockObject) stackCopy = new Stack<T>(m_sequentialStack); return stackCopy.GetEnumerator(); } // // Support for IEnumerable // IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable<T>)this).GetEnumerator(); } // // Support for ICollection // public bool IsSynchronized { get { return true; } } public object SyncRoot { get { return m_lockObject; } } public int Count { get { return m_sequentialStack.Count; } } public void CopyTo(Array array, int index) { lock (m_lockObject) ((ICollection)m_sequentialStack).CopyTo(array, index); } } }SafeStack
using System; using System.Collections.Concurrent; namespace ConsoleApp1 { class Program { static void Main() { TestSafeStack(); // Keep the console window open in debug mode. Console.WriteLine("Press any key to exit."); Console.ReadKey(); } // Test our implementation of IProducerConsumerCollection(T) // Demonstrates: // IPCC(T).TryAdd() // IPCC(T).TryTake() // IPCC(T).CopyTo() static void TestSafeStack() { SafeStack<int> stack = new SafeStack<int>(); IProducerConsumerCollection<int> ipcc = (IProducerConsumerCollection<int>)stack; // Test Push()/TryAdd() stack.Push(10); Console.WriteLine("Pushed 10"); ipcc.TryAdd(20); Console.WriteLine("IPCC.TryAdded 20"); stack.Push(15); Console.WriteLine("Pushed 15"); int[] testArray = new int[3]; // Try CopyTo() within boundaries try { ipcc.CopyTo(testArray, 0); Console.WriteLine("CopyTo() within boundaries worked, as expected"); } catch (Exception e) { Console.WriteLine("CopyTo() within boundaries unexpectedly threw an exception: {0}", e.Message); } // Try CopyTo() that overflows try { ipcc.CopyTo(testArray, 1); Console.WriteLine("CopyTo() with index overflow worked, and it SHOULD NOT HAVE"); } catch (Exception e) { Console.WriteLine("CopyTo() with index overflow threw an exception, as expected: {0}", e.Message); } // Test enumeration Console.Write("Enumeration (should be three items): "); foreach (int item in stack) Console.Write("{0} ", item); Console.WriteLine(""); // Test TryPop() int popped = 0; if (stack.TryPop(out popped)) { Console.WriteLine("Successfully popped {0}", popped); } else Console.WriteLine("FAILED to pop!!"); // Test Count Console.WriteLine("stack count is {0}, should be 2", stack.Count); // Test TryTake() if (ipcc.TryTake(out popped)) { Console.WriteLine("Successfully IPCC-TryTaked {0}", popped); } else Console.WriteLine("FAILED to IPCC.TryTake!!"); } } }Program
2、ConcurrentStack類:安全堆棧
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); } static async Task RunProgram() { var taskStack = new ConcurrentStack<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskStack)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentStack<CustomTask> stack) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; stack.Push(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentStack<CustomTask> stack, string name, CancellationToken token) { await GetRandomDelay(); do { CustomTask workItem; bool popSuccesful = stack.TryPop(out workItem); if (popSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } } }Program
3、ConcurrentQueue類:安全隊列
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); } static async Task RunProgram() { var taskQueue = new ConcurrentQueue<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskQueue)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; queue.Enqueue(workItem); Console.WriteLine("插入Task {0} has been posted ThreadID={1}", workItem.Id, Thread.CurrentThread.ManagedThreadId); } } static async Task TaskProcessor( ConcurrentQueue<CustomTask> queue, string name, CancellationToken token) { CustomTask workItem; bool dequeueSuccesful = false; await GetRandomDelay(); do { dequeueSuccesful = queue.TryDequeue(out workItem); if (dequeueSuccesful) { Console.WriteLine("讀取Task {0} has been processed by {1} ThreadID={2}", workItem.Id, name, Thread.CurrentThread.ManagedThreadId); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } } }Program
4、ConcurrentDictionary類
ConcurrentDictionary類寫操作比使用鎖的通常字典(Dictionary)要慢的多,而讀操作則要快些。因此對字典要大量的線程安全的讀操作,ConcurrentDictionary類是最好的選擇
ConcurrentDictionary類的實現使用了細粒度鎖(fine-grained locking)技術,這在多線程寫入方面比使用鎖的通常的字典(也被稱為粗粒度鎖)
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; namespace ConsoleApp1 { class Program { static void Main(string[] args) { var concurrentDictionary = new ConcurrentDictionary<int, string>(); var dictionary = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { dictionary[i] = Item; } } sw.Stop(); Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { concurrentDictionary[i] = Item; } sw.Stop(); Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { CurrentItem = dictionary[i]; } } sw.Stop(); Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { CurrentItem = concurrentDictionary[i]; } sw.Stop(); Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed); } const string Item = "Dictionary item"; public static string CurrentItem; } }Program
5、ConcurrentBag類
namespace ConsoleApp1 { class CrawlingTask { public string UrlToCrawl { get; set; } public string ProducerName { get; set; } } }CrawlingTask
using System.Collections.Generic; namespace ConsoleApp1 { static class Module { public static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>(); public static void CreateLinks() { _contentEmulation["http://microsoft.com/"] = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" }; _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" }; _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" }; _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" }; _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" }; _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" }; _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" }; _contentEmulation["http://facebook.com/"] = new[] { "http://facebook.com/a.html", "http://facebook.com/b.html" }; _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" }; _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" }; _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" }; _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" }; _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" }; _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" }; _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" }; _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" }; } } }Module
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Module.CreateLinks(); Task t = RunProgram(); t.Wait(); } static async Task RunProgram() { var bag = new ConcurrentBag<CrawlingTask>(); string[] urls = new[] { "http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/" }; var crawlers = new Task[4]; for (int i = 1; i <= 4; i++) { string crawlerName = "Crawler " + i.ToString(); bag.Add(new CrawlingTask { UrlToCrawl = urls[i - 1], ProducerName = "root" }); crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName)); } await Task.WhenAll(crawlers); } static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName) { CrawlingTask task; //嘗試從bag中取出對象 while (bag.TryTake(out task)) { IEnumerable<string> urls = await GetLinksFromContent(task); if (urls != null) { foreach (var url in urls) { var t = new CrawlingTask { UrlToCrawl = url, ProducerName = crawlerName }; //將子集插入到bag中 bag.Add(t); } } Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!", task.UrlToCrawl, task.ProducerName, crawlerName); } } static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task) { await GetRandomDelay(); if (Module._contentEmulation.ContainsKey(task.UrlToCrawl)) return Module._contentEmulation[task.UrlToCrawl]; return null; } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(150, 200); return Task.Delay(delay); } } }Program
6、BlockingCollection類
BlockingCollection類: 我們能夠改變任務存儲在阻塞集合中的方式。預設情況下它使用的是ConcurrentQueue容器,但是我們能夠使用任何實現了IProducerConsumerCollection泛型介面的集合。
namespace ConsoleApp1 { class CustomTask { public int Id { get; set; } } }CustomTask
using System; using System.Threading.Tasks; namespace ConsoleApp1 { static class Module { public static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } } }Module
using System; using System.Collections.Concurrent; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Console.WriteLine("Using a Queue inside of BlockingCollection"); Console.WriteLine(); Task t = RunProgram(); t.Wait(); //Console.WriteLine(); //Console.WriteLine("Using a Stack inside of BlockingCollection"); //Console.WriteLine(); //Task t = RunProgram(new ConcurrentStack<CustomTask>()); //t.Wait(); } static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null) { var taskCollection = new BlockingCollection<CustomTask>(); if (collection != null) taskCollection = new BlockingCollection<CustomTask>(collection); //初始化collection中的數據 var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = "Processor " + i; processors[i - 1] = Task.Run( () => TaskProcessor(taskCollection, processorId)); } await taskSource; await Task.WhenAll(processors); } /// <summary> /// 初始化collection中的數據 /// </summary> /// <param name="collection"></param> /// <returns></returns> static async Task TaskProducer(BlockingCollection<CustomTask> collection) { for (int i = 1; i <= 20; i++) { await Task.Delay(20); var workItem = new CustomTask { Id = i }; collection.Add(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } collection.CompleteAdding(); } /// <summary> /// 列印collection中的數據 /// </summary> /// <param name="collection"></param> /// <param name="name"></param> /// <returns></returns> static async Task TaskProcessor( BlockingCollection<CustomTask> collection, string name) { await Module.GetRandomDelay(); foreach (CustomTask item in collection.GetConsumingEnumerable()) { Console.WriteLine("Task {0} has been processed by {1}", item.Id, name); await Module.GetRandomDelay(); } } } }Program
7、使用ThreadStatic特性
ThreadStatic特性是最簡單的TLS使用,且只支持靜態欄位,只需要在欄位上標記這個特性就可以了
using System;