You can download the latest ESB version from http://wso2.com/products/enterprise-service-bus/
Kafka Inbound Use Cases
- ESB Kafka Inbound as Queue
- ESB Kafka Inbound as Topic
- ESB Kafka Inbound consumes from the beginning
- ESB Kafka Inbound consumes from multiple topics
- ESB Kafka Inbound consumes from a specific server and topic partition
USE CASE 1 : WSO2 ESB Kafka Inbound as Queue
If the consumer instances are in the same consumer group, Kafka behaves like a traditional queue: each message is consumed by only one Kafka inbound endpoint in the group.
Consider the following Kafka inbound configurations (KafkaInboundEP1 and KafkaInboundEP2). Because the group.id parameter has the same value in both configurations, both endpoints belong to the same consumer group. In that case, only one of the two endpoints consumes any given message from the topic.
KafkaInboundEP1
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="KafkaInboundEP1"
sequence="requestHandlerSeq"
onError="inFaulte"
protocol="kafka"
suspend="false">
<parameters>
<parameter name="interval">100</parameter>
<parameter name="coordination">true</parameter>
<parameter name="sequential">true</parameter>
<parameter name="zookeeper.connect">localhost:2181</parameter>
<parameter name="consumer.type">highlevel</parameter>
<parameter name="content.type">application/xml</parameter>
<parameter name="topics">topic</parameter>
<parameter name="group.id">consumer-group</parameter>
</parameters>
</inboundEndpoint>
KafkaInboundEP2
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="KafkaInboundEP2"
sequence="requestHandlerSeq"
onError="inFaulte"
protocol="kafka"
suspend="false">
<parameters>
<parameter name="interval">100</parameter>
<parameter name="coordination">true</parameter>
<parameter name="sequential">true</parameter>
<parameter name="zookeeper.connect">localhost:2181</parameter>
<parameter name="consumer.type">highlevel</parameter>
<parameter name="content.type">application/xml</parameter>
<parameter name="topics">topic</parameter>
<parameter name="group.id">consumer-group</parameter>
</parameters>
</inboundEndpoint>
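The inbound endpoint configurations in this post reference a sequence named requestHandlerSeq and an error sequence named inFaulte, neither of which is shown. The following is only a minimal sketch of what they could look like, assuming you just want to log each consumed message and then discard it. The sequence bodies are an illustration and not the author's actual sequences, and reading the ERROR_MESSAGE property in the error sequence assumes that the fault handler has populated it.

```xml
<definitions xmlns="http://ws.apache.org/ns/synapse">
    <!-- Hypothetical injection sequence: logs the full consumed message, then drops it. -->
    <sequence name="requestHandlerSeq">
        <log level="full"/>
        <drop/>
    </sequence>
    <!-- Hypothetical error sequence: logs the error message (if one was set), then drops the message. -->
    <sequence name="inFaulte">
        <log level="custom">
            <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
        </log>
        <drop/>
    </sequence>
</definitions>
```

With sequences like these deployed, the ESB log shows which messages were consumed, which makes it easier to check the queue and topic behavior described in the use cases.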
USE CASE 2 : WSO2 ESB Kafka Inbound as Topic
If the consumer instances are in different consumer groups, Kafka behaves like publish-subscribe: every message is broadcast to all consumer groups, so each inbound endpoint receives it.
KafkaInboundEP1
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="KafkaInboundEP1"
sequence="requestHandlerSeq"
onError="inFaulte"
protocol="kafka"
suspend="false">
<parameters>
<parameter name="interval">100</parameter>
<parameter name="coordination">true</parameter>
<parameter name="sequential">true</parameter>
<parameter name="zookeeper.connect">localhost:2181</parameter>
<parameter name="consumer.type">highlevel</parameter>
<parameter name="content.type">application/xml</parameter>
<parameter name="topics">topic1</parameter>
<parameter name="group.id">test-group1</parameter>
</parameters>
</inboundEndpoint>
KafkaInboundEP2
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="KafkaInboundEP2"
sequence="requestHandlerSeq"
onError="inFaulte"
protocol="kafka"
suspend="false">
<parameters>
<parameter name="interval">100</parameter>
<parameter name="coordination">true</parameter>
<parameter name="sequential">true</parameter>
<parameter name="zookeeper.connect">localhost:2181</parameter>
<parameter name="consumer.type">highlevel</parameter>
<parameter name="content.type">application/xml</parameter>
<parameter name="topics">topic1</parameter>
<parameter name="group.id">test-group2</parameter>
</parameters>
</inboundEndpoint>
USE CASE 3 : WSO2 ESB Kafka Inbound consumes messages from the beginning
The Kafka inbound endpoint can consume messages from the beginning of a topic partition.
The following configuration can be used for this use case. KafkaInboundEP1 consumes messages from partition 1 of the topic on server1 (localhost:9092 in this example), and KafkaInboundEP2 consumes messages from partition 2 of the same topic on the same server. In this way, all messages in those partitions on that server can be consumed from the beginning.
KafkaInboundEP1
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="KafkaInboundEP1"
sequence="requestHandlerSeq"
onError="inFaulte"
protocol="kafka"
interval="1000"
suspend="false">
<parameters>
<parameter name="zookeeper.connect">localhost:2181</parameter>
<parameter name="group.id">test-group</parameter>
<parameter name="content.type">application/xml</parameter>
<parameter name="consumer.type">simple</parameter>
<parameter name="simple.max.messages.to.read">5</parameter>
<parameter name="simple.topic">topic</parameter>
<parameter name="simple.brokers">localhost</parameter>
<parameter name="simple.port">9092</parameter>
<parameter name="simple.partition">1</parameter>
<parameter name="interval">100000</parameter>
</parameters>
</inboundEndpoint>
KafkaInboundEP2
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="KafkaInboundEP2"
sequence="requestHandlerSeq"
onError="inFaulte"
protocol="kafka"
interval="1000"
suspend="false">
<parameters>
<parameter name="zookeeper.connect">localhost:2181</parameter>
<parameter name="group.id">test-group</parameter>
<parameter name="content.type">application/xml</parameter>
<parameter name="consumer.type">simple</parameter>
<parameter name="simple.max.messages.to.read">100000</parameter>
<parameter name="simple.topic">topic</parameter>
<parameter name="simple.brokers">localhost</parameter>
<parameter name="simple.port">9092</parameter>
<parameter name="simple.partition">2</parameter>
<parameter name="interval">1000</parameter>
</parameters>
</inboundEndpoint>
USE CASE 4 : WSO2 ESB Kafka Inbound consumes from more than one topic
A Kafka inbound endpoint can consume messages from more than one topic. List the topic names, separated by commas, in the topics parameter.
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="KafkaInboundEP1"
sequence="requestHandlerSeq"
onError="inFaulte"
protocol="kafka"
suspend="false">
<parameters>
<parameter name="interval">100</parameter>
<parameter name="coordination">true</parameter>
<parameter name="sequential">true</parameter>
<parameter name="zookeeper.connect">localhost:2181</parameter>
<parameter name="consumer.type">highlevel</parameter>
<parameter name="content.type">application/xml</parameter>
<parameter name="topics">topic1,topic2</parameter>
<parameter name="group.id">test-group</parameter>
</parameters>
</inboundEndpoint>
USE CASE 5 : WSO2 ESB Kafka Inbound consumes messages from a specific server and a specific partition
In the following configuration, messages are consumed from the Kafka server localhost:9092 and from partition 1 of the topic.
KafkaInboundEP
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="KafkaInboundEP"
sequence="requestHandlerSeq"
onError="inFaulte"
protocol="kafka"
interval="1000"
suspend="false">
<parameters>
<parameter name="zookeeper.connect">localhost:2181</parameter>
<parameter name="group.id">test-group</parameter>
<parameter name="content.type">application/xml</parameter>
<parameter name="consumer.type">simple</parameter>
<parameter name="simple.max.messages.to.read">100000</parameter>
<parameter name="simple.topic">topic</parameter>
<parameter name="simple.brokers">localhost</parameter>
<parameter name="simple.port">9092</parameter>
<parameter name="simple.partition">1</parameter>
<parameter name="interval">1000</parameter>
</parameters>
</inboundEndpoint>