1、IProducerConsumerCollection (線程安全介面) 此介面的所有實現都必須確保其所有成員可以被多個線程同時安全地使用。 using System; using System.Collections; using System.Collections.Concurrent; ...

1、IProducerConsumerCollection (線程安全介面)

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace ConsoleApp1
{
    /// <summary>
    /// A stack implementing <see cref="IProducerConsumerCollection{T}"/> by
    /// guarding a regular <see cref="Stack{T}"/> with a single private lock.
    /// </summary>
    /// <typeparam name="T">Type of the stored items.</typeparam>
    public class SafeStack<T> : IProducerConsumerCollection<T>
    {
        // Used for enforcing thread-safety; every access to the inner stack takes this lock.
        private readonly object m_lockObject = new object();

        // We'll use a regular old Stack for our core operations.
        private readonly Stack<T> m_sequentialStack;

        /// <summary>Creates an empty stack.</summary>
        public SafeStack()
        {
            m_sequentialStack = new Stack<T>();
        }

        /// <summary>Creates a stack pre-filled with the items of <paramref name="collection"/>.</summary>
        /// <param name="collection">Items pushed in enumeration order (the last item ends up on top).</param>
        public SafeStack(IEnumerable<T> collection)
        {
            m_sequentialStack = new Stack<T>(collection);
        }

        /// <summary>Pushes <paramref name="item"/> onto the stack under the lock.</summary>
        public void Push(T item)
        {
            lock (m_lockObject)
            {
                m_sequentialStack.Push(item);
            }
        }

        /// <summary>Pops the top item if there is one.</summary>
        /// <param name="item">The popped item, or <c>default(T)</c> when the stack is empty.</param>
        /// <returns><c>true</c> if an item was popped; <c>false</c> if the stack was empty.</returns>
        public bool TryPop(out T item)
        {
            lock (m_lockObject)
            {
                if (m_sequentialStack.Count == 0)
                {
                    item = default(T);
                    return false;
                }

                item = m_sequentialStack.Pop();
                return true;
            }
        }

        // IProducerConsumerCollection(T) support

        /// <summary>Removes the top item; equivalent to <see cref="TryPop"/>.</summary>
        public bool TryTake(out T item)
        {
            return TryPop(out item);
        }

        /// <summary>Adds <paramref name="item"/> by pushing it onto the stack.</summary>
        /// <returns>Always <c>true</c>.</returns>
        public bool TryAdd(T item)
        {
            // The item must actually be stored; returning true without pushing silently drops it.
            Push(item);
            return true; // Push doesn't fail
        }

        /// <summary>Returns a snapshot of the items, top of the stack first.</summary>
        public T[] ToArray()
        {
            lock (m_lockObject)
            {
                return m_sequentialStack.ToArray();
            }
        }

        /// <summary>Copies the items into <paramref name="array"/> starting at <paramref name="index"/>.</summary>
        public void CopyTo(T[] array, int index)
        {
            lock (m_lockObject)
            {
                m_sequentialStack.CopyTo(array, index);
            }
        }

        // Support for IEnumerable(T)

        /// <summary>
        /// Enumerates a snapshot of the stack taken under the lock, so the caller can
        /// iterate without holding the lock and without seeing later modifications.
        /// </summary>
        public IEnumerator<T> GetEnumerator()
        {
            // Snapshot through ToArray rather than new Stack<T>(stack): copying a stack
            // into another stack reverses the order, which would make enumeration
            // disagree with ToArray()/CopyTo(). The copy is O(n) per enumeration.
            T[] snapshot;
            lock (m_lockObject)
            {
                snapshot = m_sequentialStack.ToArray();
            }

            return ((IEnumerable<T>)snapshot).GetEnumerator();
        }

        // Support for IEnumerable
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }

        // Support for ICollection

        /// <summary>Always <c>true</c>: every member synchronizes on <see cref="SyncRoot"/>.</summary>
        public bool IsSynchronized
        {
            get { return true; }
        }

        /// <summary>The object this collection locks on.</summary>
        public object SyncRoot
        {
            get { return m_lockObject; }
        }

        /// <summary>Number of items currently on the stack.</summary>
        public int Count
        {
            get
            {
                // Stack<T> is not safe to read while another thread mutates it.
                lock (m_lockObject)
                {
                    return m_sequentialStack.Count;
                }
            }
        }

        /// <summary>Non-generic <see cref="ICollection.CopyTo"/> implementation.</summary>
        public void CopyTo(Array array, int index)
        {
            lock (m_lockObject)
            {
                ((ICollection)m_sequentialStack).CopyTo(array, index);
            }
        }
    }
}
using System;
using System.Collections.Concurrent;

namespace ConsoleApp1
{
    /// <summary>Console driver that exercises <c>SafeStack&lt;int&gt;</c>.</summary>
    class Program
    {
        static void Main()
        {
            TestSafeStack();

            // Keep the console window open in debug mode.
            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }

        // Test our implementation of IProducerConsumerCollection(T)
        // Demonstrates:
        //      IPCC(T).TryAdd()
        //      IPCC(T).TryTake()
        //      IPCC(T).CopyTo()
        static void TestSafeStack()
        {
            SafeStack<int> stack = new SafeStack<int>();
            IProducerConsumerCollection<int> ipcc = (IProducerConsumerCollection<int>)stack;

            // Test Push()/TryAdd()
            stack.Push(10); Console.WriteLine("Pushed 10");
            ipcc.TryAdd(20); Console.WriteLine("IPCC.TryAdded 20");
            stack.Push(15); Console.WriteLine("Pushed 15");

            int[] testArray = new int[3];

            // Try CopyTo() within boundaries
            try
            {
                ipcc.CopyTo(testArray, 0);
                Console.WriteLine("CopyTo() within boundaries worked, as expected");
            }
            catch (Exception e)
            {
                Console.WriteLine("CopyTo() within boundaries unexpectedly threw an exception: {0}", e.Message);
            }

            // Try CopyTo() that overflows: three items do not fit from index 1 of a 3-slot array
            try
            {
                ipcc.CopyTo(testArray, 1);
                Console.WriteLine("CopyTo() with index overflow worked, and it SHOULD NOT HAVE");
            }
            catch (Exception e)
            {
                Console.WriteLine("CopyTo() with index overflow threw an exception, as expected: {0}", e.Message);
            }

            // Test enumeration
            Console.Write("Enumeration (should be three items): ");
            foreach (int item in stack)
            {
                Console.Write("{0} ", item);
            }
            // Terminate the line started by the Console.Write calls above.
            Console.WriteLine();

            // Test TryPop()
            int popped = 0;
            if (stack.TryPop(out popped))
            {
                Console.WriteLine("Successfully popped {0}", popped);
            }
            else
            {
                Console.WriteLine("FAILED to pop!!");
            }

            // Test Count
            Console.WriteLine("stack count is {0}, should be 2", stack.Count);

            // Test TryTake()
            if (ipcc.TryTake(out popped))
            {
                Console.WriteLine("Successfully IPCC-TryTaked {0}", popped);
            }
            else
            {
                Console.WriteLine("FAILED to IPCC.TryTake!!");
            }
        }
    }
}


using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo: one producer pushes work items onto a ConcurrentStack while four
    /// processors pop and report them.
    /// </summary>
    class Program
    {
        // Random is not thread-safe, so one shared instance is guarded by a lock.
        // Seeding a new Random from the millisecond clock on every call hands the same
        // seed (and so the same delay) to callers that arrive within the same millisecond.
        private static readonly Random s_random = new Random();
        private static readonly object s_randomLock = new object();

        static void Main(string[] args)
        {
            // Block until the demo finishes; otherwise the process exits immediately.
            RunProgram().GetAwaiter().GetResult();
        }

        /// <summary>Starts the producer and four processors and waits for all of them.</summary>
        static async Task RunProgram()
        {
            var taskStack = new ConcurrentStack<CustomTask>();
            using (var cts = new CancellationTokenSource())
            {
                var taskSource = Task.Run(() => TaskProducer(taskStack));

                Task[] processors = new Task[4];
                for (int i = 1; i <= 4; i++)
                {
                    string processorId = i.ToString();
                    processors[i - 1] = Task.Run(
                        () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token));
                }

                await taskSource;

                // The processors loop until cancellation is requested. Give them a grace
                // period to drain the stack, then stop them so WhenAll can complete.
                cts.CancelAfter(TimeSpan.FromSeconds(2));

                await Task.WhenAll(processors);
            }
        }

        /// <summary>Posts 20 work items, one every 50 ms.</summary>
        /// <param name="stack">Stack the items are pushed onto.</param>
        static async Task TaskProducer(ConcurrentStack<CustomTask> stack)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                stack.Push(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }
        }

        /// <summary>Pops and reports work items until <paramref name="token"/> is cancelled.</summary>
        /// <param name="stack">Stack to pop from.</param>
        /// <param name="name">Display name of this processor.</param>
        /// <param name="token">Token that ends the polling loop.</param>
        static async Task TaskProcessor(
            ConcurrentStack<CustomTask> stack, string name, CancellationToken token)
        {
            await GetRandomDelay();
            do
            {
                CustomTask workItem;
                bool popSuccesful = stack.TryPop(out workItem);
                if (popSuccesful)
                {
                    Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
                }

                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        /// <summary>Returns a task that completes after a random delay of 1-499 ms.</summary>
        static Task GetRandomDelay()
        {
            int delay;
            lock (s_randomLock)
            {
                delay = s_random.Next(1, 500);
            }
            return Task.Delay(delay);
        }

        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}


using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo: one producer enqueues work items on a ConcurrentQueue while four
    /// processors dequeue and report them.
    /// </summary>
    class Program
    {
        // Random is not thread-safe, so one shared instance is guarded by a lock.
        // Seeding a new Random from the millisecond clock on every call hands the same
        // seed (and so the same delay) to callers that arrive within the same millisecond.
        private static readonly Random s_random = new Random();
        private static readonly object s_randomLock = new object();

        static void Main(string[] args)
        {
            // Block until the demo finishes; otherwise the process exits immediately.
            RunProgram().GetAwaiter().GetResult();
        }

        /// <summary>Starts the producer and four processors and waits for all of them.</summary>
        static async Task RunProgram()
        {
            var taskQueue = new ConcurrentQueue<CustomTask>();
            using (var cts = new CancellationTokenSource())
            {
                var taskSource = Task.Run(() => TaskProducer(taskQueue));

                Task[] processors = new Task[4];
                for (int i = 1; i <= 4; i++)
                {
                    string processorId = i.ToString();
                    processors[i - 1] = Task.Run(
                        () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token));
                }

                await taskSource;

                // The processors loop until cancellation is requested. Give them a grace
                // period to drain the queue, then stop them so WhenAll can complete.
                cts.CancelAfter(TimeSpan.FromSeconds(2));

                await Task.WhenAll(processors);
            }
        }

        /// <summary>Posts 20 work items, one every 50 ms.</summary>
        /// <param name="queue">Queue the items are enqueued on.</param>
        static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                queue.Enqueue(workItem);
                Console.WriteLine("插入Task {0} has been posted ThreadID={1}", workItem.Id, Thread.CurrentThread.ManagedThreadId);
            }
        }

        /// <summary>Dequeues and reports work items until <paramref name="token"/> is cancelled.</summary>
        /// <param name="queue">Queue to dequeue from.</param>
        /// <param name="name">Display name of this processor.</param>
        /// <param name="token">Token that ends the polling loop.</param>
        static async Task TaskProcessor(
            ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
        {
            CustomTask workItem;
            bool dequeueSuccesful = false;

            await GetRandomDelay();
            do
            {
                dequeueSuccesful = queue.TryDequeue(out workItem);
                if (dequeueSuccesful)
                {
                    Console.WriteLine("讀取Task {0} has been processed by {1} ThreadID={2}",
                                        workItem.Id, name, Thread.CurrentThread.ManagedThreadId);
                }

                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        /// <summary>Returns a task that completes after a random delay of 1-499 ms.</summary>
        static Task GetRandomDelay()
        {
            int delay;
            lock (s_randomLock)
            {
                delay = s_random.Next(1, 500);
            }
            return Task.Delay(delay);
        }

        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}

  ConcurrentDictionary類的實現使用了細粒度鎖(fine-grained locking)技術,這在多線程寫入方面比使用鎖的通常的字典(也被稱為粗粒度鎖)具有更好的擴展性。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

namespace ConsoleApp1
{
    /// <summary>
    /// Single-threaded micro-benchmark comparing a lock-guarded Dictionary with a
    /// ConcurrentDictionary for one million writes and one million reads each.
    /// </summary>
    class Program
    {
        const string Item = "Dictionary item";
        public static string CurrentItem;

        static void Main(string[] args)
        {
            var concurrentDictionary = new ConcurrentDictionary<int, string>();
            var dictionary = new Dictionary<int, string>();

            // The stopwatch is (re)started before every phase and stopped after it;
            // without that every reported duration is zero.
            var sw = new Stopwatch();

            sw.Start();
            for (int i = 0; i < 1000000; i++)
            {
                lock (dictionary)
                {
                    dictionary[i] = Item;
                }
            }
            sw.Stop();
            Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                concurrentDictionary[i] = Item;
            }
            sw.Stop();
            Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                lock (dictionary)
                {
                    CurrentItem = dictionary[i];
                }
            }
            sw.Stop();
            Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                CurrentItem = concurrentDictionary[i];
            }
            sw.Stop();
            Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed);
        }
    }
}


namespace ConsoleApp1
{
    /// <summary>A unit of crawling work: a URL plus the name of whoever queued it.</summary>
    class CrawlingTask
    {
        /// <summary>URL to be crawled.</summary>
        public string UrlToCrawl { get; set; }

        /// <summary>Name of the producer that posted this task.</summary>
        public string ProducerName { get; set; }
    }
}
using System.Collections.Generic;

namespace ConsoleApp1
{
    /// <summary>Holds an in-memory table that stands in for real web page content.</summary>
    static class Module
    {
        /// <summary>Maps a page URL to the URLs that page links to. Empty until <see cref="CreateLinks"/> runs.</summary>
        public static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>();

        /// <summary>
        /// Fills <see cref="_contentEmulation"/> with the emulated link graph.
        /// Entries are assigned by key, so calling this more than once leaves the same content.
        /// </summary>
        public static void CreateLinks()
        {
            _contentEmulation["http://microsoft.com/"] = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" };
            _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" };
            _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" };

            _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" };
            _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" };
            _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" };
            _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" };

            _contentEmulation["http://facebook.com/"] = new[] { "http://facebook.com/a.html", "http://facebook.com/b.html" };
            _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" };
            _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" };

            _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" };
            _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" };
            _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" };
            _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" };
            _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" };
            _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" };
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo: four crawlers share a ConcurrentBag of crawling tasks; each crawler takes a
    /// task, looks up the links of that page and posts them back to the bag as new tasks.
    /// </summary>
    class Program
    {
        // Random is not thread-safe, so one shared instance is guarded by a lock.
        // Seeding a new Random from the millisecond clock on every call hands the same
        // seed (and so the same delay) to callers that arrive within the same millisecond.
        private static readonly Random s_random = new Random();
        private static readonly object s_randomLock = new object();

        static void Main(string[] args)
        {
            // Block until the demo finishes; otherwise the process exits immediately.
            RunProgram().GetAwaiter().GetResult();
        }

        /// <summary>Seeds the bag with four root URLs and runs one crawler per root.</summary>
        static async Task RunProgram()
        {
            // The emulated link table is empty until CreateLinks() fills it;
            // without this call no page would yield any links.
            Module.CreateLinks();

            var bag = new ConcurrentBag<CrawlingTask>();

            string[] urls = new[] { "http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/" };

            var crawlers = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string crawlerName = "Crawler " + i.ToString();
                bag.Add(new CrawlingTask { UrlToCrawl = urls[i - 1], ProducerName = "root" });
                crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName));
            }

            await Task.WhenAll(crawlers);
        }

        /// <summary>Takes tasks from the bag until a take fails, posting every discovered link as a new task.</summary>
        /// <param name="bag">Shared bag of pending crawling tasks.</param>
        /// <param name="crawlerName">Display name of this crawler; recorded as producer of the tasks it posts.</param>
        static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
        {
            CrawlingTask task;
            while (bag.TryTake(out task))
            {
                IEnumerable<string> urls = await GetLinksFromContent(task);
                if (urls != null)
                {
                    foreach (var url in urls)
                    {
                        var t = new CrawlingTask
                        {
                            UrlToCrawl = url,
                            ProducerName = crawlerName
                        };

                        // Queue the discovered link; otherwise only the root pages are ever indexed.
                        bag.Add(t);
                    }
                }

                Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!",
                    task.UrlToCrawl, task.ProducerName, crawlerName);
            }
        }

        /// <summary>Emulates downloading a page and extracting its links.</summary>
        /// <param name="task">Task whose URL is looked up.</param>
        /// <returns>The links recorded for the URL, or <c>null</c> when the URL has no entry.</returns>
        static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
        {
            await GetRandomDelay();

            // Single lookup instead of ContainsKey followed by the indexer.
            string[] links;
            if (Module._contentEmulation.TryGetValue(task.UrlToCrawl, out links))
            {
                return links;
            }

            return null;
        }

        /// <summary>Returns a task that completes after a random delay of 150-199 ms.</summary>
        static Task GetRandomDelay()
        {
            int delay;
            lock (s_randomLock)
            {
                delay = s_random.Next(150, 200);
            }
            return Task.Delay(delay);
        }
    }
}


  BlockingCollection類: 我們能夠改變任務存儲在阻塞集合中的方式。預設情況下它使用的是ConcurrentQueue容器,但是我們能夠使用任何實現了IProducerConsumerCollection泛型介面的集合。

namespace ConsoleApp1
{
    /// <summary>A work item identified by a numeric id.</summary>
    class CustomTask
    {
        /// <summary>Identifier of the work item.</summary>
        public int Id { get; set; }
    }
}
using System;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>Shared helpers for the demo.</summary>
    static class Module
    {
        // Random is not thread-safe, so one shared instance is guarded by a lock.
        // Seeding a new Random from the millisecond clock on every call hands the same
        // seed (and so the same delay) to callers that arrive within the same millisecond.
        private static readonly Random s_random = new Random();
        private static readonly object s_randomLock = new object();

        /// <summary>Returns a task that completes after a random delay of 1-499 ms.</summary>
        public static Task GetRandomDelay()
        {
            int delay;
            lock (s_randomLock)
            {
                delay = s_random.Next(1, 500);
            }
            return Task.Delay(delay);
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo: one producer adds work items to a BlockingCollection while four
    /// processors consume them until the collection is marked complete.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Using a Queue inside of BlockingCollection");
            // Block until the demo finishes; otherwise the process exits immediately.
            RunProgram().GetAwaiter().GetResult();

            //Console.WriteLine("Using a Stack inside of BlockingCollection");
            //RunProgram(new ConcurrentStack<CustomTask>()).GetAwaiter().GetResult();
        }

        /// <summary>Starts the producer and four processors and waits for all of them.</summary>
        /// <param name="collection">
        /// Optional backing store for the BlockingCollection; when <c>null</c> the
        /// BlockingCollection's default backing store is used.
        /// </param>
        static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null)
        {
            // Create exactly one BlockingCollection instead of allocating a default
            // instance and then replacing it when a custom store is supplied.
            BlockingCollection<CustomTask> taskCollection = collection != null
                ? new BlockingCollection<CustomTask>(collection)
                : new BlockingCollection<CustomTask>();

            using (taskCollection)
            {
                var taskSource = Task.Run(() => TaskProducer(taskCollection));

                Task[] processors = new Task[4];
                for (int i = 1; i <= 4; i++)
                {
                    string processorId = "Processor " + i;
                    processors[i - 1] = Task.Run(
                        () => TaskProcessor(taskCollection, processorId));
                }

                await taskSource;

                await Task.WhenAll(processors);
            }
        }

        /// <summary>
        /// Fills the collection with 20 work items, one every 20 ms, then marks it complete.
        /// </summary>
        /// <param name="collection">Collection the items are added to.</param>
        /// <returns>A task that completes once all items are posted.</returns>
        static async Task TaskProducer(BlockingCollection<CustomTask> collection)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(20);
                var workItem = new CustomTask { Id = i };
                collection.Add(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }

            // Tell the consumers no more items are coming; without this their
            // GetConsumingEnumerable() loops would block forever.
            collection.CompleteAdding();
        }

        /// <summary>
        /// Prints the items in the collection as they are consumed.
        /// </summary>
        /// <param name="collection">Collection to consume from.</param>
        /// <param name="name">Display name of this processor.</param>
        /// <returns>A task that completes when the collection is complete and empty.</returns>
        static async Task TaskProcessor(
            BlockingCollection<CustomTask> collection, string name)
        {
            await Module.GetRandomDelay();
            foreach (CustomTask item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine("Task {0} has been processed by {1}", item.Id, name);
                await Module.GetRandomDelay();
            }
        }
    }
}


using System;


