Kolejka producer-consumer działająca na wielu rdzeniach
Deti
1 Wstęp
2 Implementacja
3 Przykład użycia
Wstęp
W tym gotowcu znajdziecie rozbudowany przykład z książki "C# 4.0 in a Nutshell", który opisuje kolejkę typu producer-consumer. Istnieją już różne implementację tej funkcjonalności w C#, ale dzięki klasom z .NET 4.0 możliwe jest, aby proces kolejnych elementów w kolejce odbywał się na wielu rdzeniach (jako, że jednym z kluczowych elementów C#/.NET 4.0 jest parallel programming). Przykład rozbudowałem o możliwości Pause/Resume oraz możliwość podania kolejce czegoś więcej niż tylko delegata typu Action();
Implementacja
Kod nie jest długi i mieści się w jednej klasie:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
namespace HAKGERSoft {
public sealed class PPCQueue: IDisposable {
abstract class QueueWorkItem {
public readonly TaskCompletionSource<object> TaskSource;
public readonly CancellationToken? CancelToken;
public abstract void Process();
public QueueWorkItem(CancellationToken? cancelToken){
TaskSource=new TaskCompletionSource<object>();
CancelToken = cancelToken;
}
}
class SimpleWorkItem: QueueWorkItem {
readonly Action ActionDel;
public SimpleWorkItem(CancellationToken? cancelToken,Action action)
:base(cancelToken){
ActionDel=action;
}
public override void Process() {
ActionDel();
}
}
class DataWorkItem<T>: QueueWorkItem{
readonly Action<T> ActionDel;
readonly T Data;
public DataWorkItem(CancellationToken? cancelToken,Action<T> action,T data)
:base(cancelToken){
ActionDel=action;
Data=data;
}
public override void Process() {
ActionDel(Data);
}
}
bool Disposed;
readonly BlockingCollection<QueueWorkItem> Queue;
readonly ManualResetEvent QueueWaitHandle;
readonly Task[] Tasks;
public PPCQueue(int workerCount){
if(workerCount<0)
throw new ArgumentException("workerCount must be greater or equal 0");
Queue=new BlockingCollection<QueueWorkItem>();
QueueWaitHandle=new ManualResetEvent(false);
Tasks=GetTasks(workerCount).ToArray();
QueueWaitHandle.Set();
}
IEnumerable<Task> GetTasks(int workerCount){
while(workerCount-->0)
yield return Task.Factory.StartNew(Consume);
}
public Task Enqueue(Action action) {
return Enqueue(action,null);
}
public Task Enqueue(Action action,CancellationToken? cancelToken) {
return Enqueue(new SimpleWorkItem(cancelToken,action));
}
public Task Enqueue<T>(Action<T> action,T data) {
return Enqueue<T>(action,data,null);
}
public Task Enqueue<T>(Action<T> action,T data, CancellationToken? cancelToken) {
return Enqueue(new DataWorkItem<T>(cancelToken,action,data));
}
Task Enqueue(QueueWorkItem queueWorkItem){
Queue.Add(queueWorkItem);
return queueWorkItem.TaskSource.Task;
}
public void Pause(){
QueueWaitHandle.Reset();
}
public void Resume(){
QueueWaitHandle.Set();
}
public void Join(){
Queue.CompleteAdding();
QueueWaitHandle.Set();
Task.WaitAll(this.Tasks);
}
void Consume(){
foreach(QueueWorkItem workItem in this.Queue.GetConsumingEnumerable()){
QueueWaitHandle.WaitOne();
if(workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) {
workItem.TaskSource.SetCanceled();
}
else{
try {
workItem.Process();
workItem.TaskSource.SetResult(null);
}
catch(Exception ex) {
workItem.TaskSource.SetException(ex);
}
}
}
}
public void Dispose() {
if(!this.Disposed&&!Queue.IsCompleted) {
Queue.CompleteAdding();
Disposed=true;
}
}
}
}
Przykład użycia
W paczce z całością (patrz dół artykułu) znajdziecie przykładową aplikację oraz testy jednostkowe, które pomogą w obsłudze kolejki (daruję sobie wymienianie przykładów do czego taką kolejkę można użyć).
Poniżej przykładowa aplikacja, która symuluje użycie kolejki (np. wysyłanie wiadomości e-mail, które trwa pewien okres czasu):
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace HAKGERSoft {
class PPCQueueSample {
static void Main(string[] args) {
// This program is a simulation of PPCQueue at work
// Consumer is Thread.Sleep() function which simulates
// a real-life data processing
PPCQueue queue=new PPCQueue(2);
Console.WriteLine("Start processing items...");
for(int i=0;i!=10;i++)
queue.Enqueue(ProcessData,i);
queue.Join();
Console.WriteLine("All items processed..");
Console.ReadKey();
}
static void ProcessData(int itemNr) {
Console.WriteLine("Processing item {0}...",itemNr);
Thread.Sleep(200);
}
}
}
Download paczki (implementacja, testy jednostkowe, aplikacja): PPCQueue.rar