Implementing Aggregator Pattern using Ballerina


Introduction to Aggregator Pattern

Aggregator is one of the basic pattern defined in SOA patterns and EIP patterns that can be used to define more complex scenarios.

According to the EIP patterns “The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received (more on how to decide when a set is 'complete' below), the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing’ [1]

Use Case


Let’s assume a Salesperson wants to get the Customer's Personal Information, Contact Information and the Purchasing Behavior for a given customer ID through the Customer Relationship Management (CRM) system for the upcoming direct marketing campaign. In a real world scenario, the  CRM system needs to call multiple backend services to get the required information and aggregate the responses coming from the backend systems to provide the information requested by the salesperson.

The system will send a request message with the customer ID to retrieve the required information from following systems.
  • Send a request to "Customer Info Service" to get customer's personal information
  • Send a request to "Contact Info Service" to get customer's contact Information
  • Send a request to "Purchasing Behavior Service" to get the purchasing details of the customer

Implementation Description


Following backend services will provide the requested information based on the customer ID provided.
  •     ContactInfo.bal
  •     CustomerInfo.bal
  •     PurchasingInfo.bal
Intermediate service (AggregatorService) will get the responses coming from the backend services and integrate the responses to provide the response to the Salesperson.

Let's Code with Ballerina

First, go to Ballerina website and download the latest ballerina distribution.

Note - I have used Ballerina 0.89 version to demonstrate this use case

Start Ballerina Composer

Ballerina Composer is a visual editor tool that provides the capability to write or draw your integration scenario.

To start the composer, go to <Ballerina_Home>/bin and execute following command depending on your environment.

Linux Environment - ./composer
Windows Environment  - composer.bat

Implementing the backend services

To implement the above use case let's create the required backend services; Customer Info Service, Contact Info Service and Purchasing Behavior Service.

Customer Information Service
I have created a service named “CustomerInfoService” via the composer and provide the base path “/customerInfo” to access this service directly by the outside clients. To demonstrate the scenario, I have created a map to maintain the customer information. The jsonpath is used to extract the customer ID from the incoming request and extract the customer information from the ‘customerInfoMap’ based on the customer ID. If there is no details for the requested ‘CustomerID’, service will return an error payload.

Let’s see how this can be represented using the composer.

 
Following is the code representation of the above design.

package aggregator;

import ballerina.net.http;
import ballerina.lang.messages;
import ballerina.lang.jsons;
import ballerina.lang.system;

@http:config {basePath:"/customerInfo"}
service<http> CustomerInfoService {


    @http:POST{}

    @http:Path{value:"/"}
    resource CustomerInfoResource(message m) {
    json incomingPayload = messages:getJsonPayload(m);
        
    map customerInfoMap = {};
    json cus_1 = {"PersonalDetails": {"Name": "Peter Thomsons","Age": "32","Gender": "Male"}};
    json cus_2 = {"PersonalDetails": {"Name": "Anne Stepson","Age": "50","Gender": "Female"}};
    json cus_3 = {"PersonalDetails": {"Name": "Edward Dewally","Age": "23","Gender": "Male"}};
    customerInfoMap["100"]= cus_1;
    customerInfoMap["101"]= cus_2;
    customerInfoMap["102"]= cus_3;
       
        string customerID = jsons:getString(incomingPayload,"$.Customer.id");
        system:println("Customer ID = " + customerID);
        message response = {};
       
        json payload;
        payload, _ = (json) customerInfoMap[customerID];

        if (payload != null) {
            messages:setJsonPayload(response,payload);
        } else {
            json errorpayload = {"Response": {"Error": "No Details available for the given Customer ID"}};
            messages:setJsonPayload(response, errorpayload);
        }
       
    reply response;
    }
}


  

This service will return the customer Information based on the requested customer ID.

Note - I have created the Contact Info Service and Purchasing Behaviour Service similar to the above service. Only difference is the payload used in the service.

Implementing the Intermediate service

So far we have created the ‘Customer Information Service’, ‘Contact Information Service’ and ‘Purchasing Information Service’ using ballerina. Let’s see how to create an intermediate service to aggregate the responses coming from each of the backend system and provide an aggregated response to the salesperson.


I have created a service named “AggregatorService” to aggregate the backend responses. To implement the scenario I have used the Fork Join function in Ballerina, which has the capability of defining individual workers that will work on an assigned task and wait until all the workers are completed with the assigned task. When the backend responses are collected those will be aggregated to create a JSON payload as diagramed in composer below.



Following is the code representation of the above design.
 package aggregator;

import ballerina.net.http;
import ballerina.lang.messages;
import ballerina.lang.jsons;

@http:config {basePath:"/AggregatorService"}
service<http> AggregatorService {

 @http:POST{}

    @http:Path{value:"/"}
    resource CRMResource(message m) {
    http:ClientConnector customerInfoEP = create http:ClientConnector("http://localhost:9090/customerInfo");
    http:ClientConnector contactInfoEP = create http:ClientConnector("http://localhost:9090/contactInfo");
    http:ClientConnector purchasingInfoEP = create http:ClientConnector("http://localhost:9090/purchasingInfo");
    json incomingPayload = messages:getJsonPayload(m);
    string customerID = jsons:getString(incomingPayload, "$.Customer.id");
    message aggregateResponse = {};

    if (customerID == "100" || customerID == "101" || customerID == "102" ) {
        
        fork {
            worker forkWorker1 {
            message response1 = {};
            message m1 = messages:clone(m);
            response1 = http:ClientConnector.post(customerInfoEP, "/", m1);
            response1 -> fork;
            }
            
            worker forkWorker2 {
            message response2 = {};
        message m2 = messages:clone(m);
            response2 = http:ClientConnector.post(contactInfoEP, "/", m2);
          response2 -> fork;
            }

        worker forkWorker3 {
            message response3 = {};
            response3 = http:ClientConnector.post(purchasingInfoEP, "/", m);
            response3 -> fork;
            }

        } join (all) (map results){
            any[] t1;
            any[] t2;
    any[] t3;
            t1,_ = (any[]) results["forkWorker1"];
            t2,_ = (any[]) results["forkWorker2"];
    t3,_ = (any[]) results["forkWorker3"];
    message res1;
    message res2;
    message res3;
            res1, _  = (message) t1[0];
            res2, _  = (message) t2[0];
    res3, _  = (message) t3[0];
            json jsonres1 = messages:getJsonPayload(res1);
    json jsonres2 = messages:getJsonPayload(res2);
    json jsonres3 = messages:getJsonPayload(res3);

    
    json payload = {};
    payload.CustomerDetailsResponse = {};
    payload.CustomerDetailsResponse.PersonalDetails = jsonres1.PersonalDetails;
    payload.CustomerDetailsResponse.ContactDetails = jsonres2.ContactDetails;
    payload.CustomerDetailsResponse.PurchasingDetails = jsonres3.PurchasingDetails;
     messages:setJsonPayload(aggregateResponse,payload);
    }
 } else {
     json errorpayload = {"Response": {"Error": "No Details available for the given Customer ID"}};

 messages:setJsonPayload(aggregateResponse, errorpayload);
 }

reply aggregateResponse;
        
        
    }
}


Executing the Service


Deploying the Service
Now we have all the backend services and aggregator service created using Ballerina. Let’s see how to deploy and invoke the services.

I have packaged all the backend services and intermediate service under “aggregator” package by defining the “package aggregator;” on top of each service. For the demonstration purpose I have created a ballerina archive named “aggregator.bsz” including all the services in the “aggreagtor” package.

Use following command to create a ballerina archive

<Ballerina_HOME>/bin/ballerina build service <package> -o <FileName.bsz>

Ex: <Ballerina_HOME>/bin/ballerina build service aggregator -o aggregator.bsz

Run the following command to deploy and run the service.

./ballerina run service <BallerinaArchiveName>

Ex: ./ballerina run service aggregator.bsz

Note : Ballerina Archive for the above use case can be found from [2]


Invoking the Service

Now the Salesperson can get all the expected information (personal details, contact details and purchasing behavior information) required for the direct marketing campaign by providing the CustomerID to the CRM system.

Here, I have used “Postman” Rest Client to represent the CRM system and requesting the information for the CustomerID = “101”.

References 

  1. http://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html
  2. https://github.com/sashikamw/BallerinaHackathon

Comments

Post a Comment

Popular posts from this blog

Use ZAP tool to intercept HTTP Traffic

Working with WSO2 carbon Admin Services

The 100% Open Source ESB