I was working on a project where I needed a pool of PowerShell workers that were distributed across several nodes and should be served by a central dispatcher. So why not give AMQP with RabbitMQ a try?
The setup was very easy thanks to the documentation. And besides plenty of .NET library wrappers around “RabbitMQ.Client.dll” there were even two PowerShell modules on the Developer tools list. However, as it turned out, they had their limitations (like working via the HTTP interface instead of AMQP directly, or just providing too much functionality where I only needed a quick dispatcher/worker queue scenario).
So my first thought was to just “Add-Type” the “RabbitMQ.Client.dll” client and implement Send/Receive quickly by following the tutorials. But as it turned out, this was not really working, as the “RabbitMQ.Client.dll” assembly is not CLS compliant and therefore the “ConnectionFactory” could not be instantiated. A quick search revealed I was not the only one having this issue. So I decided to write a quick wrapper around it with a few convenience methods to be used in PowerShell. The idea was not to create a full replica of the original client or to create 667 PowerShell cmdlets, but to expose as much from the original client as possible while being able to send and receive a message with as few commands as possible. So here we are (code compiles with VS2013):
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using System.Diagnostics; | |
using RabbitMQ.Client; | |
using RabbitMQ.Client.Events; | |
using RabbitMQ.Util; | |
// Add reference to Rabbit.MQ.Client.dll | |
namespace RabbitMQHelper | |
{ | |
public class Client : IDisposable | |
{ | |
#region ========== Constants ========== | |
// Default values used to initialise the corresponding fields of a new Client.
private const string _VirtualHostDefault = "/";
private const string _ServerDefault = "localhost";
// Defers the port choice to the client library (the original author notes 5672).
private const int _PortDefault = AmqpTcpEndpoint.UseDefaultPort; //5672;
private const string _UsernameDefault = "guest";
private const string _PasswordDefault = "guest";
// Empty exchange name; Send() passes this together with the queue name as routing key.
private const string _ExchangeNameDefault = "";
private const string _QueueNameDefault = "default";
private const bool _AutoDeleteDefault = false;
private const bool _DurableDefault = true;
private const bool _ConnectionAutoCloseDefault = false;
// Fix: the literal used an en dash (U+2013) instead of an ASCII minus sign and did not compile.
// Per the accompanying article, -1 means "wait indefinitely" when dequeuing.
private const int _WaitMilliSecondsDefault = -1;
#endregion | |
#region ========== Properties ========== | |
// Set to true once Dispose(bool) has run, so that repeated calls return early.
private bool disposed = false;

private string _Server = _ServerDefault;
/// <summary>Broker host name; used as <c>HostName</c> when the connection factory is built.</summary>
public string Server
{
    get { return this._Server; }
    set { this._Server = value; }
}

private int _Port = _PortDefault;
/// <summary>Broker port; used as <c>Port</c> when the connection factory is built.</summary>
public int Port
{
    get { return this._Port; }
    set { this._Port = value; }
}

private string _QueueName = _QueueNameDefault;
/// <summary>Name of the queue used by the Send/Receive overloads that take no queue argument.</summary>
public string QueueName
{
    get { return this._QueueName; }
    set { this._QueueName = value; }
}

private bool _Durable = _DurableDefault;
/// <summary>Value passed as the "durable" argument of <c>QueueDeclare</c>.</summary>
public bool Durable
{
    get { return this._Durable; }
    set { this._Durable = value; }
}

private bool _Exclusive;
/// <summary>Value passed as the "exclusive" argument of <c>QueueDeclare</c>.</summary>
public bool Exclusive
{
    get { return this._Exclusive; }
    set { this._Exclusive = value; }
}

private bool _AutoDelete = _AutoDeleteDefault;
/// <summary>Value passed as the "autoDelete" argument of <c>QueueDeclare</c>.</summary>
public bool AutoDelete
{
    get { return this._AutoDelete; }
    set { this._AutoDelete = value; }
}

private bool _ConnectionAutoClose = _ConnectionAutoCloseDefault;
/// <summary>Value assigned to <c>Connection.AutoClose</c> when a channel is created.</summary>
public bool ConnectionAutoClose
{
    get { return this._ConnectionAutoClose; }
    set { this._ConnectionAutoClose = value; }
}

private string _ExchangeName = _ExchangeNameDefault;
/// <summary>Exchange that Send() publishes to.</summary>
public string ExchangeName
{
    get { return this._ExchangeName; }
    set { this._ExchangeName = value; }
}

private QueueingBasicConsumer _Consumer;
/// <summary>Consumer that Receive() dequeues from; created lazily on the first Receive() call.</summary>
public QueueingBasicConsumer Consumer
{
    get { return this._Consumer; }
    set { this._Consumer = value; }
}

private IDictionary<string, object> _QueueArguments = new Dictionary<string, object>();
/// <summary>Additional arguments passed to <c>QueueDeclare</c>; empty by default.</summary>
public IDictionary<string, object> QueueArguments
{
    get { return this._QueueArguments; }
    set { this._QueueArguments = value; }
}

private QueueDeclareOk _QueueDeclareReturn;
/// <summary>Result of the most recent <c>QueueDeclare</c> call; null until a queue has been declared.</summary>
public QueueDeclareOk QueueDeclareReturn
{
    get { return this._QueueDeclareReturn; }
    set { this._QueueDeclareReturn = value; }
}

private string _Username = _UsernameDefault;
/// <summary>User name; used as <c>UserName</c> when the connection factory is built.</summary>
public string Username
{
    get { return this._Username; }
    set { this._Username = value; }
}

private string _Password = _PasswordDefault;
/// <summary>Password used when the connection factory is built.</summary>
public string Password
{
    // Write-only: no getter is exposed, so the stored value cannot be read back through this property.
    set { this._Password = value; }
}

private int _WaitMilliSeconds = _WaitMilliSecondsDefault;
/// <summary>Dequeue timeout in milliseconds used by Receive() when the caller supplies none.</summary>
public int WaitMilliSeconds
{
    get { return this._WaitMilliSeconds; }
    set { this._WaitMilliSeconds = value; }
}

private string _VirtualHost = _VirtualHostDefault;
/// <summary>Virtual host; used as <c>VirtualHost</c> when the connection factory is built.</summary>
public string VirtualHost
{
    get { return this._VirtualHost; }
    set { this._VirtualHost = value; }
}

private IProtocol _Protocol = Protocols.FromEnvironment();
/// <summary>Protocol handed to the connection factory; initialised from <c>Protocols.FromEnvironment()</c>.</summary>
public IProtocol Protocol
{
    get { return this._Protocol; }
    set { this._Protocol = value; }
}

private ConnectionFactory _ConnectionFactory;
/// <summary>Factory used to open connections; built on the first Connect() call if not supplied.</summary>
public ConnectionFactory ConnectionFactory
{
    get { return this._ConnectionFactory; }
    set { this._ConnectionFactory = value; }
}

private IConnection _Connection;
/// <summary>Current broker connection, or null when none has been opened.</summary>
public IConnection Connection
{
    get { return this._Connection; }
    set { this._Connection = value; }
}

private IModel _Channel;
/// <summary>Current channel, or null when none has been created.</summary>
public IModel Channel
{
    get { return this._Channel; }
    set { this._Channel = value; }
}
#endregion | |
#region ========== Methods ========== | |
/// <summary>Connects using the virtual host, server, port and credentials stored on this instance.</summary>
/// <returns>The open connection.</returns>
public IConnection Connect()
{
    return Connect(_VirtualHost, _Server, _Port, _Username, _Password, null);
}

/// <summary>Connects to <paramref name="Server"/> with the given credentials, using the stored virtual host and port.</summary>
/// <returns>The open connection.</returns>
public IConnection Connect(string Server, string Username, string Password)
{
    // Fix: forward the caller's Server argument; previously the stored _Server was passed
    // and the argument was silently ignored.
    return Connect(_VirtualHost, Server, _Port, Username, Password, null);
}

/// <summary>Connects to the given virtual host and server with the given credentials, using the stored port.</summary>
/// <returns>The open connection.</returns>
public IConnection Connect(string VirtualHost, string Server, string Username, string Password)
{
    return Connect(VirtualHost, Server, _Port, Username, Password, null);
}

/// <summary>
/// Returns the current connection if it is open; otherwise opens a new one.
/// </summary>
/// <remarks>
/// A connection factory is only built when none exists yet. If one is already set, it is reused
/// as-is and the connection arguments passed here are not applied.
/// Any argument passed as null falls back to the value stored on this instance.
/// </remarks>
/// <param name="VirtualHost">Virtual host, or null for the stored value.</param>
/// <param name="Server">Broker host name, or null for the stored value.</param>
/// <param name="Port">Broker port, or null for the stored value.</param>
/// <param name="Username">User name, or null for the stored value.</param>
/// <param name="Password">Password, or null for the stored value.</param>
/// <param name="MaxRedirects">When set, passed to <c>CreateConnection(int)</c>; when null, the parameterless overload is used.</param>
/// <returns>The open connection.</returns>
public IConnection Connect(string VirtualHost, string Server, int? Port, string Username, string Password, int? MaxRedirects)
{
    // Reuse a live connection instead of opening a second one.
    if (null != _Connection && _Connection.IsOpen)
    {
        return _Connection;
    }

    if (null == _ConnectionFactory)
    {
        _ConnectionFactory = new ConnectionFactory()
        {
            Protocol = _Protocol,
            VirtualHost = VirtualHost ?? _VirtualHost,
            HostName = Server ?? _Server,
            Port = Port ?? _Port,
            UserName = Username ?? _Username,
            Password = Password ?? _Password
        };
    }

    _Connection = MaxRedirects.HasValue
        ? _ConnectionFactory.CreateConnection(MaxRedirects.Value)
        : _ConnectionFactory.CreateConnection();
    return _Connection;
}
/// <summary>Closes the channel and the connection and drops the consumer; the consumer's queue is not closed.</summary>
public void Disconnect()
{
    Disconnect(true, true, true, false);
}

/// <summary>Selectively tears down the consumer, channel and connection held by this instance.</summary>
/// <param name="CloseConnection">Close (if open) and forget the connection, and forget the connection factory.</param>
/// <param name="CloseChannel">Close (if open) and forget the channel, and forget the cached queue declaration.</param>
/// <param name="CloseConsumer">Forget the consumer.</param>
/// <param name="CloseQueue">Close the consumer's local queue.</param>
public void Disconnect(bool CloseConnection, bool CloseChannel, bool CloseConsumer, bool CloseQueue)
{
    // Consumer-related steps only apply while a consumer exists.
    if (null != _Consumer)
    {
        if (CloseQueue)
        {
            _Consumer.Queue.Close();
        }

        if (CloseConsumer)
        {
            _Consumer = null;
        }
    }

    if (CloseChannel)
    {
        if (null != _Channel && _Channel.IsOpen)
        {
            _Channel.Close();
        }

        // The cached declaration belongs to the channel, so it is dropped with it.
        _QueueDeclareReturn = null;
        _Channel = null;
    }

    if (!CloseConnection || null == _Connection)
    {
        return;
    }

    if (_Connection.IsOpen)
    {
        _Connection.Close();
    }

    _Connection = null;
    _ConnectionFactory = null;
}
/// <summary>Returns the existing channel, or connects and creates one if none exists yet.</summary>
/// <returns>The channel held by this instance.</returns>
public IModel CreateChannel()
{
    if (null != _Channel)
    {
        return _Channel;
    }

    // Connect() returns the connection it stores in _Connection.
    IConnection connection = Connect();
    _Channel = connection.CreateModel();
    connection.AutoClose = _ConnectionAutoClose;
    return _Channel;
}
/// <summary>
/// Declares <paramref name="QueueName"/> on the channel (creating the channel first if needed)
/// and makes it the current queue of this instance.
/// </summary>
/// <remarks>
/// The declaration is cached: calling this again with the same name returns the cached result
/// without contacting the broker. The queue is declared with the Durable, Exclusive, AutoDelete
/// and QueueArguments values stored on this instance, and the channel's prefetch count is set to 1.
/// </remarks>
/// <param name="QueueName">Queue to declare; null falls back to the stored queue name.</param>
/// <returns>The result of the queue declaration.</returns>
public QueueDeclareOk CreateQueue(string QueueName)
{
    // Fix: a null argument used to overwrite _QueueName with null before declaring.
    string requested = QueueName ?? _QueueName;

    // Fix: compare ordinally. Queue names are case-sensitive identifiers, so the former
    // culture-aware, case-insensitive comparison treated e.g. "Jobs" and "jobs" as the
    // same queue and skipped declaring the second one.
    bool alreadyDeclared = null != _QueueDeclareReturn
        && string.Equals(_QueueDeclareReturn.QueueName, requested, StringComparison.Ordinal);
    if (alreadyDeclared)
    {
        return _QueueDeclareReturn;
    }

    _QueueName = requested;
    CreateChannel();
    _QueueDeclareReturn = _Channel.QueueDeclare(_QueueName, _Durable, _Exclusive, _AutoDelete, _QueueArguments);
    // Prefetch count 1, applied per consumer (global = false).
    _Channel.BasicQos(0, 1, false);
    return _QueueDeclareReturn;
}
/// <summary>Receives one message from the stored queue using the stored timeout.</summary>
/// <returns>The message text, or null if none was dequeued within the timeout.</returns>
public string Receive()
{
    return Receive(_QueueName, _WaitMilliSeconds);
}

/// <summary>Receives one message from <paramref name="QueueName"/> using the stored timeout.</summary>
/// <returns>The message text, or null if none was dequeued within the timeout.</returns>
public string Receive(string QueueName)
{
    return Receive(QueueName, _WaitMilliSeconds);
}

/// <summary>Receives one message from the stored queue, waiting at most <paramref name="WaitMilliSeconds"/>.</summary>
/// <returns>The message text, or null if none was dequeued within the timeout.</returns>
public string Receive(int? WaitMilliSeconds)
{
    return Receive(_QueueName, WaitMilliSeconds);
}

/// <summary>
/// Declares the queue, attaches a consumer on first use, dequeues a single message,
/// acknowledges it and returns its body decoded as UTF-8.
/// </summary>
/// <param name="QueueName">Queue to read from; null falls back to the stored queue name.</param>
/// <param name="WaitMilliSeconds">Dequeue timeout in milliseconds; null falls back to the stored timeout.</param>
/// <returns>The message text, or null if no message was dequeued within the timeout.</returns>
/// <exception cref="Exception">Any failure is written to the debug output and rethrown unchanged.</exception>
public string Receive(string QueueName, int? WaitMilliSeconds)
{
    string targetQueue = QueueName ?? _QueueName;
    int timeout = WaitMilliSeconds ?? _WaitMilliSeconds;
    string message = null;
    try
    {
        // Fix: declare the queue that is actually consumed from. Previously the stored
        // _QueueName was declared while the consumer was attached to the requested queue.
        CreateQueue(targetQueue);
        if (null == _Consumer)
        {
            // Manual acknowledgement (noAck = false); the message is acked below after decoding.
            _Consumer = new QueueingBasicConsumer(_Channel);
            _Channel.BasicConsume(targetQueue, false, _Consumer);
        }
        // NOTE(review): an existing consumer stays attached to the queue it was first created
        // for, even if a different queue is requested later - confirm whether that is intended.

        BasicDeliverEventArgs delivery;
        if (_Consumer.Queue.Dequeue(timeout, out delivery))
        {
            message = Encoding.UTF8.GetString(delivery.Body);
            _Channel.BasicAck(delivery.DeliveryTag, false);
        }
    }
    catch (Exception ex)
    {
        Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
        if (null != ex.InnerException)
        {
            Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
        }
        throw;
    }
    return message;
}
/// <summary>Publishes <paramref name="message"/> to the stored queue.</summary>
/// <returns>true when the message was handed to the channel.</returns>
public bool Send(string message)
{
    return Send(_QueueName, message);
}

/// <summary>
/// Declares the queue and publishes <paramref name="message"/>, UTF-8 encoded and marked
/// persistent, to the configured exchange with the queue name as routing key.
/// </summary>
/// <param name="QueueName">Queue to publish to; null falls back to the stored queue name.</param>
/// <param name="message">Text to send.</param>
/// <returns>true when the message was handed to the channel; failures surface as exceptions.</returns>
/// <exception cref="Exception">Any failure is written to the debug output and rethrown unchanged.</exception>
public bool Send(string QueueName, string message)
{
    // Fix: honour the QueueName argument. It used to be ignored, so every message was
    // declared for and routed to the stored _QueueName.
    string targetQueue = QueueName ?? _QueueName;
    try
    {
        CreateQueue(targetQueue);
        byte[] payload = Encoding.UTF8.GetBytes(message);
        var properties = _Channel.CreateBasicProperties();
        properties.SetPersistent(true);
        _Channel.BasicPublish(_ExchangeName, targetQueue, properties, payload);
        return true;
    }
    catch (Exception ex)
    {
        Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
        if (null != ex.InnerException)
        {
            Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
        }
        throw;
    }
}
/// <summary>Creates a client that uses the built-in defaults for every setting.</summary>
public Client()
{
}

/// <summary>Creates a client for the given server and credentials; virtual host and port keep their defaults.</summary>
public Client(string Server, string Username, string Password)
    : this(_VirtualHostDefault, Server, Username, Password)
{
}

/// <summary>Creates a client for the given virtual host, server and credentials; the port keeps its default.</summary>
public Client(string VirtualHost, string Server, string Username, string Password)
    : this(VirtualHost, Server, _PortDefault, Username, Password)
{
}

/// <summary>Creates a client with every connection setting supplied by the caller.</summary>
public Client(string VirtualHost, string Server, int Port, string Username, string Password)
{
    // The shorter overloads chain to this one, passing the same default constants
    // that the field initialisers use.
    _VirtualHost = VirtualHost;
    _Server = Server;
    _Port = Port;
    _Username = Username;
    _Password = Password;
}
/// <summary>Finalizer; only runs when Dispose() was never called.</summary>
~Client()
{
    Dispose(false);
}

/// <summary>Closes the channel and connection held by this instance and releases all references.</summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>Releases the resources held by this instance; safe to call more than once.</summary>
/// <param name="disposing">
/// true when called from <see cref="Dispose()"/>; false when called from the finalizer.
/// </param>
protected void Dispose(bool disposing)
{
    if (disposed)
    {
        return;
    }
    // Mark first so a failure during teardown cannot trigger a second attempt.
    disposed = true;

    if (disposing)
    {
        try
        {
            // Fix: disconnect BEFORE dropping the references. The fields used to be nulled
            // first, which turned Disconnect() into a no-op and leaked the open channel
            // and connection.
            Disconnect();
        }
        catch (Exception ex)
        {
            // Dispose must not throw; record the failure and continue releasing references.
            Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
            if (null != ex.InnerException)
            {
                Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
            }
        }
    }

    // On the finalizer path other managed objects may already be finalized, so they are
    // not called into; the references are simply dropped.
    _Consumer = null;
    _QueueDeclareReturn = null;
    _Channel = null;
    _Connection = null;
    _ConnectionFactory = null;
}
#endregion | |
} | |
} |
In case your browser does not render the Gist correctly you can view the code directly at: https://gist.github.com/dfch/6da3d17414c913595302.
Usage is simple, you just add the assembly to your session (note: the assembly must be in the same path as the original RabbitMQ client dll) and then you specify server, username, password and your queue.
Receiving a message is nearly the same; call “Receive()” and optionally specify the wait timeout:
There was a strange behavior I observed after connecting a consumer and receiving from a queue. On the first fetch/dequeue operation with a WaitTimeout of 0 no message was returned. On a subsequent fetch the message was received. When using an infinite WaitTimeout (-1) the message was received on the first call.
In case you get an error while trying to connect to your AMQP server have a look at the ErrorRecord, especially the second InnerException of the Exception:
PS > $mq.Send("tralala"); Exception calling "Send" with "1" argument(s): "None of the specified endpoints were reachable" At line:1 char:1 + $mq.Send("tralala"); + ~~~~~~~~~~~~~~~~~~~ + CategoryInfo : NotSpecified: (:) [], MethodInvocationException + FullyQualifiedErrorId : BrokerUnreachableException PS > $error[0].Exception Exception calling "Send" with "1" argument(s): "None of the specified endpoints were reachable" PS > $error[0].Exception.InnerException None of the specified endpoints were reachable PS > $error[0].Exception.InnerException.InnerException ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
The broker log file (which you will find below the %APPDATA% folder) will sometimes give you more details, such as telling you about lacking permissions or wrong credentials:
=INFO REPORT==== 24-Aug-2014::01:12:59 === accepting AMQP connection <0.281.0> (amqpclient.example.com:54082 -> amqp.example.com:5672) =ERROR REPORT==== 24-Aug-2014::01:13:02 === closing AMQP connection <0.281.0> (amqpclient.example.com:54082 -> amqp.example.com:5672): {handshake_error,starting,0, {amqp_error,access_refused, "PLAIN login refused: user 'guest' can only connect via localhost", 'connection.start_ok'}} =INFO REPORT==== 24-Aug-2014::01:13:21 === accepting AMQP connection <0.286.0> (amqpclient.example.com:54083 -> amqp.example.com:5672) =ERROR REPORT==== 24-Aug-2014::01:13:24 === closing AMQP connection <0.286.0> (amqpclient.example.com:54083 -> amqp.example.com:5672): {handshake_error,starting,0, {amqp_error,access_refused, "PLAIN login refused: user 'Edgar.Schnittenfittich' - invalid credentials", 'connection.start_ok'}}
Note: if you have a fresh installation of RabbitMQ you might want to add a user so you can connect remotely to your server:
Create new user so we can connect from remote C:\> rabbitmqctl.bat add_user Administrator P@ssL0rd Make this user read/write admin C:\> rabbitmqctl.bat set_user_tags Administrator administrator C:\> rabbitmqctl.bat set_permissions -p / Administrator .* .* .* C:\> rabbitmqctl.bat list_users Listing users ... Administrator [administrator] guest [administrator] ...done. C:\> rabbitmqctl.bat list_permissions Listing permissions in vhost "/" ... Administrator .* .* .* guest .* .* .* ...done.
And it is always a good idea to watch your connections, consumers, queues and the like:
C:\> rabbitmqctl.bat list_connections Listing connections ... Administrator 192.168.174.148 53825 running Administrator 192.168.174.148 53828 running ...done. C:\> rabbitmqctl.bat list_queues Listing queues ... default 0 q2 0 q3 1786889 task_queue 0 ...done. C:\> rabbitmqctl.bat list_consumers Listing consumers ... q3 <rabbit@amqpserver.1.1609.0> amq.ctag-K1U5l7foM-7RDhxfcgOwTw true 1 [] q3 <rabbit@amqpserver.1.1665.0> amq.ctag-fB8dS9ZeJt9jNDP4EbCuBw true 1 [] ...done.
Reblogged this on Dinesh Ram Kali..
Good reading