Simple RabbitMQ Wrapper for PowerShell

I was working on a project where I needed a pool of PowerShell workers that were distributed across several nodes and should be served by a central dispatcher. So why not give AMQP with RabbitMQ a try?
The setup was very easy thanks to the documentation. Besides plenty of .NET library wrappers around “RabbitMQ.Client.dll”, there were even two PowerShell modules on the Developer tools list. However, as it turned out, they had their limitations (such as working via the HTTP interface instead of AMQP directly, or providing far more functionality than I needed for a quick dispatcher/worker queue scenario).

So my first thought was to just “Add-Type” the “RabbitMQ.Client.dll” client and implement Send/Receive quickly by following the tutorials. But this did not really work, as the “RabbitMQ.Client.dll” assembly is not CLS compliant and therefore the “ConnectionFactory” could not be instantiated. A quick search revealed I was not the only one having this issue. So I decided to write a quick wrapper around it with a few convenience methods to be used in PowerShell. The idea was not to create a full replica of the original client or to create 667 PowerShell cmdlets, but to expose as much of the original client as possible while being able to send and receive a message with as few commands as possible. So here we are (the code compiles with VS2013):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;
// Add reference to RabbitMQ.Client.dll
namespace RabbitMQHelper
{
public class Client : IDisposable
{
#region ========== Constants ==========
private const string _VirtualHostDefault = "/";
private const string _ServerDefault = "localhost";
private const int _PortDefault = AmqpTcpEndpoint.UseDefaultPort; //5672;
private const string _UsernameDefault = "guest";
private const string _PasswordDefault = "guest";
private const string _ExchangeNameDefault = "";
private const string _QueueNameDefault = "default";
private const bool _AutoDeleteDefault = false;
private const bool _DurableDefault = true;
private const bool _ConnectionAutoCloseDefault = false;
private const int _WaitMilliSecondsDefault = 1;
#endregion
#region ========== Properties ==========
private bool disposed = false;
string _Server = _ServerDefault;
public string Server
{
get { return _Server; }
set { _Server = value; }
}
int _Port = _PortDefault;
public int Port
{
get { return _Port; }
set { _Port = value; }
}
string _QueueName = _QueueNameDefault;
public string QueueName
{
get { return _QueueName; }
set { _QueueName = value; }
}
private bool _Durable = _DurableDefault;
public bool Durable
{
get { return _Durable; }
set { _Durable = value; }
}
private bool _Exclusive;
public bool Exclusive
{
get { return _Exclusive; }
set { _Exclusive = value; }
}
private bool _AutoDelete = _AutoDeleteDefault;
public bool AutoDelete
{
get { return _AutoDelete; }
set { _AutoDelete = value; }
}
private bool _ConnectionAutoClose = _ConnectionAutoCloseDefault;
public bool ConnectionAutoClose
{
get { return _ConnectionAutoClose; }
set { _ConnectionAutoClose = value; }
}
private string _ExchangeName = _ExchangeNameDefault;
public string ExchangeName
{
get { return _ExchangeName; }
set { _ExchangeName = value; }
}
private QueueingBasicConsumer _Consumer;
public QueueingBasicConsumer Consumer
{
get { return _Consumer; }
set { _Consumer = value; }
}
private IDictionary<string, object> _QueueArguments = new Dictionary<string, object>();
public IDictionary<string, object> QueueArguments
{
get { return _QueueArguments; }
set { _QueueArguments = value; }
}
private QueueDeclareOk _QueueDeclareReturn;
public QueueDeclareOk QueueDeclareReturn
{
get { return _QueueDeclareReturn; }
set { _QueueDeclareReturn = value; }
}
private string _Username = _UsernameDefault;
public string Username
{
get { return _Username; }
set { _Username = value; }
}
private string _Password = _PasswordDefault;
public string Password
{
//get { return _Password; }
set { _Password = value; }
}
private int _WaitMilliSeconds = _WaitMilliSecondsDefault;
public int WaitMilliSeconds
{
get { return _WaitMilliSeconds; }
set { _WaitMilliSeconds = value; }
}
private string _VirtualHost = _VirtualHostDefault;
public string VirtualHost
{
get { return _VirtualHost; }
set { _VirtualHost = value; }
}
private IProtocol _Protocol = Protocols.FromEnvironment();
public IProtocol Protocol
{
get { return _Protocol; }
set { _Protocol = value; }
}
private ConnectionFactory _ConnectionFactory;
public ConnectionFactory ConnectionFactory
{
get { return _ConnectionFactory; }
set { _ConnectionFactory = value; }
}
private IConnection _Connection;
public IConnection Connection
{
get { return _Connection; }
set { _Connection = value; }
}
private IModel _Channel;
public IModel Channel
{
get { return _Channel; }
set { _Channel = value; }
}
#endregion
#region ========== Methods ==========
public IConnection Connect()
{
return Connect(_VirtualHost, _Server, _Port, _Username, _Password, null);
}
public IConnection Connect(string Server, string Username, string Password)
{
// Use the caller-supplied server; virtual host and port come from the instance settings.
return Connect(_VirtualHost, Server, _Port, Username, Password, null);
}
public IConnection Connect(string VirtualHost, string Server, string Username, string Password)
{
return Connect(VirtualHost, Server, _Port, Username, Password, null);
}
public IConnection Connect(string VirtualHost, string Server, int? Port, string Username, string Password, int? MaxRedirects)
{
if (null == _Connection || !_Connection.IsOpen)
{
if (null == _ConnectionFactory)
{
var __VirtualHost = VirtualHost ?? _VirtualHost;
var __Server = Server ?? _Server;
var __Port = Port ?? _Port;
var __Username = Username ?? _Username;
var __Password = Password ?? _Password;
_ConnectionFactory = new ConnectionFactory()
{
Protocol = _Protocol
,
VirtualHost = __VirtualHost
,
HostName = __Server
,
Port = __Port
,
UserName = __Username
,
Password = __Password
};
}
if (null == MaxRedirects)
{
_Connection = _ConnectionFactory.CreateConnection();
}
else
{
_Connection = _ConnectionFactory.CreateConnection((int) MaxRedirects);
}
}
return _Connection;
}
public void Disconnect()
{
Disconnect(true, true, true, false);
return;
}
public void Disconnect(bool CloseConnection, bool CloseChannel, bool CloseConsumer, bool CloseQueue)
{
if (null != _Consumer)
{
if (CloseQueue)
{
_Consumer.Queue.Close();
}
if (CloseConsumer)
{
_Consumer = null;
}
}
if (CloseChannel)
{
if ((null != _Channel) && _Channel.IsOpen)
{
_Channel.Close();
}
_QueueDeclareReturn = null;
_Channel = null;
}
if (CloseConnection)
{
if (null != _Connection)
{
if (_Connection.IsOpen)
{
_Connection.Close();
}
_Connection = null;
if (null != _ConnectionFactory)
{
_ConnectionFactory = null;
}
}
}
return;
}
public IModel CreateChannel()
{
if (null == _Channel)
{
Connect();
_Channel = _Connection.CreateModel();
_Connection.AutoClose = _ConnectionAutoClose;
}
return _Channel;
}
public QueueDeclareOk CreateQueue(string QueueName)
{
// Queue names are case-sensitive in RabbitMQ, so compare ordinally.
if (null == _QueueDeclareReturn || !_QueueDeclareReturn.QueueName.Equals(QueueName, StringComparison.Ordinal))
{
_QueueName = QueueName;
CreateChannel();
_QueueDeclareReturn = _Channel.QueueDeclare(_QueueName, _Durable, _Exclusive, _AutoDelete, _QueueArguments);
_Channel.BasicQos(0, 1, false);
}
return _QueueDeclareReturn;
}
public string Receive()
{
return Receive(_QueueName, _WaitMilliSeconds);
}
public string Receive(string QueueName)
{
return Receive(QueueName, _WaitMilliSeconds);
}
public string Receive(int? WaitMilliSeconds)
{
return Receive(_QueueName, WaitMilliSeconds);
}
public string Receive(string QueueName, int? WaitMilliSeconds)
{
//var fReturn = false;
var __QueueName = QueueName ?? _QueueName;
var __WaitMilliSeconds = WaitMilliSeconds ?? _WaitMilliSeconds;
string message = null;
try {
// Declare the queue that was actually requested, not only the instance default.
CreateQueue(__QueueName);
if (null == _Consumer)
{
_Consumer = new QueueingBasicConsumer(_Channel);
_Channel.BasicConsume(__QueueName, false, _Consumer);
}
BasicDeliverEventArgs ea;
bool fReturn = _Consumer.Queue.Dequeue((int)__WaitMilliSeconds, out ea);
if (fReturn)
{
var body = ea.Body;
message = Encoding.UTF8.GetString(body);
_Channel.BasicAck(ea.DeliveryTag, false);
}
}
catch (Exception ex)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
if (null != ex.InnerException)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
}
throw;
}
return message;
}
public bool Send(string message)
{
return Send(_QueueName, message);
}
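public bool Send(string QueueName, string message)
{
var fReturn = false;
// Fall back to the instance queue name when no queue is specified.
var __QueueName = QueueName ?? _QueueName;
try
{
CreateQueue(__QueueName);
var body = Encoding.UTF8.GetBytes(message);
var properties = _Channel.CreateBasicProperties();
// Mark the message as persistent so a durable queue keeps it across broker restarts.
properties.SetPersistent(true);
// With the default (empty) exchange the routing key is the queue name.
_Channel.BasicPublish(_ExchangeName, __QueueName, properties, body);
fReturn = true;
}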
catch (Exception ex)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
if (null != ex.InnerException)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
}
throw;
}
return fReturn;
}
public Client()
{
}
public Client(string Server, string Username, string Password)
{
_Server = Server;
_Username = Username;
_Password = Password;
}
public Client(string VirtualHost, string Server, string Username, string Password)
{
_VirtualHost = VirtualHost;
_Server = Server;
_Username = Username;
_Password = Password;
}
public Client(string VirtualHost, string Server, int Port, string Username, string Password)
{
_VirtualHost = VirtualHost;
_Server = Server;
_Port = Port;
_Username = Username;
_Password = Password;
}
~Client()
{
Dispose(false);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposed)
return;
try
{
if (disposing)
{
// Close channel and connection while the references are still set.
// When called from the finalizer (disposing == false) other managed
// objects must not be touched, so nothing is closed in that case.
Disconnect();
}
}
catch (Exception ex)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
if (null != ex.InnerException)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
}
throw;
}
finally
{
// Drop all references and mark the instance as disposed, even if closing failed.
_Consumer = null;
_Channel = null;
_Connection = null;
disposed = true;
}
}
#endregion
}
}


In case your browser does not render the Gist correctly you can view the code directly at: https://gist.github.com/dfch/6da3d17414c913595302.

Usage is simple: add the assembly to your session (note: the assembly must be in the same path as the original RabbitMQ client dll), then specify server, username, password and your queue.

Add-Type -Path ".\RabbitMQHelper.dll"
$mq = New-Object RabbitMQHelper.Client;
$mq.Server = "amqp.example.com";
$mq.Username = "Edgar.Schnittenfittich";
$mq.Password = "P@ssL0rd";
$mq.QueueName = "MyQueueName";
$mq.Send("tralala");
$mq.Disconnect();

Receiving a message is nearly the same; call “Receive()” and optionally specify the wait timeout in milliseconds:

Add-Type -Path ".\RabbitMQHelper.dll"
$mq = New-Object RabbitMQHelper.Client;
$mq.Server = "amqp.example.com";
$mq.Username = "Edgar.Schnittenfittich";
$mq.Password = "P@ssL0rd";
$mq.QueueName = "MyQueueName";
# receive message but return immediately if none
$Message = $mq.Receive(0);
# receive message but wait only 5 seconds
$Message = $mq.Receive(5000);
# receive message using the default timeout ($mq.WaitMilliSeconds, 1 ms unless changed)
$Message = $mq.Receive();
# receive message and wait indefinitely
$Message = $mq.Receive(-1);
$mq.Disconnect();

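I observed a strange behavior after connecting a consumer and receiving from a queue. On the first fetch/dequeue operation with a WaitTimeout of 0, no message was returned. On a subsequent fetch the message was received. When using an infinite WaitTimeout (-1), the message was received on the first call. Presumably the consumer has only just been registered at the first call and the broker has not yet pushed the message into the consumer's local queue, so a zero timeout returns before anything arrives.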
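
One way to live with the empty first fetch is to retry a few times with a short timeout instead of relying on a single call. The following is only a minimal sketch: “Receive-WithRetry” and its parameters are hypothetical names that are not part of the wrapper, and the script block stands in for whatever receive call you use.

```powershell
# Hypothetical helper (not part of RabbitMQHelper): invoke a receive script block
# until it returns a non-null value or the attempts are used up.
function Receive-WithRetry {
    param(
        [Parameter(Mandatory = $true)]
        [scriptblock] $ReceiveAction,
        [int] $MaxAttempts = 3
    )
    for ($attempt = 1; $attempt -le $MaxAttempts; $attempt++) {
        # Run the caller's receive call, for example { $mq.Receive(100) }.
        $message = & $ReceiveAction
        if ($null -ne $message) {
            return $message
        }
    }
    # Nothing arrived within the given number of attempts.
    return $null
}
```

With the client from the examples above this could be called as: $Message = Receive-WithRetry -ReceiveAction { $mq.Receive(100) }; the timeout of 100 ms and the three attempts are arbitrary values chosen for illustration.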

In case you get an error while trying to connect to your AMQP server have a look at the ErrorRecord, especially the second InnerException of the Exception:

PS > $mq.Send("tralala");
Exception calling "Send" with "1" argument(s): "None of the specified endpoints were reachable"
At line:1 char:1
+ $mq.Send("tralala");
+ ~~~~~~~~~~~~~~~~~~~
    + CategoryInfo          : NotSpecified: (:) [], MethodInvocationException
    + FullyQualifiedErrorId : BrokerUnreachableException

PS > $error[0].Exception
Exception calling "Send" with "1" argument(s): "None of the specified endpoints were reachable"
PS > $error[0].Exception.InnerException
None of the specified endpoints were reachable
PS > $error[0].Exception.InnerException.InnerException
ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
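
Instead of typing “.InnerException” repeatedly, the chain can be walked in a loop. This is a small sketch assuming PowerShell 3.0 or later (for [pscustomobject]); “Get-ExceptionChain” is a hypothetical helper name and not part of the wrapper or of RabbitMQ.

```powershell
# Hypothetical helper: list type and message of an exception and all of its
# inner exceptions, outermost first.
function Get-ExceptionChain {
    param(
        [Parameter(Mandatory = $true)]
        [System.Exception] $Exception
    )
    $current = $Exception
    $depth = 0
    while ($null -ne $current) {
        [pscustomobject]@{
            Depth   = $depth
            Type    = $current.GetType().FullName
            Message = $current.Message
        }
        # Step down to the next nested exception (null ends the loop).
        $current = $current.InnerException
        $depth++
    }
}
```

After a failed call, “Get-ExceptionChain $error[0].Exception” lists every level at once, so a refused login at the innermost level is visible without guessing how deep to dig.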

The broker log file (which you will find below the %APPDATA% folder) will sometimes give you more details, such as telling you about lacking permissions or wrong credentials:

=INFO REPORT==== 24-Aug-2014::01:12:59 ===
accepting AMQP connection <0.281.0> (amqpclient.example.com:54082 -> amqp.example.com:5672)

=ERROR REPORT==== 24-Aug-2014::01:13:02 ===
closing AMQP connection <0.281.0> (amqpclient.example.com:54082 -> amqp.example.com:5672):
{handshake_error,starting,0,
 {amqp_error,access_refused,
 "PLAIN login refused: user 'guest' can only connect via localhost",
 'connection.start_ok'}}

=INFO REPORT==== 24-Aug-2014::01:13:21 ===
accepting AMQP connection <0.286.0> (amqpclient.example.com:54083 -> amqp.example.com:5672)

=ERROR REPORT==== 24-Aug-2014::01:13:24 ===
closing AMQP connection <0.286.0> (amqpclient.example.com:54083 -> amqp.example.com:5672):
{handshake_error,starting,0,
 {amqp_error,access_refused,
 "PLAIN login refused: user 'Edgar.Schnittenfittich' - invalid credentials",
 'connection.start_ok'}}

Note: if you have a fresh installation of RabbitMQ you might want to add a user so you can connect remotely to your server:

Create new user so we can connect from remote
C:\> rabbitmqctl.bat add_user Administrator P@ssL0rd
Make this user read/write admin
C:\> rabbitmqctl.bat set_user_tags Administrator administrator
C:\> rabbitmqctl.bat set_permissions -p / Administrator .* .* .*

C:\> rabbitmqctl.bat list_users
Listing users ...
Administrator   [administrator]
guest   [administrator]
...done.

C:\> rabbitmqctl.bat list_permissions
Listing permissions in vhost "/" ...
Administrator   .*   .*   .*
guest   .*   .*   .*
...done.

And it is always a good idea to watch your connections, consumers, queues and the like:

C:\> rabbitmqctl.bat list_connections
Listing connections ...
Administrator   192.168.174.148 53825   running
Administrator   192.168.174.148 53828   running
...done.

C:\> rabbitmqctl.bat list_queues
Listing queues ...
default 0
q2      0
q3      1786889
task_queue      0
...done.

C:\> rabbitmqctl.bat list_consumers
Listing consumers ...
q3      <rabbit@amqpserver.1.1609.0>     amq.ctag-K1U5l7foM-7RDhxfcgOwTw true
1       []
q3      <rabbit@amqpserver.1.1665.0>     amq.ctag-fB8dS9ZeJt9jNDP4EbCuBw true
1       []
...done.
