A couple of days ago I wrote about how to use RabbitMQ with AMQP from PowerShell (via a small assembly that utilized the original C# .NET Client). After I implemented it in the use case for our customer I somehow felt it to be “too heavy” for what I was really using it (compared to all the features AMQP has to offer to you). And as we were already using SignalR in some other client projects I wondered if I could use this as a replacement for AMQP. Before you scream out and complain that SignalR was never meant to be a replacement or even comparable to a much superiour protocol like AMQP – relax. I know. Still – I wanted to have a quick look to see if and how it might fit. So after a quick look the most prominent drawback I had to complain about was that you cannot just “dequeue” an element from a queue with multiple workers waiting for the same queue. SignalR would just send it happily to all of them. Luckily this was not an issue in my scenario as I had a central dispatcher that would wait for messages. On the positive side I noticed it would be perfectly and easily usable via multiple web servers and work across all in-between firewalls as the connection to my web servers were already allowed. And there was a .NET/C# client for it as well available on NuGet Microsoft ASP.NET SignalR .NET Client 2.1.1.
Install-Package Microsoft.AspNet.SignalR.Client
Furthermore I could run it without installing an additional server component, but I could have it side-by-side with my existing SignalR functionality the browser was already using. And really with a few lines of code mostly for convenience, it was readily usable from PowerShell:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using System.Diagnostics; | |
using Microsoft.AspNet.SignalR; | |
using Microsoft.AspNet.SignalR.Client; | |
using System.Collections.Concurrent; | |
using System.Net; | |
namespace SignalRClient | |
{ | |
public class Connection | |
{ | |
private Uri _uri; | |
public Uri Uri | |
{ | |
get { return _uri; } | |
set { _uri = value; } | |
} | |
private string _hubName; | |
public string HubName | |
{ | |
get { return _hubName; } | |
set { _hubName = value; } | |
} | |
public HubConnection _hubConnection; | |
public IHubProxy _hub; | |
public ConcurrentDictionary<string, IDisposable> EventHandler = new ConcurrentDictionary<string, IDisposable>(); | |
public ConcurrentDictionary<string, ConcurrentQueue<string>> EventQueue = new ConcurrentDictionary<string, ConcurrentQueue<string>>(); | |
private void EnQueue(string eventName, string message) | |
{ | |
var fReturn = false; | |
Debug.WriteLine("{0}: EnQueue message '{1}'.", eventName, message); | |
ConcurrentQueue<string> EnQueue; | |
fReturn = EventQueue.TryGetValue(eventName, out EnQueue); | |
if (fReturn) | |
{ | |
EnQueue.Enqueue(message); | |
} | |
} | |
public string TryDequeue(string eventName) | |
{ | |
return Dequeue(eventName, 0); | |
} | |
public string Dequeue(string eventName) | |
{ | |
return Dequeue(eventName, –1); | |
} | |
public string Dequeue(string eventName, int dwMillisecondsTotalWaitTime) | |
{ | |
return Dequeue(eventName, dwMillisecondsTotalWaitTime, 100); | |
} | |
public string Dequeue(string eventName, int dwMillisecondsTotalWaitTime, int dwMilliSecondsWaitIntervall) | |
{ | |
var fReturn = false; | |
string message = string.Empty; | |
IDisposable eventHandler; | |
fReturn = EventHandler.TryGetValue(eventName, out eventHandler); | |
if (!fReturn) | |
{ | |
return null; | |
} | |
ConcurrentQueue<string> eventQueue; | |
fReturn = EventQueue.TryGetValue(eventName, out eventQueue); | |
if (!fReturn) | |
{ | |
return null; | |
} | |
fReturn = eventQueue.TryDequeue(out message); | |
if (fReturn || 0 == dwMillisecondsTotalWaitTime) | |
{ | |
return fReturn ? message : null; | |
} | |
if ((dwMilliSecondsWaitIntervall > dwMillisecondsTotalWaitTime) && (–1 != dwMillisecondsTotalWaitTime)) | |
{ | |
dwMilliSecondsWaitIntervall = dwMillisecondsTotalWaitTime; | |
} | |
var fInfiniteWaitTime = –1 == dwMillisecondsTotalWaitTime ? true : false; | |
var datNow = DateTimeOffset.UtcNow; | |
do | |
{ | |
System.Threading.Thread.Sleep(dwMilliSecondsWaitIntervall); | |
fReturn = eventQueue.TryDequeue(out message); | |
if (fReturn) | |
{ | |
break; | |
} | |
} while (fInfiniteWaitTime || dwMillisecondsTotalWaitTime > (DateTimeOffset.UtcNow – datNow).TotalMilliseconds); | |
return fReturn ? message : null; | |
} | |
public List<string> DequeueAll(string eventName) | |
{ | |
var fReturn = false; | |
var message = string.Empty; | |
var messages = new List<string>(); | |
while(true) | |
{ | |
message = TryDequeue(eventName); | |
if (null != message) | |
{ | |
messages.Add(message); | |
continue; | |
} | |
break; | |
} | |
return messages; | |
} | |
async public Task<bool> Start(string eventName) | |
{ | |
var fReturn = false; | |
if (null == _hub) | |
{ | |
throw new ArgumentException("_hub"); | |
} | |
fReturn = EventHandler.TryAdd(eventName, null); | |
if (!fReturn) | |
{ | |
Debug.WriteLine(string.Format("{0}: Adding EventHandler FAILED. [{1}]", eventName, EventHandler.Count)); | |
return fReturn; | |
} | |
ConcurrentQueue<string> queue = null; | |
fReturn = EventQueue.TryGetValue(eventName, out queue); | |
if(!fReturn) | |
{ | |
queue = new ConcurrentQueue<string>(); | |
fReturn = EventQueue.TryAdd(eventName, queue); | |
if (!fReturn) | |
{ | |
Debug.WriteLine(string.Format("{0}: Adding EventQueue FAILED. [{1}]", eventName, EventQueue.Count)); | |
return fReturn; | |
} | |
} | |
IDisposable eventHandler; | |
eventHandler = _hub.On(eventName, m => | |
{ | |
this.EnQueue(eventName, m); | |
}); | |
if (null == eventHandler) | |
{ | |
Debug.WriteLine(string.Format("{0}: Adding EventHandler FAILED. [{1}]", eventName, EventHandler.Count)); | |
fReturn = Stop(eventName); | |
fReturn = false; | |
return fReturn; | |
} | |
fReturn = EventHandler.TryUpdate(eventName, eventHandler, null); | |
if (!fReturn) | |
{ | |
Debug.WriteLine(string.Format("{0}: Updating EventHandler FAILED. [{1}]", eventName, EventHandler.Count)); | |
fReturn = this.Stop(eventName); | |
fReturn = false; | |
return fReturn; | |
} | |
Debug.WriteLine(string.Format("{0}: Starting _hubConnection … [{1}]", eventName, EventHandler.Count)); | |
await _hubConnection.Start(); | |
fReturn = true; | |
return fReturn; | |
} | |
public void Stop() | |
{ | |
foreach (var eventHandler in EventHandler) | |
{ | |
Stop(eventHandler.Key.ToString()); | |
} | |
} | |
public bool Stop(string eventName) | |
{ | |
IDisposable eventHandler; | |
var fReturn = false; | |
fReturn = EventHandler.TryRemove(eventName, out eventHandler); | |
if (!fReturn || (null == eventHandler)) | |
{ | |
return fReturn; | |
} | |
eventHandler.Dispose(); | |
if (0 >= EventHandler.Count) | |
{ | |
_hubConnection.Stop(); | |
} | |
return fReturn; | |
} | |
public Connection() | |
{ | |
_Connection(_uri, _hubName, CredentialCache.DefaultNetworkCredentials); | |
} | |
public Connection(Uri uri, string hubName) | |
{ | |
_Connection(uri, hubName, CredentialCache.DefaultNetworkCredentials); | |
} | |
public Connection(Uri uri, string hubName, ICredentials Credentials) | |
{ | |
_Connection(uri, hubName, Credentials); | |
} | |
public Connection(Uri uri, string hubName, string Username, string Password) | |
{ | |
var cred = new NetworkCredential(Username, Password); | |
_Connection(uri, hubName, cred); | |
} | |
private void _Connection(Uri uri, string hubName, ICredentials Credentials) | |
{ | |
_uri = uri; | |
if (null == uri) | |
{ | |
throw new ArgumentNullException("uri"); | |
} | |
_hubName = hubName; | |
if (string.IsNullOrWhiteSpace(hubName)) | |
{ | |
throw new ArgumentNullException("hubName"); | |
} | |
if (null == Credentials) | |
{ | |
throw new ArgumentNullException("Credentials"); | |
} | |
_hubConnection = new HubConnection(_uri.AbsoluteUri); | |
_hubConnection.Credentials = CredentialCache.DefaultNetworkCredentials; | |
_hub = _hubConnection.CreateHubProxy(_hubName); | |
} | |
~Connection() | |
{ | |
this.Stop(); | |
_hubConnection.Dispose(); | |
} | |
} | |
} |
In case your browser does not correctly render the Gist code you can view it directly at: https://gist.github.com/dfch/cc5a4b0145509c838477
I made most of the properties public so you can still use the underlying .NET objects. And I did not implement “groups” nor “sending” to the server, but just enough to “receive” messages. (For “sending” messages you can invoke the methods directly on the hub as shown in my next post.) Invocation from PowerShell was then pretty simple:
$eventName = "receiveMessage"; Add-Type -Path ".\SignalRClient.dll" $s = New-Object SignalRClient.Connection("http://localhost/", "commandhub"); $s.Start($eventName); while($true) { $s.TryDequeue($eventName) Start-Sleep -seconds 1 } string TryDequeue(string eventName) string Dequeue(string eventName) string Dequeue(string eventName, int dwMillisecondsTotalWaitTime) string Dequeue(string eventName, int dwMillisecondsTotalWaitTime, int dwMilliSecondsWaitIntervall) System.Collections.Generic.List[string] DequeueAll(string eventName)
Note1: For ease of use there are several dequeueing methods defined. The “dwMilliSecondsWaitIntervall” is only used internally to “poll” the memory queue where all the messages are stored (so there is not server polling). Note2: In this example the current username is used to connect to the SignalR hub. Therefore no credentials are supplied. Using this approach instead of a full blown RabbitMQ installation really reduced the memory footprint of the application and eliminated an HA pair of servers including a separate database and an authentication system that was not Windows based.
1 Comment »