Apache Kafka
Tech-Today



1.) Overview


Apache Kafka is a distributed streaming platform. It is used for building real-time data platforms and streaming applications. In this blog, we will discuss how to install Kafka and work on some basic use cases.

This article was written using Apache Kafka 2.1.0 (the kafka_2.12-2.1.0 download, where 2.12 is the Scala version).

2.) Installation

Download and unpack Kafka from https://kafka.apache.org/downloads. 

2.1) Configuration

config/zookeeper.properties
  • dataDir=/tmp/kafka/zookeeper
config/server.properties
  • log.dirs=/tmp/kafka/logs
  • zookeeper.connect=localhost:2181
  • listeners=PLAINTEXT://localhost:9092
To test Kafka, start ZooKeeper first and then the broker:
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties

Run the second command in a new terminal. Once the broker starts, you should see new log entries in the ZooKeeper terminal.
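If we want to confirm that both processes are actually accepting connections, a small check like the following can help. This is only a sketch: the helper name is_listening is our own, and the ports are the ones configured above (2181 for ZooKeeper, 9092 for the broker).

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable.
        return False


# Ports taken from zookeeper.connect and listeners in this article.
for name, port in (("ZooKeeper", 2181), ("Kafka broker", 9092)):
    state = "up" if is_listening("localhost", port) else "not reachable"
    print(f"{name} on port {port}: {state}")
```

An open port only tells us that something is listening there, not that it is healthy, so the logs remain the better source of truth.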

3.) Kafka Topics

Create a topic:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic clicks

View the topics:
>bin/kafka-topics.sh --list --zookeeper localhost:2181

Delete the topic (execute at the end):
>bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic clicks

4.) Sending and Receiving Messages

Send messages:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
-Enter some messages here and leave the command open

Receive the messages:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicks --from-beginning
-You should see every message sent to the topic so far, because --from-beginning starts reading at the earliest offset. New messages appear as they arrive.
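To make the effect of --from-beginning concrete, here is a toy in-memory model of a single-partition topic. It is not a Kafka client, and the class TopicLog and its methods are hypothetical names used only for illustration. It assumes a brand-new consumer with no stored offsets, which is how the console consumer behaves when started as above.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TopicLog:
    """Toy stand-in for a single-partition topic (not a Kafka client)."""
    records: List[str] = field(default_factory=list)

    def append(self, message: str) -> int:
        """Store a message and return the offset it was written at."""
        self.records.append(message)
        return len(self.records) - 1

    def read(self, from_beginning: bool, joined_at: int) -> List[str]:
        """Return what a brand-new consumer would see.

        from_beginning=True mimics --from-beginning (start at offset 0);
        otherwise the consumer starts at the offset where it joined.
        """
        start = 0 if from_beginning else joined_at
        return self.records[start:]


log = TopicLog()
log.append("click-1")
log.append("click-2")
joined_at = len(log.records)  # the consumer attaches after two messages exist
log.append("click-3")

print(log.read(from_beginning=True, joined_at=joined_at))
print(log.read(from_beginning=False, joined_at=joined_at))
```

The first call returns all three messages, while the second returns only the message produced after the consumer joined.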

5.) Multi Broker

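Make two copies of config/server.properties, named server-1.properties and server-2.properties, and set the following properties. Each broker needs its own id, port, and log directory. Use log.dirs (the key already present in the copied file), not log.dir, otherwise the brokers would try to share one log directory.

config/server-1.properties
  • broker.id=1
  • listeners=PLAINTEXT://:9093
  • log.dirs=/tmp/kafka-logs-1
config/server-2.properties
  • broker.id=2
  • listeners=PLAINTEXT://:9094
  • log.dirs=/tmp/kafka-logs-2
Start the two new brokers in separate terminals:
>bin/kafka-server-start.sh config/server-1.properties
>bin/kafka-server-start.sh config/server-2.properties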
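Editing the copies by hand is fine for two brokers, but the overrides follow a simple pattern, so they can also be generated. The sketch below works on the text of a properties file. The function names are our own, and the base string is a shortened stand-in for config/server.properties, not its real contents.

```python
from typing import Dict


def override_properties(base_text: str, overrides: Dict[str, str]) -> str:
    """Return base_text with the given keys replaced, or appended if absent."""
    remaining = dict(overrides)
    out = []
    for line in base_text.splitlines():
        key = line.split("=", 1)[0].strip()
        is_setting = "=" in line and not line.lstrip().startswith("#")
        if is_setting and key in remaining:
            out.append(f"{key}={remaining.pop(key)}")
        else:
            out.append(line)
    # Keys that were not present in the base file are appended at the end.
    out.extend(f"{k}={v}" for k, v in remaining.items())
    return "\n".join(out) + "\n"


def broker_overrides(broker_id: int) -> Dict[str, str]:
    """Settings that must differ per broker when all run on one machine."""
    return {
        "broker.id": str(broker_id),
        "listeners": f"PLAINTEXT://:{9092 + broker_id}",
        "log.dirs": f"/tmp/kafka-logs-{broker_id}",
    }


# Shortened stand-in for the contents of config/server.properties.
base = (
    "broker.id=0\n"
    "listeners=PLAINTEXT://localhost:9092\n"
    "log.dirs=/tmp/kafka/logs\n"
    "zookeeper.connect=localhost:2181\n"
)

for n in (1, 2):
    print(f"--- config/server-{n}.properties ---")
    print(override_properties(base, broker_overrides(n)), end="")
```

To use it for real, we would read config/server.properties into base and write each result to the matching file name.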

Create a new topic that will be replicated on the original broker plus the two new ones:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic clicks-replicated

You can run the view topics command again (above).

We can also describe the new topic to check that it was created as specified, and compare it with the original topic:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks-replicated
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks

6.) Fault Tolerance

Now, we can send some messages to our replicated topic:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks-replicated

Read the message in the replicated topic:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic clicks-replicated

Now shut down the second new broker (broker.id=2) by pressing Ctrl+C in its terminal, or by closing the terminal.

Describe the replicated topic again. The stopped broker should no longer be listed under Isr (the in-sync replicas):
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks-replicated

We can read the messages again from the beginning through either remaining broker (the original and the first new broker; note that the second is off):
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic clicks-replicated
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic clicks-replicated
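Why the messages are still readable can be pictured with a toy model of one partition's replication state. This is a deliberately simplified sketch, not Kafka's actual election logic: the Partition class is hypothetical, and picking the first remaining in-sync replica as leader is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Partition:
    """Toy model of one partition's replication state."""
    replicas: List[int]      # broker ids assigned to hold a copy
    isr: List[int]           # replicas currently in sync
    leader: Optional[int]    # broker serving reads and writes, if any

    def broker_down(self, broker_id: int) -> None:
        """Drop a broker from the ISR and re-elect a leader if needed."""
        self.isr = [b for b in self.isr if b != broker_id]
        if self.leader == broker_id:
            # Simplification: promote the first remaining in-sync replica.
            self.leader = self.isr[0] if self.isr else None


# Replication factor 3 across brokers 0, 1 and 2, with broker 2 leading.
partition = Partition(replicas=[0, 1, 2], isr=[0, 1, 2], leader=2)
partition.broker_down(2)
print(partition)
```

After broker 2 goes down, the partition still has two in-sync copies and a leader, so consumers connected through the remaining brokers can keep reading.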

*Close all the terminals except ZooKeeper and the original broker on port 9092.

7.) Import / export data from and to a file using a connector

Kafka can also read from and write to a file using Kafka Connect. Let's try that with the default configurations:
  • connect-standalone.properties - the Connect worker configuration (broker address, converters, offset storage)
  • connect-file-source.properties - specifies the source file to read (default: test.txt; note the topic value here)
  • connect-file-sink.properties - specifies where to write (default: test.sink.txt)
Run the connector:
>bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
-Create a test.txt file in the directory where you run the connector and add some text to it. Make sure the file ends with a newline; otherwise, the last line will not be read.

In the log, you should see something like:
[2019-01-13 16:17:09,799] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)
[2019-01-13 16:17:10,838] INFO Cluster ID: MYm1bMttRdCqG-njYXeO-w (org.apache.kafka.clients.Metadata:285)

There should be a newly created file with the same content named: test.sink.txt.

Note that you can still read the messages using the consumer. The topic name connect-test comes from connect-file-source.properties:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Append "Hello World!" to test.txt and your consumer should pick up the message:
{"schema":{"type":"string","optional":false},"payload":"Hello World!"}
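Each line from the file arrives wrapped in a JSON envelope with a schema and a payload. If we only care about the original text, we can unwrap it. The following is a minimal sketch using the standard json module; the function name extract_payload is our own, and the sample record is the one shown above.

```python
import json


def extract_payload(record: str) -> str:
    """Return the payload field of a schema/payload JSON envelope."""
    envelope = json.loads(record)
    return envelope["payload"]


record = '{"schema":{"type":"string","optional":false},"payload":"Hello World!"}'
print(extract_payload(record))  # prints: Hello World!
```

The envelope is produced by the JSON converter configured in connect-standalone.properties, so the exact shape depends on the converter settings in that file.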

*Terminate the consumer but leave the original broker (broker.id=0) running.

8.) Streaming using WordCount app

Now let's create a new file with the following content:
>echo -e "The quick brown fox jumps over the lazy dog.\nThe quick brown fox jumps over the lazy dog." > file-input.txt

Create a new topic:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input

Send the file's contents to the topic (in practice, the data could come from a live stream):
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input < file-input.txt

Consume the input:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input --from-beginning

We can use the WordCount demo app packaged with Kafka to process the data we just sent to the topic:
>bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Consume the messages using String and Long deserializers:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

You should have an output similar to:
the 1
quick 1
brown 1
fox 1
jumps 1
over 1
the 2
lazy 1
dog. 1
the 3
quick 2
brown 2
fox 2
jumps 2
over 2
the 4
lazy 2
dog. 2
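The repeated keys in the output above are updates, not duplicates: every time a word is seen, its new running total is emitted. The plain-Python sketch below reproduces that behaviour without Kafka. It assumes the demo lowercases each line and splits it on spaces, which is consistent with the "dog." key above; the function name running_word_counts is our own.

```python
from collections import Counter
from typing import Iterator, List, Tuple


def running_word_counts(lines: List[str]) -> Iterator[Tuple[str, int]]:
    """Yield (word, count so far) each time a word is seen."""
    counts: Counter = Counter()
    for line in lines:
        # Assumed tokenization: lowercase, then split on single spaces.
        for word in line.lower().split(" "):
            counts[word] += 1
            yield word, counts[word]


# Same two lines that were written to file-input.txt.
lines = ["The quick brown fox jumps over the lazy dog."] * 2
for word, count in running_word_counts(lines):
    print(word, count)
```

A real Kafka Streams application may batch or deduplicate these updates depending on its caching and commit settings, so the number of intermediate rows can differ from this sketch.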



