#Crm2Crm – part 6: The Message Pump

In this series of articles, I’m implementing a proof of concept of a replication mechanism within Dynamics CRM. My intention is not to build an enterprise class replication mechanism (e.g. Scribe, KingswaySoft), instead I want to learn more about the mechanisms involved in replication.

2_action_mechanical_sequencer_v3_pic_1

The previous article was a technical necessity as I needed to find a way to serialize and deserialize data within Dynamics CRM. The serialization of the source entity is done inside a plugin that is running inside a sandbox. The sandbox limits us a little bit, therefor we cannot use the standard serialization methods within the .Net framework.

In this article I implemented a first version of the message pump, which can be considered as the engine that will make the actual replication to happen. For the sake of simplicity I implemented the message pump as a console application. In a production like situation I would implement the message pump as a windows Service or Azure service.

The reason why I implement the message pump as a seperate service, is that I don’t want to be dependent on limitations the CRM services are offering me. Besides this reason, it makes the replication more robust as both the source environment and the destination environment can be offline without any changes being lost.

In short I’ll fresh up your knowledge on the replication mechanics I described in earlier articles.

Replication2

The idea behind the replication, is that every action on an entity results in a message that is written by a plugin to a special message entity. In the message entity the data to be replicated is collected together with an action that should be executed on the destination.

The message pump reads a batch of messages from the message entity and starts processing these in the order in which the messages arrived. This results in an exact copy of the source entity.

In the code that I wrote, I implemented a main loop (ProcessMessages). In the main loop, I read a batch of records from the Message Entity. Per record found, I process the contents.

public void ProcessMessages()
        {
            var conn = new CrmServiceClient(SourceConnection);
            var service = (IOrganizationService)conn.OrganizationWebProxyClient != null ? 
                          (IOrganizationService)conn.OrganizationWebProxyClient : (IOrganizationService)conn.OrganizationServiceProxy;
 
            // build a query to get the messages
            var query = new QueryExpression("jcrm_messageentity")
            {
                ColumnSet = new ColumnSet("jcrm_messageentityid", "jcrm_name", "jcrm_sourceentity", "jcrm_sourceentityid", "jcrm_sourceentitydata")
            };
            query.Orders.Add(new OrderExpression("modifiedon", OrderType.Ascending));
            query.Criteria.AddCondition("modifiedon", ConditionOperator.LessEqual, DateTime.Now);
 
            var result = service.RetrieveMultiple(query);
 
            foreach (var entity in result.Entities)
            {
                ProcessMessage(entity, service);
            }
        }

In the ProcessMessage function, the actual work is being done. The following steps are done per message:

  • MapEntityName()
    Get the name of the destination entity. This way we can determine how the entities should be mapped.
  • MapEntityFields()
    Get a list of the attributes within the destination entity. We need this list to determine later on if the destination entity has the same attributes as the source entity.
  • Get the message name
    What is the action that we need to do?
  • Do the action
    Handle delete, set state or upsert (combination of insert/update)
  • Remove the message entity
    Once the message entity is processed then we need to dispose it.
private void ProcessMessage(Entity entity, IOrganizationService sourceService)
        {
            var conn = new CrmServiceClient(DestinationConnection);
            var destService = (IOrganizationService)conn.OrganizationWebProxyClient != null ? 
                              (IOrganizationService)conn.OrganizationWebProxyClient : (IOrganizationService)conn.OrganizationServiceProxy;
 
            // get the mappings 
            var destinationEntityName = MapEntityName(entity);
            var destinationFields = MapEntityFields(destinationEntityName, destService);
 
            var message = entity.GetAttributeValue<string>("jcrm_name");
            var sourceId = new Guid(entity.GetAttributeValue<string>("jcrm_sourceentityid"));
 
            if (message != "Delete")
            {
                var sourceXml = entity.GetAttributeValue<string>("jcrm_sourceentitydata");
                if (destinationFields.Any() && !string.IsNullOrWhiteSpace(sourceXml))
                {
                    // deserialize the message data
                    var sourceEntity = EntitySerializer.DeserializeObject(sourceXml, sourceId);
 
                    // do a setstate or do an upsert (for both insert / update)
                    if (message.Contains("SetState"))
                    {
                        var setStateRequest = new SetStateRequest()
                        {
                            EntityMoniker = new EntityReference(destinationEntityName,sourceId),
                            State = (OptionSetValue)sourceEntity["statecode"],
                            Status = (OptionSetValue)sourceEntity["statuscode"]
                        };
                        destService.Execute(setStateRequest);
                    }
                    else
                    {
                        var upsertRequest = new UpsertRequest()
                        {
                            Target = GetUpsertEntity(sourceId, sourceEntity, destinationEntityName, destinationFields)
                        };
                        destService.Execute(upsertRequest);
                    }
                }
            }
            else
            {
                // delete in the destination the record with the source id
                destService.Delete(destinationEntityName, sourceId);
            }
 
            // remove the message entity
            sourceService.Delete(entity.LogicalName, entity.Id);
        }

Once implemented, it was time to give the message pump its first run. After fixing a couple of small bugs, it ran like a charm.

Time to see the message pump in action.

Pump1

In the data entity (source) I added a new record. Once added, a record showed up in the message entity (see red circle). The message pump picked up the create message and created a new destination entity.

Pump2

In the data entity (source) I updated the record with a new decimal value. Once updated, the message pump picked up the update message and changed the destination entity (see red circles).

Pump3

In the data entity (source) I deactivate the record. Once deactivated, a record showed up in the message entity (see red circles). The message pump picked up the SetState message and deactivated the destination entity.

Pump4

Finally I deleted the source entity, the message pump picked up the delete message and deleted the destination entity as well (see red circles).

One way replication seems to be up and running. Right now I need to do a couple of small steps to make the mechanism bidirectional. This will be done in the next and final article in this series.

For now, I wish you all the best for 2017!

Leave a Reply

Your email address will not be published. Required fields are marked *