Kolejka producer-consumer działająca na wielu rdzeniach

Deti

1 Wstęp
2 Implementacja
3 Przykład użycia

Wstęp

W tym gotowcu znajdziecie rozbudowany przykład z książki "C# 4.0 in a Nutshell", który opisuje kolejkę typu producer-consumer. Istnieją już różne implementację tej funkcjonalności w C#, ale dzięki klasom z .NET 4.0 możliwe jest, aby proces kolejnych elementów w kolejce odbywał się na wielu rdzeniach (jako, że jednym z kluczowych elementów C#/.NET 4.0 jest parallel programming). Przykład rozbudowałem o możliwości Pause/Resume oraz możliwość podania kolejce czegoś więcej niż tylko delegata typu Action();

Implementacja

Kod nie jest długi i mieści się w jednej klasie:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

namespace HAKGERSoft {

    public sealed class PPCQueue: IDisposable {
       
        abstract class QueueWorkItem {
            public readonly TaskCompletionSource<object> TaskSource;
            public readonly CancellationToken? CancelToken;
            public abstract void Process();
            public QueueWorkItem(CancellationToken? cancelToken){
                TaskSource=new TaskCompletionSource<object>();
                CancelToken = cancelToken;
            }
        }

        class SimpleWorkItem: QueueWorkItem {
            readonly Action ActionDel;
            public SimpleWorkItem(CancellationToken? cancelToken,Action action)
                :base(cancelToken){
                ActionDel=action;
            }
            public override void Process() {
                ActionDel();
            }
        }

        class DataWorkItem<T>: QueueWorkItem{
            readonly Action<T> ActionDel;
            readonly T Data;
            public DataWorkItem(CancellationToken? cancelToken,Action<T> action,T data)
                :base(cancelToken){
                ActionDel=action;
                Data=data;
            }
            public override void Process() {
                ActionDel(Data);
            }
        }

        bool Disposed;
        readonly BlockingCollection<QueueWorkItem> Queue;
        readonly ManualResetEvent QueueWaitHandle;
        readonly Task[] Tasks;

        public PPCQueue(int workerCount){
            if(workerCount<0)
                throw new ArgumentException("workerCount must be greater or equal 0");
            Queue=new BlockingCollection<QueueWorkItem>();
            QueueWaitHandle=new ManualResetEvent(false);
            Tasks=GetTasks(workerCount).ToArray();
            QueueWaitHandle.Set();
        }

        IEnumerable<Task> GetTasks(int workerCount){
            while(workerCount-->0)
                yield return Task.Factory.StartNew(Consume);
        }

        public Task Enqueue(Action action) {
            return Enqueue(action,null);
        }

        public Task Enqueue(Action action,CancellationToken? cancelToken) {
            return Enqueue(new SimpleWorkItem(cancelToken,action));
        }

        public Task Enqueue<T>(Action<T> action,T data) {
            return Enqueue<T>(action,data,null);
        }

        public Task Enqueue<T>(Action<T> action,T data, CancellationToken? cancelToken) {
            return Enqueue(new DataWorkItem<T>(cancelToken,action,data));
        }

        Task Enqueue(QueueWorkItem queueWorkItem){
            Queue.Add(queueWorkItem);
            return queueWorkItem.TaskSource.Task;
        }

        public void Pause(){
            QueueWaitHandle.Reset();
        }
        
        public void Resume(){
            QueueWaitHandle.Set();
        }

        public void Join(){
            Queue.CompleteAdding();
            QueueWaitHandle.Set();
            Task.WaitAll(this.Tasks);
        }

        void Consume(){
            foreach(QueueWorkItem workItem in this.Queue.GetConsumingEnumerable()){
                QueueWaitHandle.WaitOne();
                if(workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) {
                    workItem.TaskSource.SetCanceled();
                }
                else{
                    try {
                        workItem.Process();
                        workItem.TaskSource.SetResult(null);
                    }
                    catch(Exception ex) {
                        workItem.TaskSource.SetException(ex);
                    }
                }
            }
        }

        public void Dispose() {
            if(!this.Disposed&&!Queue.IsCompleted) {
                Queue.CompleteAdding();
                Disposed=true;
            }
        }
    }
}

Przykład użycia

W paczce z całością (patrz dół artykułu) znajdziecie przykładową aplikację oraz testy jednostkowe, które pomogą w obsłudze kolejki (daruję sobie wymienianie przykładów do czego taką kolejkę można użyć).

Poniżej przykładowa aplikacja, która symuluje użycie kolejki (np. wysyłanie wiadomości e-mail, które trwa pewien okres czasu):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace HAKGERSoft {
    class PPCQueueSample {

        static void Main(string[] args) {
            // This program is a simulation of PPCQueue at work
            // Consumer is Thread.Sleep() function which simulates
            // a real-life data processing   

            PPCQueue queue=new PPCQueue(2);
            Console.WriteLine("Start processing items...");
            for(int i=0;i!=10;i++)
                queue.Enqueue(ProcessData,i);
            queue.Join();
            Console.WriteLine("All items processed..");
            Console.ReadKey();
        }

        static void ProcessData(int itemNr) {
            Console.WriteLine("Processing item {0}...",itemNr);
            Thread.Sleep(200);
        }
        
    }
}

Download paczki (implementacja, testy jednostkowe, aplikacja): PPCQueue.rar

0 komentarzy