Aktuell wage ich mich für ein Firmenprojekt RabbitMQ heran. Einfach gesagt ist RabbitMQ eine Serveranwendung über die Nachrichten zwischen verschiedenen Sendern und Empfängern ausgetauscht werden können. In diesem Post möchte ich meine kleine Reise vom absoluten Neuling bis ich das Ganze verstanden habe etwas dokumentieren.
Sehr hilfreich fand ich dabei dieses Tutorial: https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html
Warum RabbitMQ?
Man sollte Code einfach schreiben und Programme einfach denken, denn kompliziert werden sie von alleine. Das habe ich - wie wohl jeder andere auch - in meiner Arbeit schon oft erlebt. Da auch in meinen aktuellen Projekten der Bedarf nach neuen Features hoch ist und immer mehr Teilsysteme dazukommen, haben wir uns im Team überlegt wie wir das gerne in Zukunft besser handhaben wollen. Dabei ist eine tolle Gesamtstruktur für die Projekte herausgekommen.
Herzstück der neuen Struktur sind einerseits eine schmale API für die Kernfunktionen des Produkts, andererseits eine einheitliche Kommunikation zwischen den Systemen über einen MessageBus.
Wir haben überlegt und evaluiert welcher MessageBus für uns in Frage kommt und RabbitMQ hat sich angeboten, da wir die Flexibilität in den Kommunikationswegen schätzen (In-Memory, TCP, Http) und vor allem da es ein etablierter Industriestandard ist und es so sehr viele Anleitungen, Tutorials und Problemlösungen zu allen gängigen Aufgabenstellungen und Problemen gibt.
Grundsätzlich erhoffen wir uns, dass unsere Infrastruktur weiter wachsen kann, ohne dass die Komplexität allzu stark zunimmt, indem wir Teilbereiche entkoppeln, Domainwissen und -daten an zentralen Stellen anbieten und die einzelnen Bausteine überschaubar halten.
Coole Features von RabbitMQ
- Eine Queue kann von mehreren Konsumenten abgearbeitet werden, oder es können verschiedene Queues für beliebig viele Konsumenten identisch befüllt werden. Das ermöglicht z.B. ein asynchrones Abarbeiten von Renderaufträgen.
- Wie bereits erwähnt kann man seine RabbitMQ sogar rein In-Memory (z.B. mit Mediator) aufsetzen
- RabbitMQ ist super schnell
- Durch Plugins erweiterbar (z.B. UI-Manager, Streams)
Installation mit dem Windows Installer
Hinweis: XXX
steht für irgendeine Versionsnummer.
Erlang (Programmiersprache) installieren: https://www.erlang.org/downloads
a. Unbedingt als Admin installieren
b. Benötigt einen PC NeustartRabbitMQ herunterladen und installieren: https://github.com/rabbitmq/rabbitmq-server/releases (
rabbitmq-server-XXX.exe
)
a. Anleitung hier: https://www.rabbitmq.com/install-windows.html
b. Ebenfalls als Admin installierenRabbitMQ UI aktivieren
a.RabbitMQ Command Prompt
öffnen -> Ordner ist...\RabbitMQ Server\rabbitmq_server-XXX\sbin
b. Befehl ausführen:rabbitmq-plugins enable rabbitmq_management
c. Service neu starten mitrabbitmq-service.bat stop
und anschließendrabbitmq-service.bat start
d. Nach ca. 1 Minute ist die UI nun unter http://localhost:15672 erreichbar. Benutzer:guest
, Passwort:guest
Remotezugriff aktivieren
a. TabAdmin
in der UI öffnen
b. Einen neuen Benutzer hinzufügen. Wichtig: Tag mussadministrator
sein.
c.
d. Jetzt muss man noch dem Admin-Benutzer die Berechtigung zum Zugriff geben. Dazu auf den Namen klicken und einfach die voreingestellten Berechtigungen hinzufügen
e.
f. Remotezugriff mit diesem Benutzer ist jetzt möglich. DazuServerurl/15627
aufrufen, z.B. http://localhost:15672/
Grundkonzept
Auf TryRabbitMQ kann man sehr schön sehen wie sich RabbitMQ in den Grundzügen verhält:
Die Message
Eine Message ist ein Bytestream der irgendwie versendet wird. Das kann natürlich ein konvertierter String, oder ein Objekt im JSON-Format sein. Die Nachricht interessiert sich nicht für das Transportmittel, sondern kapselt nur ihre Informationen.
Wichtig ist, dass eine Message Metadaten wie z.B. einen Binding-Key enthalten kann, der als Routing-Information im Exchange genutzt werden kann.
Der Producer
Ein Producer ist irgendein Client der eine Nachricht an den Server übermittelt.
Die Connection
Es wird in den meisten Setups eine TCP-Verbindung zwischen Client und Server aufgebaut. Als Faustregel gilt: Jede Applikation sollte nur 1 Connection offen haben, da diese ressorcenintensiv sind.
Der Channel
Je Connection können mehrere Channels angelegt werden, über die der Client mit dem Server kommunizieren kann. Als Faustregel gilt hier, dass jeder Task seinen eigenen Task haben sollte, da diese üblicherweise nicht Multithreading-sicher implementiert sind.
Alle offenen Channel schließen sich automatisch, wenn die Connection geschlossen wird.
Der Broker
Der Broker ist der Nachrichtenserver in seiner Gesamtheit. Das kann ein richtiger Server sein, oder auch "nur" eine In-Memory Routing-Klasse.
Der Exchange
Der Exchange hat die Aufgabe die empfangenen Nachrichten den richtigen Queues zuzuordnen. Dazu hat er verschiedene Grundkonfigurationen:
- Direct: Die Message wird nur an Queues weitergeleitet deren Binding-Key exakt dem der Nachricht entspricht
- Fanout: Die Message wird an alle verbundenen Queues weitergeleitet, ganz unabhängig vom Binding-Key
- Topic: Benutzt Wildcards, um ein Pattern-Matching durchzuführen
- Headers: Spezialfall bei dem die Message-Header fürs Routing genutzt werden
Die Queue
Ein Buffer der Nachrichten zwischenspeichert. Hierbei gilt im allgemeinen FIFO. Es gibt aber auch z.B. die Möglichkeit einer Priorisierung.
Wichtig ist zu wissen, dass jede Nachricht nur 1x aus der Queue geholt werden kann (es sei denn das Abholen läuft auf ein Timeout hinaus, dann wir Requeued). D.h. wenn mehrere Consumer auf dieselbe Queue zugreifen, wird jede Nachricht nur an genau 1 Consumer weitergereicht. Möchte man eine Nachricht an mehrere Consumer verteilen, benötigt jeder Consumer seine eigene Queue in welche die Nachricht jeweils dupliziert wird.
Der Consumer
Ein oder mehrere Consumer fordern die Daten von der Queue an. Sobald eine Nachricht erhalten wurde (Standard), bzw. bestätigt wurde, wird sie aus der Queue gelöscht. Bis dahin ist sie für andere Konsumenten unsichtbar.
Mehr ...
Das sind nur die absoluten Basics, es gibt diverse Einstellungen und Möglichkeiten. Es empfiehlt sich einerseits die Tutorials zu machen, andererseits die Dokumentation zu studieren. Hier werden viele Fragen gut erklärt.
Das RabbitMQ Management Interface
Das Interface hilft dabei zu verstehen wie die Producer, Exchanges, Queues und Consumer zusammenarbeiten ... oder auch nicht arbeiten. Es ist im allgemeinen unter http://localhost:15672/, bzw. mit dem Servernamen erreichbar.
Es ist gut zu wissen, dass man über Queues auch von hier aus Exchanges und Queues anlegen kann. Das geht auch über den Code, aber wahrscheinlich macht es am meisten Sinn alle dauerhaften Dinge hierüber einzurichten und alles weitere bei Bedarf über den Code.
Das Ganze habe ich mir noch nicht in der Tiefe angeschaut. Interessant könnten aber z.B. die User und Virtual Hosts sein. Damit sollte es möglich sein abgegrenzte Bereiche zu definieren in denen Benutzern Lese- und Schreibrechte vergeben werden können. Ist aber zum jetzigen Zeitpunkt noch nicht relevant.
Für mehr Infos empfehle ich den Folgepost (zu dem oben) von cloudamqp: https://www.cloudamqp.com/blog/part3-rabbitmq-for-beginners_the-management-interface.html
Basic Setup
1. Die Connection einrichten
var factory = new ConnectionFactory()
{
HostName = "http://localhost:15672/",
VirtualHost = "/",
UserName = "guest",
Password = "guest",
};
var connection = factory.CreateConnection();
Hinweis: Wie bereits erwähnt sollte es nur 1 Connection je Applikation geben. Sie kann also z.B. als Singleton deklariert werden, wenn man DI nutzt.
2. Den Channel öffnen
var channel = connection.CreateModel();
Hinweis: Empfehlung ist 1 Channel je Task.
3. Eine Queue deklarieren
channel.QueueDeclare(
queue: "letterbox",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Hier gibt es viel zu erklären:
- queue: Der Name der Queue. Wird er leergelassen, wird der Name vom Server generiert
-
durable: Soll die Queue auf die Platte gespiegelt werden, damit im Falle eines Serverneustarts die Daten nicht verloren sind? Bei
false
wird alles nur im RAM gehalten. - exclusive: Die Queue wird gelöscht, wenn die einzige Connection mit Zugriff darauf geschlossen wird
- autoDelete: Die Queue wird gelöscht, sobald es keinen aktiven Konsumenten mehr gibt
- arguments: Weitere Features und Plugins nutzen diese
Wie weiter oben beschrieben halte ich es durchaus für sinnvoll, wenn man dauerhafte Queues über das Admin-Interface definiert. Es ist aber sehr cool, wenn ein Service z.B. eine exklusive Queue mit einem BindingKey anlegt und alle neuen Nachrichten dann immer auch in diese Queue geschrieben werden. Wird der Service dann gestoppt, verschwindet die Queue auch sofort wieder mitsamt der Daten.
4. Eine Nachricht versenden
var message = "This is my message number " + i++;
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "letterbox", null, body);
- Der erste Wert wäre der Name des
Exchange
den man ansprechen will. Mit Leerstring wird der Default-Exchange verwendet. -
letterbox
ist der Name des Routings, bzw. in dem Fall der Queue - Der
body
ist ein Byte-Array oder -Stream
5. Die Nachricht empfangen
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(
queue: "letterbox",
autoAck: true, // Nachricht aus der Queue löschen, sobald sie erhalten wurde
consumer: consumer
);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Nachricht erhalten: {message}");
};
Praktische Anwendungsszenarien
Dokument-Rendition
Eine Anforderung ist, dass alle eingestellten Dokumente automatisch eine Vorschau gerendert bekommen. Der Aufbau mit der Queue könnte so aussehen:
- An einer Stelle im System (in diesem Fall der
DocumentService
) wird ein neues Dokument eingestellt - Eine Nachricht mit den Dokument-Metadaten (z.B. Id, Speicherort) wird an den Broker geschickt
- Der Exchange leitet die Nachricht an die Queue weiter, wenn sie den Tag
rendition-task
gesetzt hat. - Ein RenditionWorker kann sich den Task aus der Queue ziehen, sobald er Kapazität hat
UI-Aktualisierung
Eine weitere Anforderung ist, dass sich alle Ansichten aktualisieren müssen, in denen das Dokument zu sehen sein wird. Das könnte dann so aussehen:
- Der Service benachrichtigt über
document-changed
- Jede Queue die den Key hinterlegt hat, bekommt die Nachricht
- Die
renditionQueue
gibt die Nachricht nur an 1 ServiceWorker - Jede der
uiNotifQueues
bekommt eine eigene Nachrichtenkopie - Die Nachricht wird an den jeweiligen Consumer weitergeleitet. Auf diese Weise könnten auch 100te Consumer ihre jeweils eigene (temporäre!) Queue eröffnen
Soweit mal zu den Basics. Ich denke, dass man damit schon einiges anfangen kann. Im nächsten Teil werde ich dann mehr in die Details reinschauen.
Top comments (0)