Apache Kafka is a distributed streaming platform for storing and processing data streams. Various interfaces allow data to be written to and read from Kafka clusters, and to be imported from or exported to third-party systems. Thanks to its high fault tolerance and scalability, Apache Kafka is well suited to large data volumes and data-intensive applications, e.g. in big data environments.
A configuration file is required in the X4 server installation to use the two adapters Apache Kafka Publisher and Apache Kafka Subscriber. This configuration file must be named apachekafka-config.xml and must be located in the X4DB/0 folder. When the X4 server is started, all information from this file is loaded.
The following XML file shows all required elements and attributes in the correct structure:
Example file apachekafka-config.xml
CODE
<?xml version='1.0' encoding='UTF-8'?>
<ApacheKafkaConfiguration>
<!-- Multiple clusters can be defined here -->
<!-- general approach -->
<Cluster id="ApacheKafka_broker_1" server="localhost:9092">
<!-- Now the different clients that can work with this broker are defined.
For each client, the following is defined: - id: the alias that identifies
the client in X4 processes. - groupId: the ID of the group to which
the consumer/publisher belongs. -->
<Client id="AK_Client_1" groupId="test"
autoCommitIntervalMS="100" />
<Client id="AK_Client_2" groupId="test"
autoCommitIntervalMS="100" />
<!-- Now, the different topics that clients can interact with for this
broker are defined -->
<Topic filter="topic1" />
<Topic filter="topic2" />
<Topic filter="topic3" />
</Cluster>
<!-- cluster with ssl / tls configuration -->
<Cluster id="ApacheKafka_broker_2" server="localhost:9093">
<!-- Node that contains the SSL/TLS configuration required for the server -->
<SslConfiguration privateKeyPassword="password">
<!-- Server keystore password and server keystore path -->
<KeyStore password="password"
path="C:\\tmp\\ssl_new\\server.keystore.jks" />
<!-- Server truststore password and server truststore path -->
<TrustStore password="password"
path="C:\\tmp\\ssl_new\\server.truststore.jks" />
</SslConfiguration>
<!-- Now the different clients that can work with this broker are defined.
For each client, the following is defined: - id: the alias that identifies
the client in X4 processes. - groupId: the ID of the group to which
the consumer/publisher belongs. -->
<Client groupId="test" autoCommitIntervalMS="100"
id="AK_Client_3" />
<Client groupId="test" autoCommitIntervalMS="100"
id="AK_Client_4" />
<!-- Now, the different topics that clients can interact with for this
broker are defined -->
<Topic filter="topic2" />
<Topic filter="topic5" />
<Topic filter="topic4" />
</Cluster>
<!-- cluster with optional client properties for subscriber and publisher -->
<Cluster id="ApacheKafka_broker_3" server="localhost:9093">
<Client id="TestClient" groupId="TestGroup"
autoCommitIntervalMS="100">
<!-- optional subscriber properties for this client -->
<subscriberProperties>
<Property key="some.subscriber.property.key" value="value" />
</subscriberProperties>
<!-- optional publisher properties for this client -->
<publisherProperties>
<Property key="some.publisher.property.key" value="value" />
</publisherProperties>
</Client> <!-- client now needs to be closed -->
<!-- Now, the different topics that clients can interact with for this
broker are defined -->
<Topic filter="topic2" />
<Topic filter="topic5" />
<Topic filter="topic4" />
</Cluster>
</ApacheKafkaConfiguration>
Information on subscriberProperties and publisherProperties
In the configuration file apachekafka-config.xml, the following properties can optionally be set for each client: subscriberProperties and publisherProperties. Each of them can be set for only one client and for only one operation.
Note: A value set in subscriberProperties overrides the default value set in the autoCommitIntervalMS attribute if this value is set again in subscriberProperties.
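The override described in the note can be sketched as follows. This fragment assumes that the key attribute accepts standard Kafka consumer configuration names such as auto.commit.interval.ms; the text only shows placeholder keys, so the key name and the value 5000 are illustrative assumptions, not confirmed behavior.

```xml
<?xml version='1.0' encoding='UTF-8'?>
<ApacheKafkaConfiguration>
<Cluster id="ApacheKafka_broker_1" server="localhost:9092">
<!-- autoCommitIntervalMS="100" is the default for this client -->
<Client id="AK_Client_1" groupId="TestGroup" autoCommitIntervalMS="100">
<subscriberProperties>
<!-- Assumption: standard Kafka consumer key. If it is accepted, this value
(5000 ms) takes precedence over autoCommitIntervalMS for this client -->
<Property key="auto.commit.interval.ms" value="5000" />
</subscriberProperties>
</Client>
<Topic filter="test" />
</Cluster>
</ApacheKafkaConfiguration>
```

If the property is omitted from subscriberProperties, the value of autoCommitIntervalMS remains in effect.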
Example for apachekafka-config.xml subscriberProperties and publisherProperties
CODE
<?xml version='1.0' encoding='UTF-8'?>
<ApacheKafkaConfiguration>
<Cluster id="ApacheKafka_broker_1" server="localhost:9092">
<Client id="AK_Client_1" groupId="TestGroup" autoCommitIntervalMS="100">
<!-- Properties of the Subscriber -->
<subscriberProperties>
<Property key="some.subscriber.property.key" value="value" />
</subscriberProperties>
</Client>
<Topic filter="test"/>
</Cluster>
<Cluster id="ApacheKafka_broker_2" server="localhost:9093">
<Client id="AK_Client_2" groupId="TestGroup" autoCommitIntervalMS="100">
<!-- Properties of the Publisher -->
<publisherProperties>
<Property key="some.publisher.property.key" value="value" />
</publisherProperties>
</Client>
<Topic filter="test"/>
</Cluster>
</ApacheKafkaConfiguration>
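For orientation, the placeholder keys in the example above might be replaced with actual Kafka client settings. The following sketch assumes that Property keys are passed through as standard Kafka consumer and producer configuration names (auto.offset.reset, acks, compression.type); this pass-through is an assumption that the text does not confirm, and the values are illustrative only.

```xml
<?xml version='1.0' encoding='UTF-8'?>
<ApacheKafkaConfiguration>
<Cluster id="ApacheKafka_broker_1" server="localhost:9092">
<Client id="AK_Client_1" groupId="TestGroup" autoCommitIntervalMS="100">
<subscriberProperties>
<!-- Assumed Kafka consumer setting: start at the earliest offset
when no committed offset exists for the group -->
<Property key="auto.offset.reset" value="earliest" />
</subscriberProperties>
<publisherProperties>
<!-- Assumed Kafka producer settings: wait for all in-sync replicas
and compress record batches -->
<Property key="acks" value="all" />
<Property key="compression.type" value="gzip" />
</publisherProperties>
</Client>
<Topic filter="test" />
</Cluster>
</ApacheKafkaConfiguration>
```

Keeping subscriber and publisher settings in separate blocks means each setting affects only the operation it is meant for.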
The most important elements and attributes are explained in more detail below:
| Element / Attribute | Description |
|---|---|
| Cluster | Defines the parameters required to configure the connection to a particular Kafka cluster. The id attribute must be unique and contains the information required for the adapter parameter clusterId. |
| server | URL of the cluster. Supports the TCP protocol. Possible values: hostname:port |
| SslConfiguration | Defines the SSL/TLS configuration parameters. |
| privateKeyPassword | Password of the private key in the keystore file. |
| KeyStore | Defines where the keystore to be used is located (path) and which password protects it (password). |
| TrustStore | Defines where the truststore to be used is located (path) and which password protects it (password). |
| Client | Information about the client that is used to publish or consume messages on a specific topic. The id attribute is the alias that identifies the client in X4 processes. |
| groupId | Defines the group ID used to group the connections. |
| autoCommitIntervalMS | Defines the frequency of commits in milliseconds. |
| Topic | Topic in which the messages are published. |
| filter | Name of the topic. |
The values for KeyStore, TrustStore and privateKeyPassword can also be set via adapter parameters. Values set via adapter parameters override the values from apachekafka-config.xml.