Sunday, September 20, 2015

WSO2 ESB 4.9.0 - Kafka Support


The WSO2 ESB Kafka Inbound consumes the message from the Kafka brokers. It allows the message consuming at different speed(polling interval), tenant loading and coordination supports.

You can download latest ESB version from http://wso2.com/products/enterprise-service-bus/


Kafka Inbound Use Cases
  1. ESB Kafka Inbound as Queue
  2. ESB Kafka Inbound as Topic
  3. ESB Kafka Inbound consumes from beginning
  4. ESB Kafka Inbound consumes from multiple topics
  5. ESB Kafka Inbound consume from specific server and topic partition

USE CASE 1 : WSO2 ESB Kafka Inbound as Queue
If the consumer instances are in a same consumer group, then this works like traditional queue. So only one Kafka inbound will consume the message.



Consider the following Kafka inbound configurations (KafkaInboundEP1 and KafkaInboundEP2). Since the group.id parameter values in both inbound configurations are same, both are in a same consumer group. In such case, any one of these endpoints will consumes the message from the topic.
  
KafkaInboundEP1
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="KafkaInboundEP1"
                sequence="requestHandlerSeq"
                onError="inFaulte"
                protocol="kafka"
                suspend="false">
  <parameters>
     <parameter name="interval">100</parameter>
     <parameter name="coordination">true</parameter>
     <parameter name="sequential">true</parameter>
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="consumer.type">highlevel</parameter>
     <parameter name="content.type">application/xml</parameter>
     <parameter name="topics">topic</parameter>
     <parameter name="group.id">consumer-group</parameter>
  </parameters>
</inboundEndpoint>

KafkaInboundEP2
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="KafkaInboundEP2"
                sequence="requestHandlerSeq"
                onError="inFaulte"
                protocol="kafka"
                suspend="false">
  <parameters>
     <parameter name="interval">100</parameter>
     <parameter name="coordination">true</parameter>
     <parameter name="sequential">true</parameter>
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="consumer.type">highlevel</parameter>
     <parameter name="content.type">application/xml</parameter>
     <parameter name="topics">topic</parameter>
     <parameter name="group.id">consumer-group</parameter>
  </parameters>
</inboundEndpoint>

USE CASE 2 : WSO2 ESB Kafka Inbound as Topic
If the consumer instances are in different consumer groups, then this works like publish-subscribe and all messages will be broadcasted to all consumers.



In the following Kafka inbound configurations,  both Kafka inbound endpoints (KafkaListenerEP and KafkaListenerEP2) are in different consumer groups since the group.id parameter values are different. So both inbound endpoint can consume the messages from the topic1.

KafkaInboundEP1
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="KafkaInboundEP2"
                sequence="requestHandlerSeq"
                onError="inFaulte"
                protocol="kafka"
                suspend="false">
  <parameters>
     <parameter name="interval">100</parameter>
     <parameter name="coordination">true</parameter>
     <parameter name="sequential">true</parameter>
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="consumer.type">highlevel</parameter>
     <parameter name="content.type">application/xml</parameter>
     <parameter name="topics">topic1</parameter>
     <parameter name="group.id">test-group1</parameter>
  </parameters>
</inboundEndpoint>

KafkaInboundEP2
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="KafkaInboundEP2"
                sequence="requestHandlerSeq"
                onError="inFaulte"
                protocol="kafka"
                suspend="false">
  <parameters>
     <parameter name="interval">100</parameter>
     <parameter name="coordination">true</parameter>
     <parameter name="sequential">true</parameter>
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="consumer.type">highlevel</parameter>
     <parameter name="content.type">application/xml</parameter>
     <parameter name="topics">test</parameter>
     <parameter name="group.id">test-group2</parameter>
  </parameters>
</inboundEndpoint>

USE CASE 3 : WSO2 ESB Kafka inbound as message consumer from beginning

The Kafka inbound endpoint allows to consume the messages from beginning. 

The following configuration can be used for this use case. KafkaInboundEP1 is used to consume the message from specific topic partition 1 from server1 and KafkaaInboundEP2 is used to consume the message from specific topic partition 2 from server1. So we can consume  all messages from specific server from beginning.

KafkaInboundEP1
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="KafkaInboundEP1"
                sequence="requestHandlerSeq"
                onError="inFaulte"
                protocol="kafka"
                interval="1000"
                suspend="false">
  <parameters>   
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="group.id">test-group</parameter>  
     <parameter name="content.type">application/xml</parameter>
     <parameter name="consumer.type">simple</parameter>
     <parameter name="simple.max.messages.to.read">5</parameter>
     <parameter name="simple.topic">topic</parameter>
     <parameter name="simple.brokers">localhost</parameter>
     <parameter name="simple.port">9092</parameter>
     <parameter name="simple.partition">1</parameter>
     <parameter name="interval">100000</parameter>
  </parameters>
</inboundEndpoint>

KafkaInboundEP2
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="KafkaInboundEP2"
                sequence="requestHandlerSeq"
                onError="inFaulte"
                protocol="kafka"
                interval="1000"
                suspend="false">
  <parameters>   
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="group.id">test-group</parameter>  
     <parameter name="content.type">application/xml</parameter>
     <parameter name="consumer.type">simple</parameter>
     <parameter name="simple.max.messages.to.read">100000</parameter>
     <parameter name="simple.topic">topic</parameter>
     <parameter name="simple.brokers">localhost</parameter>
     <parameter name="simple.port">9092</parameter>
     <parameter name="simple.partition">2</parameter>
     <parameter name="interval">1000</parameter>
  </parameters>
</inboundEndpoint>

USE CASE 4 : A WSO2 ESB Kafka Inbound consume from more than one topic

A Kafka inbound can consume the messages from more than one topics.

Each topics are added using comma separator in the Kafka inbound endpoint configuration. KafkaInboundEP1 can consume the messages from topic1 and topic2.

KafkaInboundEP1
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="KafkaInboundEP1"
                sequence="requestHandlerSeq"
                onError="inFaulte"
                protocol="kafka"
                suspend="false">
  <parameters>
     <parameter name="interval">100</parameter>
     <parameter name="coordination">true</parameter>
     <parameter name="sequential">true</parameter>
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="consumer.type">highlevel</parameter>
     <parameter name="content.type">application/xml</parameter>
     <parameter name="topics">topic1,topic2</parameter>
     <parameter name="group.id">test-group</parameter>
  </parameters>
</inboundEndpoint>

USE CASE 5 : WSO2 ESB Kafka Inbound as consume the message from specific server and specific partition

The Kafka Inbound allows to consume the messages from specific Kafka server and specific topic partition.

In the following configuration, The messages are consumed from Kafka server localhost:9092 and topic partition 1.

KafkaInboundEP
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="KafkaInboundEP2"
                sequence="requestHandlerSeq"
                onError="inFaulte"
                protocol="kafka"
                interval="1000"
                suspend="false">
  <parameters>   
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="group.id">test-group</parameter>  
     <parameter name="content.type">application/xml</parameter>
     <parameter name="consumer.type">simple</parameter>
     <parameter name="simple.max.messages.to.read">100000</parameter>
     <parameter name="simple.topic">topic</parameter>
     <parameter name="simple.brokers">localhost</parameter>
     <parameter name="simple.port">9092</parameter>
     <parameter name="simple.partition">1</parameter>
     <parameter name="interval">1000</parameter>
  </parameters>
</inboundEndpoint>

1 comment:

Create a REST API with Spring Boot

In this post, I will explain how to create a simple a REST API with Spring Boot Spring Boot Spring Boot is a framework that provides inbuil...