Przesylanie obiektow przez MSMQ

Deti

1 Wstęp
2 MSMQ
3 MSMQ i .NET
4 Źródło komponentu
     4.1 Odbiorca
     4.2 Krok po kroku
5 Podsumowanie

Wstęp

Jeśli chcemy przesłać jakiś obiekt z programu do innego programu - obiekt ten w jakiś sposób musi przemieścić się pomiędzy dwoma procesami. Można to zrobić na wiele sposobów - lepszych lub gorszych (np. wykorzystując .NET Remoting). W tym gotowcu opisano jeden ze sposobów - poprzez użycie MSMQ.

MSMQ

To skrót od Microsoft Message Queuing - jedna z technologii Microsoft, która służy do komunikowania się pomiędzy procesami a nawet serwerami poprzez spójny system kolejkowania wiadomości. Innymi słowy - jest to nic innego jak obiekt typu FIFO, który Microsoft uruchamia jako usługę. Oczywiście MSMQ to coś więcej niż tylko zwykła kolejka - technologia ta jest bardzo rozbudowana, a pełny opis możliwości wykroczyłby z ram tego artykułu. Zajmiemy się jedynie wąskim zastosowaniem MSMQ w scenariuszu - najprostrza kolejka, która będzie służyła do przekazywania danych z aplikacji do aplikacji.

MSMQ i .NET

Jako, że MSMQ jest technologią Microsoft'u - można założyć, że obsługa kolejkowania wiadomości z poziomu .NET jest bardzo łatwa. Tak też jest w rzeczywistości. Zestaw klas dla tej usługi znajdują się po współną przestrzenią System.Messaging (ten namespace nie jest standardowo w referencjach aplikacji - należy dodać referencję do biblioteki .NET o tej samej nazwie).

Do gotowca wykorzystano najprostrzy przykład - dwie aplikacje (nadawca i odbiorca), które używają tej samej kolejki.

Źródło komponentu

Poniżej mózg całej operacji - czyli klasa MSMQDispatcher. Ma za zadania enkapsulować całą logikę dotyczącą MSMQ a wystawiać jedynie proste metody i zdarzenia do przesyłania / odbierania obiektów.


using System;
using System.Collections.Generic;
using System.Messaging;
using System.Diagnostics;
using System.Threading;

namespace HAKGERSoft {

    /// <summary>
    /// Sends and receives .NET objects between processes using MSMQ
    /// </summary>
    public class MSMQDispatcher {
        readonly string QueuePath;
        readonly MessageQueue Queue;
        Cursor Location;
        List<Type> RegisteredTypes;
        SynchronizationContext Context;

        /// <summary>
        /// Occurs when new message arrive - use RegisterType() before
        /// </summary>
        public event EventHandler<DispatcherEventArgs> OnReceived;

        /// <summary>
        /// Creates new MSMQDispatcher instance
        /// </summary>
        /// <param name="queueName">Common name of the MSMQ queue that is used between processes</param>
        public MSMQDispatcher(string queueName) {
            QueuePath=GetQueuePath(queueName);
            if(!MessageQueue.Exists(QueuePath)) {
                Debug.WriteLine("Provided queue does not exist, creating new one");
                MessageQueue.Create(QueuePath);
            }
            Queue=new MessageQueue(QueuePath);
            RegisteredTypes=new List<Type>();
        }

        /// <summary>
        /// Sends an object 
        /// </summary>
        /// <typeparam name="T">Type of object to be sent</typeparam>
        /// <param name="label">Additional label of the message</param>
        /// <param name="obj">Object to sent</param>
        public void Send<T>(string label,T obj) {
            Queue.Send(obj,label);
        }

        /// <summary>
        /// Clears all previous messages from the queue
        /// </summary>
        public void Purge() {
            Queue.Purge();
        }

        /// <summary>
        /// Start listen MSMQ for new messages - use this function if you want to receive data
        /// </summary>
        public void StartListener() {
            if(Location!=null)
                throw new InvalidOperationException("Listener already started");
            Context=SynchronizationContext.Current;
            if(Context==null)
                Context=new SynchronizationContext();
            Location=GetCurrentLocarion();
            BeginPeek(PeekAction.Current);
        }

        /// <summary>
        /// Register type that should be received
        /// </summary>
        /// <typeparam name="T"></typeparam>
        public void RegisterType<T>() {
            RegisteredTypes.Add(typeof(T));
        }

        string GetQueuePath(string queueName) {
            return string.Format(@".\private$\{0}",queueName);
        }

        void BeginPeek(PeekAction peekAction) {
            Queue.BeginPeek(MessageQueue.InfiniteTimeout,Location,peekAction,null,QueueAsyncCallBack);
        }

        void QueueAsyncCallBack(IAsyncResult asyncResult) {
            Message message= Queue.EndPeek(asyncResult);
            message.Formatter=new XmlMessageFormatter(RegisteredTypes.ToArray());
            object body=null;
            try {
                body=message.Body;
            } catch(InvalidOperationException ex) {
                Debug.WriteLine(ex.ToString());
            }
            if(body!=null)
                RaiseReceived(message.Label,body);
            BeginPeek(PeekAction.Next);
        }

        void RaiseReceived(string label,object message) {
            PostCallback<DispatcherEventArgs>(OnReceived,new DispatcherEventArgs(label,message));
        }

        void PostCallback<T>(EventHandler<T> handler,T args) where T:EventArgs {
            if(handler == null)
                return;
            Context.Post(new SendOrPostCallback((state) => { 
                handler(this,args); 
            }),null);
        }

        Cursor GetCurrentLocarion() {
            Cursor cursor=Queue.CreateCursor();
            try {
                Queue.Peek(TimeSpan.Zero,cursor,PeekAction.Current);
                while(true) {
                    Queue.Peek(TimeSpan.Zero,cursor,PeekAction.Next);
                }
            } catch(MessageQueueException ex) {
                if(ex.MessageQueueErrorCode!=MessageQueueErrorCode.IOTimeout) {
                    throw;
                }
            }
            return cursor;
        }
    }

    public class DispatcherEventArgs:EventArgs {
        public readonly string MessageLabel;
        public readonly object MessageObject;

        public DispatcherEventArgs(string label,object message) {
            MessageLabel=label;
            MessageObject=message;
        }
    }
}

Jeśli chodzi o kontrolki to komentarza wymaga jedynie kontrolka textBox1, która zawiera tekst wiadomości do wysłania. Konstruktor MSMQDispatcher zawiera argument, który jest nazwą dla kolejki (powinna ona być taka sama dla nadawcy i odbiorcy). Jak widać metoda Send oprócz samego obiektu - ma też dodatkowy (pierwszy) argument, który zawiera dodatkową informacje o wiadomości.

Odbiorca

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using HAKGERSoft;

namespace Sample_Receiver {
    public partial class Form1:Form {
        MSMQDispatcher Dispatcher=new MSMQDispatcher("HAKGERSoft MSMQ Sample");

        public Form1() {
            InitializeComponent();
            Dispatcher.RegisterType<string>();
            Dispatcher.OnReceived+=new EventHandler<DispatcherEventArgs>(Dispatcher_OnReceived);
            Dispatcher.StartListener();
        }

        void Dispatcher_OnReceived(object sender,DispatcherEventArgs e) {
            if(e.MessageLabel=="HAKGERSoft MSMQ message")
                label2.Text=e.MessageObject as string;
        }
    }
}

Jak widać, oba programy używają swoich instancji klasy MSMQDispatcher - o tej samej nazwie kolejki "HAKGERSoft MSMQ Sample". Gdyby nazwy były inne - programy nie mogły by się komunikować (podobnie można zastosować różne kanały komunikacji - stosując różne nazwy kolejek).

Krok po kroku

  • Dispatcher.RegisterType<string>(); - metoda rejestruje typ jaki może przyjść do kolejki (ponieważ w drugiej aplikacji wysyłamy typ string - tu musimy go podać). Jest to informacji dla serializatora jakie typy na deserializować. Przesyłać można nie tylko obiekty .NET'owe, ale równnież własne.
  • Dispatcher.OnReceived+=new EventHandler<dispatchereventargs>(Dispatcher_OnReceived); -przypisanie delegatu do zdarzenia, które wykonuje się, gdy nadejdzie nowa wiadomość (obiekt).
  • Dispatcher.StartListener(); wystartowanie nasłuchu. Nadawca nie musi tego wywoływać, jeśli chce tylko wysyłać wiadomości - a nie odbierać.
Jak widać czytanie obiektu jest bardzo łatwe i myślę, że nie wymaga dodatkowego komentarza.

Poniżej poglądowy schemat działania:

msmqdisp.png

Podsumowanie

W prosty sposób przesyłamy informacje z programu do programu. Metoda niestety wymaga zainstalowanej usługi MSMQ (która standardowo nie jest zainstalowana w systemie Windows - ale można ją zainstalować w dosłownie kilka minut - szczegóły tu: http://support.microsoft.com/kb/256096). Sam sposób jednak daje dość duże możliwości:

  • Program może wysyłać dowolnie duże obiekty w dowolnej liczbie
  • Dowolne programy mogą wysyłać i odbierać wiadomości w dowolnej konfiguracji (np. jeden nadawca, wiele odbiorców).

Po niewielkich modyfikacjach komponentu możliwe staje się również:

  • Odbieranie wiadomości z opóźnieniem (program wysyła wiadomość, kończy działanie - a po czasie odbiorca się uruchamia i odczytuje stare wiadomości).
  • Przesyłanie wiadomości pomiędzy serwerami (zobacz google: MSMQ public queues)

Cała biblioteka dostępna na stronie www.hakger.org

2 komentarzy

To wymyślili twórcy pierwszych systemów operacyjnych w dzisiejszym (i tym sprzed 20-30 lat) znaczeniu tego terminu. Zresztą ogólnie jest wiadomo, że do stworzenia systemu Windows NT Microsoft wynajął czy wykupił twórcę/twórców badajże najlepszego systemu operacyjnego czasu rzeczywistego VMS firmy DEC (Digital; http://pl.wikipedia.org/wiki/OpenVMS ).
Microsoft zastosował tę ideę jak to na system operacyjny przystało.

To MSM wymyślił?