Emitting Process Events to Kafka for Analytics Using Red Hat Process Automation Manager

Red Hat AMQ Streams is a streaming platform based on Apache Kafka. Since the 7.10 release, Red Hat Process Automation Manager has shipped with out-of-the-box Kafka integration, so business processes can emit or consume Kafka events without custom code. Earlier this month, I wrote an article on how to create a business process using Red Hat Process Automation Manager and Red Hat AMQ Streams on OpenShift. In this article, we will see how to configure the KIE Server to emit a Kafka message for each event when a process, case, or task is completed. The KIE Server sends the messages when it commits transactions.

These events can then be pushed to an analytics engine for visualizing the process metrics.

Step 1: Deploy the AMQ Streams operator

The OperatorHub is a collection of Operators from the Kubernetes community and Red Hat partners, curated by Red Hat. Let us first install the AMQ Streams operator.

We will install it in a namespace we have created (rhpam-monitoring in the example).

Now that the operator is installed, we can create a simple three-node Kafka cluster. To do this, click the Kafka tab and then the Create Kafka button.

We will accept the defaults and create.

Step 2: Set up the Business Automation Operator

Next, we will set up the Business Automation operator. Search for the Business Automation Operator in the OperatorHub.

The operator allows you to create a Red Hat Process Automation Manager environment with authoring and deployment capabilities. Once the operator is deployed, switch to the KieApp tab and click Create KieApp. In this wizard, open the Objects section and expand Servers. Here we will create a KIE Server definition with the environment properties for emitting the process server events.

Let us now define the Bootstrap Servers as my-cluster-kafka-brokers:9092. By default, the KIE Server publishes the messages in the following topics:

  • jbpm-processes-events
  • jbpm-tasks-events
  • jbpm-cases-events

It is possible to modify these topic names. Let us define the Processes Topic Name as rhpam-processes and the Tasks Topic Name as rhpam-tasks. For a complete list of the properties and their configuration, see the Red Hat Process Automation Manager documentation. This configuration is enough to push process metrics to Kafka; you do not need to change anything in the process design. The configuration can also be deployed using a YAML definition.
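The default-and-override behaviour described above can be sketched in plain Java. This is only an illustration, not the KIE Server's actual implementation: the `EmitterTopics` class is hypothetical, and the `org.kie.jbpm.event.emitters.kafka.topic.*` property prefix is an assumption that should be checked against the documentation for your release.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper that resolves the topic used for each event type. */
final class EmitterTopics {
    // Assumed property prefix; verify it against the documentation for your release.
    static final String PREFIX = "org.kie.jbpm.event.emitters.kafka.topic.";

    // Default topic names as listed in this article.
    static final Map<String, String> DEFAULTS = new LinkedHashMap<>();
    static {
        DEFAULTS.put("processes", "jbpm-processes-events");
        DEFAULTS.put("tasks", "jbpm-tasks-events");
        DEFAULTS.put("cases", "jbpm-cases-events");
    }

    /** Returns the configured topic for the event type, or the default if none is set. */
    static String resolve(Properties config, String eventType) {
        String fallback = DEFAULTS.get(eventType);
        if (fallback == null) {
            throw new IllegalArgumentException("Unknown event type: " + eventType);
        }
        return config.getProperty(PREFIX + eventType, fallback);
    }

    public static void main(String[] args) {
        // Override only the process and task topics, as in this article.
        Properties config = new Properties();
        config.setProperty(PREFIX + "processes", "rhpam-processes");
        config.setProperty(PREFIX + "tasks", "rhpam-tasks");
        for (String type : DEFAULTS.keySet()) {
            System.out.println(type + " -> " + resolve(config, type));
        }
    }
}
```

With only the process and task topics overridden, case events would still go to the default jbpm-cases-events topic.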

Step 3: Deploy a business process and create an instance

Let us now open Business Central. To do this, look up the route definition in the namespace. Log in to Business Central with the username and password defined in the Business Central configuration.

Import the project by clicking Import Project.

We will build and deploy the project. Now we will create a process instance with the following values.

You can see that the process instance is created.

Step 4: Push Kafka messages to an analytics tool

Now that we have configured the Kafka broker, the process metrics should have been emitted to the corresponding topics.

To verify this, let us deploy an open source Kafka UI (Kafdrop) to inspect the events. We can deploy it with the following command.

oc apply -f https://raw.githubusercontent.com/snandakumar87/transaction-monitoring-dmn-kogito/master/ocp-deploy/kafdrop.yml

The YAML definition for installing Kafdrop is the file referenced in the command above.

Once the deployment is complete, we can use the route definition to open the Kafka UI.

You should now see the two topics created for Process and Task events.

Notice how the messages in these topics are in the CloudEvents format.
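CloudEvents 1.0 requires four context attributes on every event: `id`, `source`, `specversion`, and `type`. A downstream consumer may want to check for them before processing a message. The sketch below does that over an already-parsed message represented as a plain `Map`; the `CloudEventCheck` class and the sample values are hypothetical and are not a real KIE Server payload.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical check for the context attributes that CloudEvents 1.0 requires. */
final class CloudEventCheck {
    static final List<String> REQUIRED = List.of("id", "source", "specversion", "type");

    /** Returns the required attribute names that are absent or empty in the event. */
    static List<String> missingAttributes(Map<String, ?> event) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            Object value = event.get(name);
            if (value == null || value.toString().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Illustrative envelope only; the values are made up for this sketch.
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("specversion", "1.0");
        event.put("id", "example-id");
        event.put("type", "process");
        event.put("source", "/process/example");
        System.out.println("Missing attributes: " + missingAttributes(event));
    }
}
```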

Once the metrics data is available in Kafka, multiple downstream consumers can read from these topics. Let us push these metrics to an Elasticsearch cluster.

For this, let us deploy the Elastic operator on OpenShift.

We will create an Elasticsearch cluster and an instance of Kibana. Let us accept the defaults to complete the setup.

Now that the Elasticsearch instance is deployed, we will use a simple Camel K integration class to read data from the Kafka topics and push it to Elasticsearch.

We have defined two routes to push the metrics to the Elasticsearch cluster, one for process events and one for task events.

// kafkaBootstrap and consumerMaxPollRecords are defined elsewhere in the
// integration class and are not shown in this excerpt.

// Route 1: read process events from the beginning of the topic and POST each one to Elasticsearch.
from("kafka:rhpam-processes?brokers=" + kafkaBootstrap
        + "&maxPollRecords=" + consumerMaxPollRecords
        + "&seekTo=beginning&groupId=process")
    .setHeader(Exchange.HTTP_METHOD, constant("POST"))
    .setHeader("Authorization", constant("Basic XXXXXXX")) // placeholder credentials
    .setHeader("Content-Type", constant("application/json"))
    .to("https://elasticsearch-sample-es-http:9200/process/process")
    .log("${body}");

// Route 2: the same flow for task events, using a separate consumer group.
from("kafka:rhpam-tasks?brokers=" + kafkaBootstrap
        + "&maxPollRecords=" + consumerMaxPollRecords
        + "&seekTo=beginning&groupId=task")
    .setHeader(Exchange.HTTP_METHOD, constant("POST"))
    .setHeader("Authorization", constant("Basic XXXXXX")) // placeholder credentials
    .setHeader("Content-Type", constant("application/json"))
    .to("https://elasticsearch-sample-es-http:9200/tasks/tasks")
    .log("${body}");
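The `Basic XXXXXXX` value in the routes is a placeholder. HTTP Basic authentication expects the Base64 encoding of `username:password` after the `Basic ` prefix, and a minimal sketch of computing that value follows. The `BasicAuthHeader` class and the sample credentials are hypothetical; in a real deployment the credentials would typically be read from a Kubernetes secret rather than hard-coded.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper that builds an HTTP Basic Authorization header value. */
final class BasicAuthHeader {
    /** Returns "Basic " followed by the Base64 encoding of "user:password". */
    static String of(String user, String password) {
        String credentials = user + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Sample credentials for illustration only; substitute your own.
        System.out.println(BasicAuthHeader.of("elastic", "changeme"));
    }
}
```

The resulting string is what would replace the placeholder passed to `constant(...)` in the `Authorization` header of each route.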

We can now start visualizing the process and task metrics in Kibana.

With this, we have a complete setup within OpenShift for event-driven business applications focused on business process automation.
