
Kafka Donuts - 1 - Donut Broker

This is part of a series - check out the intro and index here:
Kafka Donuts

Game Plan

The aim of this post is to get a message broker up and running on our local machine. It will serve as the backbone for all other components in this series.

We will be using Apache Kafka as our broker. Apache Kafka uses Apache ZooKeeper to maintain cluster state, such as broker membership and, historically, consumer offsets.

We will spin up a Kafka cluster on Docker using Docker Compose.

The Tools

Apache Kafka
Apache Zookeeper
Docker
Docker Compose

Time To Code

We get started by making sure we have a working Docker setup. Follow the steps listed HERE to get Docker up and running.

Once Docker is up, we will use Docker Compose to spin up our cluster using a Docker Compose file.

In the project root folder create a file called dc_kafkadonuts.yml.

This file tells Docker Compose which services to set up, which Docker images to use, and how to connect them.

We will add pieces to it as we go along - but let's get a base version up.

For reference, the following services will be created, listed in the order they appear in this Docker file:

  • ZooKeeper
  • Kafka
  • Confluent Schema Registry
  • Kafka Connect
  • Confluent Control Center
  • KSQL Server
  • KSQL CLI
  • Confluent REST Proxy

First, let's add ZooKeeper. The --- at the top of the file marks the start of a YAML document. Then we list our services, starting with ZooKeeper.

We specify the hostname by which we will reference this instance of the service - zookeeper - and keep the container name the same. We also expose port 2181 so that clients can connect to ZooKeeper.

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
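
Once this service is up, a quick way to confirm ZooKeeper is healthy is its four-letter ruok command: send the string over a raw socket and a healthy server replies imok. A minimal Python sketch, using the port mapping above (note that newer ZooKeeper versions require four-letter words to be whitelisted via 4lw.commands.whitelist):

```python
import socket

def zk_ruok(host: str = "localhost", port: int = 2181, timeout: float = 5.0) -> bool:
    """Send ZooKeeper's 'ruok' four-letter word; a healthy server answers 'imok'."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(b"ruok")
        return sock.recv(4) == b"imok"
```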

Next up, we add Kafka, the star of the show.
We expose port 9092 as our plaintext port, point it to our ZooKeeper instance with KAFKA_ZOOKEEPER_CONNECT, and specify our advertised listeners to tell clients where the broker listens for connections.

KAFKA_BROKER_ID is also important and needs to be unique for every broker in a cluster.

  broker:
    image: confluentinc/cp-enterprise-kafka:5.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
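
The two advertised listeners matter: a client inside the Compose network connects via broker:9092, while a client on the host machine must use localhost:29092, because the hostname broker does not resolve outside the network. A hypothetical little helper to make the distinction explicit:

```python
def bootstrap_servers(inside_compose_network: bool) -> str:
    """Pick the right advertised listener for a Kafka client.

    Inside the Compose network the hostname 'broker' resolves, so the
    PLAINTEXT listener on 9092 applies; from the host machine only the
    PLAINTEXT_HOST listener published on localhost:29092 is reachable.
    """
    return "broker:9092" if inside_compose_network else "localhost:29092"
```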

Now we add Schema Registry, mapping its internal port 8081 to host port 30801.

Schema Registry will be used to validate and version our schemas and ensure that we are sending properly constructed data through our pipeline.

We map to host port 30801 because 8081 is commonly used by other software, which causes port clashes; inside the Compose network, other services still reach Schema Registry on its internal port 8081. We also use depends_on to specify that it must wait for ZooKeeper and our Kafka broker to start before it starts.

  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "30801:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

Kafka Connect is next - an awesome utility to get data in and out of Kafka. It has custom connectors and is easily extendable.

  connect:
    image: confluentinc/cp-kafka-connect:5.0.0
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - mi2:/usr/share/java/monitoring-interceptors/
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: /usr/share/java
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
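
To give a flavour of how Connect is driven: connectors are created by POSTing a JSON definition to the REST port we exposed (8083). The sketch below builds a definition for the FileStreamSource connector that ships with Connect; the connector name, file path, and topic are made up:

```python
import json

def file_source_connector(name: str, path: str, topic: str) -> str:
    """Build the JSON body for creating a FileStreamSource connector,
    which tails a file and writes each line to a Kafka topic."""
    return json.dumps({
        "name": name,
        "config": {
            "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
            "tasks.max": "1",
            "file": path,
            "topic": topic,
        },
    })

body = file_source_connector("donut-file-source", "/tmp/donuts.txt", "donut-orders")
# POST to http://localhost:8083/connectors with Content-Type: application/json
```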

Time for the management interface - Control Center. It comes with Confluent Enterprise only, but it is worth a showcase here.


  control-center:
    image: confluentinc/cp-enterprise-control-center:5.0.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - connect
      - ksql-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      #CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_KSQL_ENDPOINT: "http://ksql-server:8088"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

Now we start up KSQL Server and the KSQL CLI. The CLI container is effectively a little terminal that connects to the KSQL Server, which we will use to run KSQL commands.

KSQL is used for stream processing. It offers a SQL-like syntax and is built on top of Kafka Streams.

  ksql-server:
    image: confluentinc/cp-ksql-server:5.0.0
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"

  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.0.0
    container_name: ksql-cli
    depends_on:
      - broker
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true
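
As a taste of what is coming: besides the CLI, KSQL statements can be sent to the server's REST endpoint on port 8088 as a JSON body with a ksql field. A minimal sketch; the stream and topic names are invented:

```python
import json

def ksql_request(statement: str) -> str:
    """Wrap a KSQL statement in the JSON body the /ksql endpoint expects."""
    return json.dumps({"ksql": statement, "streamsProperties": {}})

stmt = (
    "CREATE STREAM donut_orders (flavour VARCHAR, price DOUBLE) "
    "WITH (KAFKA_TOPIC='donut-orders', VALUE_FORMAT='AVRO');"
)
body = ksql_request(stmt)
# POST to http://localhost:8088/ksql
# with Content-Type: application/vnd.ksql.v1+json
```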

Sometimes we need a proxy for external clients, or for clients that cannot connect to the brokers directly, in cases where we do not want to use Connect or a tool like Apache NiFi. Confluent REST Proxy fits nicely here and scales well.

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.0.0
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:9092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

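Producing through the REST Proxy means POSTing records to /topics/&lt;topic&gt; on port 8082, with a versioned content type such as application/vnd.kafka.json.v2+json. A minimal sketch of the v2 produce body; the topic and record are made up:

```python
import json

def produce_payload(records: list) -> str:
    """Build the REST Proxy v2 produce body: a list of records,
    each wrapped in a 'value' envelope."""
    return json.dumps({"records": [{"value": r} for r in records]})

body = produce_payload([{"flavour": "glazed", "price": 1.99}])
# POST to http://localhost:8082/topics/donut-orders
# with Content-Type: application/vnd.kafka.json.v2+json
```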
The last portion of our file simply defines a volume that we will use for storage.

volumes:
    mi2: {}

To spin it all up, open a command prompt and start up the cluster with Docker Compose:
Note: This will use bandwidth, as it will download images for Kafka and all the other services specified in the file.
docker-compose -f dc_kafkadonuts.yml up -d

The output should show that all services have started up.

We will open up Confluent Control Center at http://localhost:9021 to make sure all looks good:

kd-1-controlcenter

We can now go ahead and create our first topic!
Click on Topics to display the topics, then click on New Topic in the right top corner.

kd-1-controlcenter-topic

Then you can specify a name and click Create With Defaults.

kd-1-controlcenter-topic-create

If you head back to Topics, you should now see your new topic created.
kd-1-controlcenter-topic-create-success

Our broker is now ready for action!

Summary

In this post we completed the setup of our message broker, Kafka.

We spun up a couple of tools we will use in the next posts alongside it, discussing their Docker config briefly.

We ended up exploring the basics of Confluent Control Center, creating a Topic with default settings.

ThatsAllFolks!
