Working with Kafka and Zookeeper within Docker or from source

If you are working with Kafka and Zookeeper to create some test cases or as part of bigger projects, you can install them directly from the respective project websites or run them within Docker containers. The following hints can help you with your first steps when working with these tools.

Kafka and Zookeeper within Docker

The examples assume that the client can access the Kafka instance within Docker at port 33222.

Output the logs

docker logs -f my-project-kafka-c
docker logs -f my-project-zk4kafka-c

Output all messages for a specific topic

docker exec -it my-project-kafka-c /usr/bin/kafka-console-consumer --bootstrap-server localhost:33222 --topic TOPICNAME --from-beginning

Kafka and Zookeeper as direct installation

Download Apache Kafka from the Apache project website. After extracting the package, you can execute the following tasks.

The following examples expect that you are inside the extracted Apache Kafka folder. The Apache Kafka version used is kafka_2.12-1.0.0.

Start Apache Zookeeper and Apache Kafka locally

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TOPICNAME

Send a message to the topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TOPICNAME

Show all received messages for a specific topic

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPICNAME --from-beginning

Show the existing topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

Use the local Apache Kafka installation to access data within the Apache Kafka instance running in Docker

The examples assume that the client can access the Apache Kafka instance within Docker at port 33222 and the Apache Zookeeper instance within Docker at port 33272.

Consume messages for a specific topic

bin/kafka-console-consumer.sh --bootstrap-server localhost:33222 --topic TOPICNAME --from-beginning

Produce messages to the instance within Docker

bin/kafka-console-producer.sh --broker-list localhost:33222 --topic TOPICNAME

List the topics from the Apache Zookeeper instance within Docker

bin/kafka-topics.sh --list --zookeeper localhost:33272

Additional tools

Look for processes listening on the port of the Apache Kafka instance within Docker

The example assumes that the client can access the Apache Kafka instance within Docker at port 33222.

netstat -tulpe | grep 33222

Remove everything that was created by Docker Compose

docker-compose down -v

Replace pick entries with squash within the rebase process by using vim

If you have a Git project and want to rebase and squash your feature branch against the master branch, you can simplify changing the pick entries to s (squash) with the following commands from within vim.

# We are on the "feature" branch and the "master" branch was `fetched` and `merged` before
> git rebase -i master

# We want to change the "pick" entries to "s" within "vim"
# The c flag means "confirm each substitution"
:%s/^pick/s/c

# We press "n" for the first entry of the commit messages list
# Then we press "a" to confirm that all other entries should be changed

Kafka NetworkClient connection error because of a missing broker

The following error occurred within a Docker environment that contains Apache Kafka and Zookeeper and is based on the Confluentinc images.

o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] Connection to node -1 could not be established. Broker may not be available.

Finding the solution was tedious. First, we checked the source code and the functionality of the existing Akka actors. It became clear that this would not lead to the solution.

Another idea was to understand what the Confluentinc Docker images were really doing. After researching the default settings that are used to start up the Apache Kafka and Zookeeper instances, it became clear that this was the crux.

We modified the docker-compose.yml by adding one additional parameter to the Apache Kafka configuration. The final docker-compose.yml file looks as follows:

version: '3'
services:

  my-service-postgres-test-s:
    image: postgres:9.6.6-alpine
    container_name: my-service-postgres-test-c
    env_file:
      - ./db/env_files/env_test.list
    ports:
      - "36822:5432"
    volumes:
      - my-service-postgres-test-data-v:/var/lib/postgresql/data
    networks:
      - "my-service-n"

  my-service-s:
    image: URI_TO_THE_IMAGE
    container_name: my-service-c
    ports:
      - "22822:22822"
      - "22825:22825"
    depends_on:
      - my-service-postgres-test-s
    volumes:
      - ./config:/opt/docker/conf
    networks:
      - "my-service-n"

  my-service-zk4kafka-s:
    image: confluentinc/cp-zookeeper:4.0.0
    container_name: my-service-zk4kafka-c
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 33272
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 10
      ZOOKEEPER_SYNC_LIMIT: 5
      ZOOKEEPER_AUTOPURGE_SNAPRETAINCOUNT: 5
      ZOOKEEPER_AUTOPURGE_PURGEINTERVAL: 2
      ZOOKEEPER_SERVERS: 127.0.0.1:33277:33278
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: INFO
      KAFKA_JMX_HOSTNAME: 127.0.0.1
      KAFKA_JMX_PORT: 33279
    network_mode: host
#    ports:
#      - "33272:33272"
#      - "33277:33277"
#      - "33278:33278"
#      - "33279:33279"
    volumes:
      - my-service-zk4kafka-data-v:/var/lib/zookeeper/data
      - my-service-zk4kafka-log-v:/var/lib/zookeeper/log

  my-service-kafka-s:
    image: confluentinc/cp-kafka:4.0.0
    container_name: my-service-kafka-c
    depends_on:
      - my-service-zk4kafka-s
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 127.0.0.1:33272
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:33222
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_COMPRESSION_TYPE: 'lz4'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: 'false'
      KAFKA_LOG_CLEANER_ENABLE: 'false'
      KAFKA_LOG_RETENTION_MS: 315360000000
      KAFKA_OFFSETS_RETENTION_MINUTES: 6912000
      KAFKA_MAX_CONNECTIONS_PER_IP: 100
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG4J_ROOT_LOGLEVEL: DEBUG
      KAFKA_LOG4J_LOGGERS: "kafka.controller=DEBUG,state.change.logger=DEBUG"
      KAFKA_TOOLS_LOG4J_LOGLEVEL: DEBUG
      KAFKA_JMX_HOSTNAME: 127.0.0.1
      KAFKA_JMX_PORT: 33229
    network_mode: host
#    ports:
#      - "33222:33222"
#      - "33229:33229"
    volumes:
      - my-service-kafka-data-v:/var/lib/kafka/data

volumes:
  my-service-postgres-test-data-v:
  my-service-zk4kafka-data-v:
  my-service-zk4kafka-log-v:
  my-service-kafka-data-v:

networks:
  my-service-n:

The important addition was the following line:

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

This overwrites the default value set by the Confluentinc image and makes the offsets topic replication factor consistent with the single available broker.

Important: You have to delete the previously created topics, because they have been created with the default offsets replication value determined by the Confluentinc image.
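
The topics can be removed with the kafka-topics.sh tooling shown above (using its --delete flag) or programmatically. The following is a minimal, hypothetical Scala sketch using the Kafka AdminClient, assuming the broker is reachable at 127.0.0.1:33222 as configured in the docker-compose.yml above.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

// Hypothetical sketch: delete a previously created topic so that it can be
// recreated with the new configuration.
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:33222")

val admin = AdminClient.create(props)
try {
  admin.deleteTopics(Collections.singletonList("TOPICNAME")).all().get()
} finally {
  admin.close()
}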

Mathematical basics – Set theory

Set

A set is a collection of distinct objects. Assume the set A has the elements a, b, c and d, so we can write A = {a, b, c, d}.

In the example above, the element a is within the set A. You can write a ∈ A. Another element, let's say m, is not within A, therefore we can write m ∉ A.

Two sets have the same cardinality if they can be mapped bijectively onto each other, i.e. there is a one-to-one correspondence between their elements.

Bijection:

  • Complete pairing between the elements of the domain (definition set) and the codomain (target set)
  • Bijections thus treat their domain and their range symmetrically
  • The domain and the codomain have the same cardinality
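
A minimal Scala sketch (with hypothetical example values) that illustrates such a one-to-one pairing:

// The two collections below have the same cardinality.
val domain   = List('a', 'b', 'c', 'd')
val codomain = List(1, 2, 3, 4)

// Pair each element of the domain with exactly one element of the codomain.
val bijection: Map[Char, Int] = domain.zip(codomain).toMap

// Because the pairing is one-to-one and onto, the inverse mapping exists as well.
val inverse: Map[Int, Char] = bijection.map { case (k, v) => v -> k }

assert(bijection.size == domain.size && inverse.size == codomain.size)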

Subsets

If all elements of a set S are also within another set B, we call S a subset of B, written S ⊆ B.

  • all elements of S are also elements of B
  • expressed as S ⊆ B
  • we also say S is contained in B
  • equivalently B ⊇ S, which means B is a superset of S
  • all sets are subsets of themselves: S ⊆ S and B ⊆ B

The universal set U includes all objects and additionally itself. So all sets are also subsets of the universal set: B ⊆ U and S ⊆ U.

The empty set, expressed as ∅, is a subset of all other sets, for example ∅ ⊆ B and ∅ ⊆ S.
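
A minimal Scala sketch (with hypothetical example sets) that illustrates the subset relation, including the empty set:

val b = Set(1, 2, 3)
val s = Set(1, 2)

// S is a subset of B, and every set is a subset of itself.
assert(s.subsetOf(b))
assert(s.subsetOf(s) && b.subsetOf(b))

// The empty set is a subset of every set.
assert(Set.empty[Int].subsetOf(b))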

Power sets

The power set of a set contains all subsets of the set, including the empty set. The following example defines a set S with the elements {1, 2}. The power set P(S) contains all subsets and the empty set: P(S) = {∅, {1}, {2}, {1, 2}}.
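
A minimal Scala sketch of this example, using the standard library's subsets() iterator:

val s: Set[Int] = Set(1, 2)

// Contains Set(), Set(1), Set(2) and Set(1, 2), i.e. 2^2 = 4 elements.
val powerSet: Set[Set[Int]] = s.subsets().toSet

assert(powerSet.size == math.pow(2, s.size).toInt)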

  • The power set of a finite set with n elements contains 2^n elements

Isomorphism

  • bijective
  • structure-preserving
  • in topology, the corresponding notion is a homeomorphism

Homomorphism

  • structure-preserving
  • not necessarily bijective
  • weaker than an isomorphism

Miscellaneous

Code generation with the Slick code generator for a PostgreSQL database in a Scala project

If you want to automatically generate the table mappings of your database tables for use within the Slick environment, you can use the Slick code generator for this task.

The groundwork consists of:

  • Database table definitions within the database to generate the mappings from
  • Integration of the Slick code generator into your project
  • Execution of the code generation from within the sbt console (or integration into your project build)

Database tables

You can create your database evolutions by using e.g. PgAdmin or any other tool.

Integrate the Slick code generator into your project

You can integrate the generator within your build.sbt or Build.scala. The following line shows an example dependency configured within the build.sbt.

libraryDependencies += "com.typesafe.slick" %% "slick-codegen" % "3.2.0"

Execute the generation of the code

Start sbt.

> sbt

Open the Scala console from the sbt prompt.

> console

Execute the generation of the code.

slick.codegen.SourceCodeGenerator.main(
  Array(
    "slick.jdbc.PostgresProfile",
    "org.postgresql.Driver",
    "jdbc:postgresql://localhost:5432/db",
    "./target/generatedSources",
    "com.my.db",
    "USERNAME",
    "PASSWORD"
  )
)

The SourceCodeGenerator can be configured depending on your needs. Additional information can be found at http://slick.lightbend.com/doc/3.2.0/code-generation.html.

Make an SSH key available that is not the default one and combine it with the Git SSH command

If you want to add another SSH key to the current shell session that is not the default one, you can do the following.

Export the additional key via the GIT_SSH_COMMAND environment variable to have it in scope for the current session.

export GIT_SSH_COMMAND="ssh -i ~/.ssh/filename"

Add the SSH key to the current shell session.

ssh-add ~/.ssh/filename

Besides the former command, you can still add the default SSH key with:

ssh-add

Semi-automatic Circe decoder for JsonString of slick-pg

If you need a decoder for storing JSON in a database or for the implicit conversion of a com.github.tminglei.slickpg.JsonString, you can use the semi-automatic derivation of Circe.

The following imports are necessary.

import com.github.tminglei.slickpg.JsonString
import io.circe.Decoder
import io.circe.generic.semiauto._

Let's assume we have the following case class:

final case class MyClass(
    myId: Long,
    desc: String,
    structure: JsonString
)

The following implicit decoder is semi-automatically derived by Circe.

/**
 * Implicit Decoder for decoding the JsonString value
 */
implicit val decodeMyClassStructureJsonString: Decoder[JsonString] = deriveDecoder[JsonString]

Decoder and Encoder

An implicit decoder and encoder for the case class could be as follows.

// Additional imports needed for the codecs below.
import io.circe.{Decoder, Encoder, HCursor, Json}
import io.circe.syntax._

// Circe codec for decoding a MyClass from JSON.
implicit val decodeMyClass: Decoder[MyClass] = (c: HCursor) =>
  for {
    myId      <- c.downField("myId").as[Long]
    desc      <- c.downField("desc").as[String]
    structure <- c.downField("structure").as[JsonString]
  } yield
    MyClass(
      myId,
      desc,
      structure
    )

// Circe codec for encoding a MyClass to JSON.
implicit val encodeMyClass: Encoder[MyClass] = (a: MyClass) =>
  Json.obj(
    ("myId", a.myId.asJson),
    ("desc", a.desc.asJson),
    ("structure", a.structure.value.asJson)
  )
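
As a usage sketch (with hypothetical example values, assuming the codecs above are in implicit scope), a MyClass instance can then be encoded to JSON via the Circe syntax helpers:

// Hypothetical example instance; JsonString wraps the raw JSON text as a String.
val example = MyClass(
  myId = 1L,
  desc = "example",
  structure = JsonString("""{"key":"value"}""")
)

// Uses the implicit encodeMyClass defined above.
println(example.asJson.noSpaces)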

Current versions:

  • slick-pg 0.15.2
  • circe 0.8.0

Akka stream with Alpakka and FTP connector – close the stream

If you use an Alpakka FTP connector, you have to close the stream at the end to finish the connection correctly. The usual way is to send an `akka.actor.Status.Success` message to the ActorRef.

actor ! akka.actor.Status.Success

I got the following error message when sending the message as shown above.

java.lang.ClassCastException: akka.actor.Status$Success$ cannot be cast to akka.util.ByteString

The FTP connector requires a ByteString when writing content to the target of the stream.

The following code demonstrates a simple implementation of an FTP connector:

val uri: java.net.URI = ...
val username = USERNAME
val password = PASSWORD
val DEFAULT_CHARSET = "UTF-8"

val byteSource: Source[ByteString, ActorRef] =
  Source.actorRef[ByteString](Int.MaxValue, OverflowStrategy.fail)

val host = InetAddress.getByName(uri.getHost)
val port = 21
val credentials = NonAnonFtpCredentials(username, password)

val settings =
  FtpSettings(
    host, port, credentials, binary = true, passiveMode = false
  )
val ftpConnection: Sink[ByteString, Future[IOResult]] = 
  Ftp.toPath(path, settings, append = true)

val writer = Flow[ByteString].to(ftpConnection).runWith(byteSource)

You can send a message to the stream in the following manner:

writer ! ByteString("MESSAGE".getBytes(DEFAULT_CHARSET))

Back to the `ClassCastException`. If you want to close the stream correctly, you have to send an instance of `akka.actor.Status.Success` (instead of the companion object), for example with a payload:

writer ! akka.actor.Status.Success("Success".getBytes(DEFAULT_CHARSET))