Working with Kafka and Zookeeper within Docker or as a direct installation

If you are working with Kafka and Zookeeper, whether to create some test cases or as part of a bigger project, you can either install them directly from the respective Apache project websites or run them in Docker containers. The following hints can help you with your first steps when working with these tools.

Kafka and Zookeeper within Docker

The examples assume that the client can access the Kafka instance within Docker at port 33222.

Output the logs

docker logs -f my-project-kafka-c
docker logs -f my-project-zk4kafka-c

Output all messages for a specific topic

docker exec -it my-project-kafka-c /usr/bin/kafka-console-consumer --bootstrap-server localhost:33222 --topic TOPICNAME --from-beginning
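
Send messages to a specific topic

Assuming the image also provides the console producer under /usr/bin (as the confluentinc images do), you can send test messages from within the container:

docker exec -it my-project-kafka-c /usr/bin/kafka-console-producer --broker-list localhost:33222 --topic TOPICNAME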

Kafka and Zookeeper as a direct installation

Download Apache Kafka from the Apache project website and extract the package. The Kafka distribution already ships with a Zookeeper instance, so no separate Zookeeper download is needed.

The following examples expect that you are within the extracted Apache Kafka folder; the version used here is kafka_2.12-1.0.0.

Start Apache Zookeeper and Apache Kafka locally

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
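
Start Zookeeper first, since the Kafka broker registers itself there on startup. Both scripts block the current terminal; they also accept a -daemon flag to run the processes in the background:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties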

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TOPICNAME
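
To verify the result, the same script can describe the topic and show its partition count, replication factor, and leader assignment:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TOPICNAME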

Send a message to the topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TOPICNAME
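
The producer reads messages line by line from stdin. For scripted test cases, a message can also be piped in directly:

echo "my test message" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TOPICNAME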

Show all received messages for a specific topic

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPICNAME --from-beginning

Show the existing topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

Use the local Apache Kafka installation to access the Apache Kafka instance within Docker

The examples assume that the client can access the Apache Kafka instance within Docker at port 33222 and the Apache Zookeeper instance within Docker at port 33272.

Consume messages for a specific topic

bin/kafka-console-consumer.sh --bootstrap-server localhost:33222 --topic TOPICNAME --from-beginning

Produce messages to the instance within Docker

bin/kafka-console-producer.sh --broker-list localhost:33222 --topic TOPICNAME

List the topics from the Apache Zookeeper instance within Docker

bin/kafka-topics.sh --list --zookeeper localhost:33272
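
List the consumer groups from the Apache Kafka instance within Docker

The local installation ships further scripts beyond the topic tool. As a sketch, the consumer groups known to the broker within Docker can be listed as follows:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:33222 --list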

Additional tools

Look for processes listening on the port of the Apache Kafka instance within Docker

The example assumes that the client can access the Apache Kafka instance within Docker at port 33222.

netstat -tulpe | grep 33222
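
If netstat is not available on your system (for example on macOS), lsof provides similar information:

lsof -i :33222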

Remove all containers, networks, and volumes that were created via Docker Compose

docker-compose down -v
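
The -v flag also removes the named volumes declared in docker-compose.yml, so all persisted data is lost. If the pulled images should be removed as well, docker-compose offers an --rmi flag:

docker-compose down -v --rmi all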

Replace pick entries during an interactive rebase to squash commits by using vim

If you have a Git project and want to rebase and squash your feature branch against the master branch, you can simplify changing the pick entries to s (squash) with the following commands from within vim.

# We are on the "feature" branch and the "master" branch was `fetched` and `merged` before
> git rebase -i master

# We want to change the "pick" entries to "s" within vim
# The % range applies the substitution to every line; the c flag asks for confirmation
:%s/^pick/s/c

# We press "n" for the first entry of the commit list, because the first commit must stay "pick"
# Then we press "a" to confirm that all remaining entries should be changed
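
Since the first entry has to stay "pick" anyway, the confirmation step can be skipped entirely by restricting the substitution to every line except the first:

:2,$s/^pick/s/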

Kafka NetworkClient connection error because of a missing broker

The following error occurred within a Docker environment that contains Apache Kafka and Zookeeper, based on the Confluent (confluentinc) images.

o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] Connection to node -1 could not be established. Broker may not be available.

It was tedious to find the solution. First, we checked the source code and the functionality of the existing Akka actors, but it became clear that this would not lead to the solution.

Another idea was to understand what the Confluent Docker images were really doing. After researching the default settings that are used to start up the Apache Kafka and Zookeeper instances, it became clear that this was the crux of the problem.

We modified the docker-compose.yml by adding one additional parameter to the Apache Kafka configuration. The final docker-compose.yml file looks as follows:

version: '3'
services:

  my-service-postgres-test-s:
    image: postgres:9.6.6-alpine
    container_name: my-service-postgres-test-c
    env_file:
      - ./db/env_files/env_test.list
    ports:
      - "36822:5432"
    volumes:
      - my-service-postgres-test-data-v:/var/lib/postgresql/data
    networks:
      - "my-service-n"

  my-service-s:
    image: URI_TO_THE_IMAGE
    container_name: my-service-c
    ports:
      - "22822:22822"
      - "22825:22825"
    depends_on:
      - my-service-postgres-test-s
    volumes:
      - ./config:/opt/docker/conf
    networks:
      - "my-service-n"

  my-service-zk4kafka-s:
    image: confluentinc/cp-zookeeper:4.0.0
    container_name: my-service-zk4kafka-c
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 33272
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 10
      ZOOKEEPER_SYNC_LIMIT: 5
      ZOOKEEPER_AUTOPURGE_SNAPRETAINCOUNT: 5
      ZOOKEEPER_AUTOPURGE_PURGEINTERVAL: 2
      ZOOKEEPER_SERVERS: 127.0.0.1:33277:33278
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: INFO
      KAFKA_JMX_HOSTNAME: 127.0.0.1
      KAFKA_JMX_PORT: 33279
    # With network_mode: host the container shares the host network stack,
    # so the explicit port mappings below are not needed
    network_mode: host
#    ports:
#      - "33272:33272"
#      - "33277:33277"
#      - "33278:33278"
#      - "33279:33279"
    volumes:
      - my-service-zk4kafka-data-v:/var/lib/zookeeper/data
      - my-service-zk4kafka-log-v:/var/lib/zookeeper/log

  my-service-kafka-s:
    image: confluentinc/cp-kafka:4.0.0
    container_name: my-service-kafka-c
    depends_on:
      - my-service-zk4kafka-s
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 127.0.0.1:33272
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:33222
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_COMPRESSION_TYPE: 'lz4'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: 'false'
      KAFKA_LOG_CLEANER_ENABLE: 'false'
      KAFKA_LOG_RETENTION_MS: 315360000000
      KAFKA_OFFSETS_RETENTION_MINUTES: 6912000
      KAFKA_MAX_CONNECTIONS_PER_IP: 100
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG4J_ROOT_LOGLEVEL: DEBUG
      KAFKA_LOG4J_LOGGERS: "kafka.controller=DEBUG,state.change.logger=DEBUG"
      KAFKA_TOOLS_LOG4J_LOGLEVEL: DEBUG
      KAFKA_JMX_HOSTNAME: 127.0.0.1
      KAFKA_JMX_PORT: 33229
    network_mode: host
#    ports:
#      - "33222:33222"
#      - "33229:33229"
    volumes:
      - my-service-kafka-data-v:/var/lib/kafka/data

volumes:
  my-service-postgres-test-data-v:
  my-service-zk4kafka-data-v:
  my-service-zk4kafka-log-v:
  my-service-kafka-data-v:

networks:
  my-service-n:

The important addition was the following line:

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

This overrides the default value set by the Confluent image and aligns the replication factor of the internal offsets topic (__consumer_offsets) with the single broker of this setup; with only one broker available, the internal topic cannot be created with the higher default replication factor.
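
To check which replication factor is actually in effect, you can describe the internal offsets topic against the Zookeeper instance within Docker:

bin/kafka-topics.sh --describe --zookeeper localhost:33272 --topic __consumer_offsets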

Important: You have to delete the formerly created topics, because these were created with the default offsets topic replication factor determined by the Confluent image.
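
Since KAFKA_DELETE_TOPIC_ENABLE is set to 'true' in the configuration above, the topics can be deleted with the topic tool:

bin/kafka-topics.sh --delete --zookeeper localhost:33272 --topic TOPICNAME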