#IoT2CRM: Time to get connected – part 4: Dispatcher completes the gateway

In this series of articles I’m writing about my experiences in connecting IoT devices to Dynamics CRM. In the previous article, I implemented the Web API controller within the gateway, which collects and persists the messages the IoT devices send.
The Web API controller is only half of the gateway. In this article I describe the dispatching half, which enables the gateway to send the persisted messages to CRM.

[Figure: IoT2CRM gateway dispatcher]

The principles behind the dispatcher are pretty straightforward. I’ll describe the steps below:

  1. Monitor a folder for the presence of files
  2. Read a file and transform it into an entity
  3. Send the entity to CRM
  4. Delete the processed file
  5. Repeat steps 2 to 4 until the folder is empty
  6. Time out for a while and go back to step 1

The six steps described above are the engine behind the dispatcher. What I added is a form of parallelism, in which multiple threads are collecting the files and sending the entities to CRM.
The Microsoft .NET Framework 4 introduced the Task Parallel Library (TPL). Using this library you can easily create parallel tasks which are executed on different threads without having to worry about threading (which is the fun part).

The advantage of using parallel tasks is that the CPU is utilized much more effectively. As our dispatcher is pretty straightforward, I only used two tasks (one task per folder to monitor). In more advanced scenarios you could implement different levels of buffering to create one big efficient pipeline.
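To give an idea of what one level of buffering could look like, here is a minimal sketch that is not part of the gateway source. It assumes a hypothetical `BufferedPipelineSketch.Run` helper in which one task reads files into a bounded `BlockingCollection` while a second task hands the contents to a `send` callback (a stand-in for the call to CRM). Deleting the files and retrying are left out on purpose.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

public static class BufferedPipelineSketch
{
    // Hypothetical helper (not in the original gateway): reads every file in
    // 'folder' on one task and passes its lines to 'send' on another task,
    // with a bounded buffer in between. Returns the number of files sent.
    public static int Run(string folder, Action<string[]> send, int bufferSize = 100)
    {
        int sent = 0;
        using (var buffer = new BlockingCollection<string[]>(bufferSize))
        {
            // Producer: read the files and queue their content.
            var reader = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var file in Directory.GetFiles(folder))
                    {
                        buffer.Add(File.ReadAllLines(file)); // blocks while the buffer is full
                    }
                }
                finally
                {
                    buffer.CompleteAdding(); // signal that no more items will follow
                }
            });

            // Consumer: take items from the buffer until the producer is done.
            var sender = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var lines in buffer.GetConsumingEnumerable())
                    {
                        send(lines);
                        sent++;
                    }
                }
                finally
                {
                    buffer.CompleteAdding(); // if sending failed, unblock a waiting reader
                }
            });

            // Rethrows any task failure as an AggregateException.
            Task.WaitAll(reader, sender);
        }
        return sent;
    }
}
```

The bounded buffer keeps the reader from running far ahead of a slow CRM connection. Whether this is worth the extra complexity depends on your message volume; for the two folders in this article, one task per folder is enough.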

At its core the dispatcher is very simple. I reduced the code to the essentials to make it easier to comprehend (for production usage, you should extend the code with error handling, tracing, notifications etc.).
Furthermore, I implemented the dispatcher as a simple console application. For production purposes the dispatcher should be implemented as a Windows service or an Azure job.
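As an illustration of the kind of error handling meant here, the following is a minimal sketch and not the code from the download. It assumes a hypothetical `SafeFolderProcessor.Process` helper with a `handle` callback (standing in for "create a message and send it to CRM") and a hypothetical `failedFolder` in which files that could not be processed are parked, so that one bad file does not block the rest of the batch.

```csharp
using System;
using System.IO;

public static class SafeFolderProcessor
{
    // Hypothetical helper (not in the original gateway): applies 'handle' to
    // every file in 'folderName'. Handled files are deleted; files that cause
    // an exception are moved to 'failedFolder' instead of being retried forever.
    // Returns the number of files handled successfully.
    public static int Process(string folderName, string failedFolder, Action<string[]> handle)
    {
        Directory.CreateDirectory(failedFolder); // does nothing if it already exists
        int succeeded = 0;

        foreach (var file in Directory.GetFiles(folderName))
        {
            try
            {
                handle(File.ReadAllLines(file));
                File.Delete(file);
                succeeded++;
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine("failed to process {0}: {1}", file, ex.Message);

                // Park the file for later inspection. File.Move throws when the
                // target exists, so remove an older copy with the same name first.
                var target = Path.Combine(failedFolder, Path.GetFileName(file));
                File.Delete(target);
                File.Move(file, target);
            }
        }
        return succeeded;
    }
}
```

Catching `Exception` is deliberately coarse in this sketch. In a real service you would probably distinguish between a malformed file (park it) and an unreachable CRM server (leave the file in place and retry later).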

The processor

    /// <summary>
    /// Generic folder processor
    /// </summary>
    /// <param name="folderName">Folder to scan for message files</param>
    private void FolderProcessor(string folderName)
    {
        // get all files. if we have any... process them
        var files = Directory.GetFiles(folderName);
        if (files.Length > 0)
        {
            // set up connection
            Microsoft.Xrm.Client.CrmConnection connection = CrmConnection.Parse(_connectionString);

            // Obtain an organization service proxy.
            // The using statement assures that the service proxy will be properly disposed.
            using (var orgService = new OrganizationService(connection))
            {
                // iterate through files and pump them into CRM
                foreach (var file in files)
                {
                    Console.WriteLine("processing file: {0}", file);

                    // create a message, and save its entity to CRM
                    var msg = new Message(File.ReadAllLines(file));
                    orgService.Create(msg.ToEntity());

                    // kill the file
                    File.Delete(file);
                }
            }
        }
    }

As you can see, the processor scans the passed-in folder for files. If there are any files, they are read and sent to CRM. Once sent, the file is deleted from the file system. No rocket science here.

Message Class

    /// <summary>
    /// Transform message into CRM entity
    /// </summary>
    /// <returns>Entity representation of object</returns>
    public Entity ToEntity()
    {
        Entity e = new Entity("new_iotmessage");
        e["new_deviceid"] = DeviceId;
        e["new_devicetype"] = DeviceType;
        e["new_errorcode"] = ErrorCode;
        e["new_devicedata"] = Data;
        e["new_receivedate"] = CreateDate;

        return e;
    }

In the message class I implemented a ToEntity method. In this method I transform the message into a generic CRM entity. This function needs to be adapted to your specific situation.

Main loop

    public void ProcessMessages()
    {
        if (!Directory.Exists(_storagePathError))
        {
            Console.WriteLine("The folder '{0}' does not exist! Terminating...", _storagePathError);
            return;
        }

        if (!Directory.Exists(_storagePathUsage))
        {
            Console.WriteLine("The folder '{0}' does not exist! Terminating...", _storagePathUsage);
            return;
        }

        // endless loop
        while (true)
        {
            // spawn two parallel tasks. One for the errors and one for the usage data
            Task[] taskArray = { Task.Factory.StartNew(() => FolderProcessor(_storagePathError)),
                                 Task.Factory.StartNew(() => FolderProcessor(_storagePathUsage)) };

            // wait until both tasks have finished, so the next round
            // never picks up files that are still being processed
            Task.WaitAll(taskArray);

            // wait for a minute
            Thread.Sleep(60000);
        }
    }

The main loop uses the Task Parallel Library to spawn two tasks, one per folder to be monitored. Once both tasks have completed, the process sleeps for a minute and then starts all over again.
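If you move the dispatcher into a Windows service, an endless loop that can only be killed is not ideal. The following minimal sketch shows one way to make the loop stoppable. It is an assumption on my side and not part of the download: `DispatchLoopSketch.Run` is a hypothetical helper that starts the given jobs in parallel, waits for all of them, pauses, and repeats until a `CancellationToken` is cancelled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DispatchLoopSketch
{
    // Hypothetical helper (not in the original gateway): runs all 'jobs' in
    // parallel, waits for them to finish, pauses for 'pause' and repeats
    // until 'token' is cancelled. Returns the number of completed rounds.
    public static int Run(Action[] jobs, TimeSpan pause, CancellationToken token)
    {
        int rounds = 0;
        while (!token.IsCancellationRequested)
        {
            // start one task per job
            var tasks = new Task[jobs.Length];
            for (int i = 0; i < jobs.Length; i++)
            {
                tasks[i] = Task.Factory.StartNew(jobs[i]);
            }

            // block until every job of this round has finished
            Task.WaitAll(tasks);
            rounds++;

            // Sleep, but wake up immediately when cancellation is requested:
            // WaitOne returns true as soon as the token is cancelled.
            if (token.WaitHandle.WaitOne(pause))
            {
                break;
            }
        }
        return rounds;
    }
}
```

In the dispatcher, the two jobs would be `() => FolderProcessor(_storagePathError)` and `() => FolderProcessor(_storagePathUsage)`, the pause would be `TimeSpan.FromMinutes(1)`, and the token would come from a `CancellationTokenSource` that is cancelled when the service stops.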

This simple piece of code wraps up the gateway.

From an architectural point of view the gateway is divided into two processes: one inbound process (to collect the messages) and one outbound process (to pump the data into CRM). The beauty of this principle is that it is scalable, robust, lightweight and easy to adapt to a changing infrastructure without having to disrupt the service to the IoT devices out there (as long as the Web API controller stays up and running).

For your convenience I’ve put up the complete source code of the gateway for download. Please, keep in mind that this code is a proof of concept and not production code.

File Attachment: JourneyIntoCRM.IoT2CRM.zip (127 KB)

In the next article, I’ll implement the logic on the IoT devices.
