Kafka Donuts - 1 - Donut Broker
This is part of a series - check out the intro and index here:
Kafka Donuts
Game Plan
The aim of this post is to get a message broker up and running on our local machine. It will serve as the backbone for all other components in this series.
We will be using Apache Kafka as our broker. Apache Kafka uses Apache ZooKeeper to maintain the state of its components, such as the brokers and, historically, the consumers.
We will spin up a Kafka cluster on Docker using Docker Compose.
The Tools
Apache Kafka
Apache Zookeeper
Docker
Docker Compose
Time To Code
We get started by making sure we have a working Docker setup. Follow the steps listed HERE to get Docker up and running.
Once Docker is up, we will use Docker Compose to spin up our cluster using a Docker Compose file.
In the project root folder, create a file called dc_kafkadonuts.yml.
This file tells Docker Compose which services to set up, which Docker images to use, and how to connect them.
We will add pieces to it as we go along - but let's get a base version up.
For reference, the following services will be created, listed in the order they appear in this Docker Compose file:
- ZooKeeper
- Kafka
- Confluent Schema Registry
- Kafka Connect
- Confluent Control Center
- KSQL Server
- KSQL CLI
- Confluent REST Proxy
First, let's add ZooKeeper. The --- at the top of the file marks the start of a YAML document, and the version key tells Docker Compose which file format we are using. Then we list our services, starting with ZooKeeper.
We specify the hostname by which we will reference this instance of the service - zookeeper - and keep the container name the same. We also expose port 2181 so that we can connect to ZooKeeper from the host.
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
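With ZooKeeper defined, a quick sanity check once the cluster is running is to send it the ruok four-letter command - assuming nc (netcat) is available on your machine, a healthy ZooKeeper answers imok:

echo ruok | nc localhost 2181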
Next up, we add Kafka, the star of the show.
We expose port 9092 for our plaintext listener inside the Docker network and port 29092 for connections from the host machine, point the broker at our ZooKeeper instance with KAFKA_ZOOKEEPER_CONNECT, and specify our advertised listeners to tell clients where the broker is listening for connections. KAFKA_BROKER_ID is also important and needs to be unique for every broker in a cluster.
  broker:
    image: confluentinc/cp-enterprise-kafka:5.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
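Once the broker is running, one way to sanity-check it is to list topics using the kafka-topics tool that ships inside the broker image - a minimal check, assuming the container is named broker as above:

docker exec broker kafka-topics --zookeeper zookeeper:2181 --list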
Now we add Schema Registry, exposing its internal port 8081 and mapping it to port 30801 on the host.
Schema Registry will be used to validate and version our schemas, ensuring that we send properly constructed data through our pipeline.
We map to 30801 on the host because 8081 is used by many other applications, which causes port clashes. Note that this mapping only applies to the host machine; other containers on the Docker network still reach Schema Registry on its internal port 8081. We also use depends_on to specify that it must wait for ZooKeeper and our Kafka broker to start before it starts.
  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "30801:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
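Once Schema Registry is up, we can verify it from the host through the mapped port - a GET against its subjects endpoint should return an empty JSON list on a fresh cluster:

curl http://localhost:30801/subjects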
Kafka Connect is next - an awesome utility for getting data into and out of Kafka. It supports custom connectors and is easily extendable.
  connect:
    image: confluentinc/cp-kafka-connect:5.0.0
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - mi2:/usr/share/java/monitoring-interceptors/
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: /usr/share/java
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
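When Connect has started, its REST API on port 8083 lets us inspect the worker - for example, listing the connector plugins available on this installation:

curl http://localhost:8083/connector-plugins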
Time for the management interface - Control Center. It is only available with Confluent Enterprise, but it is worth showcasing here.
  control-center:
    image: confluentinc/cp-enterprise-control-center:5.0.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - connect
      - ksql-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      #CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_KSQL_ENDPOINT: "http://ksql-server:8088"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
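Control Center can take a minute or two to start. A simple check from the host is to poll port 9021 until it responds with HTTP 200:

curl -s -o /dev/null -w "%{http_code}\n" http://localhost:9021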
Now we start up the KSQL Server and the KSQL CLI, a container we will use to run KSQL commands - effectively a little terminal that connects to the KSQL Server.
KSQL is used for stream processing. It uses SQL syntax and is built on top of Kafka Streams.
  ksql-server:
    image: confluentinc/cp-ksql-server:5.0.0
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"

  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.0.0
    container_name: ksql-cli
    depends_on:
      - broker
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true
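Once everything is running, we can open our little terminal by attaching to the ksql-cli container and pointing the CLI at the server:

docker exec -it ksql-cli ksql http://ksql-server:8088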
Sometimes we need a proxy for external clients, or for clients that cannot connect directly, in cases where we do not want to use Connect or something like Apache NiFi. Confluent REST Proxy fits nicely here and is scalable.
  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.0.0
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8082:8082"
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:9092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
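With the proxy up, producing a message is a plain HTTP POST. As a sketch, this publishes a JSON record to a hypothetical topic called donuts using the v2 API (the topic name is just an example):

curl -X POST \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records":[{"value":{"flavour":"glazed"}}]}' \
  http://localhost:8082/topics/donuts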
The last portion of our file simply defines the volume that Kafka Connect mounts for the monitoring interceptor jars.
volumes:
  mi2: {}
To spin it all up, open a command prompt and start up the cluster with Docker Compose:
Note: this will use bandwidth, as Docker will download images for Kafka and all the other services specified in the file.
docker-compose -f dc_kafkadonuts.yml up -d
The output should show that all services have started up.
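We can confirm this at any time by listing the state of the services:

docker-compose -f dc_kafkadonuts.yml ps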
We will open Confluent Control Center in a browser at http://localhost:9021 to make sure all looks good.
We can now go ahead and create our first topic!
Click on Topics to display the topics, then click on New Topic in the top right corner.
Then you can specify a name and click Create With Defaults.
If you head back to Topics, you should now see your new topic created.
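If you prefer the command line, the same thing can be done with kafka-topics from inside the broker container - here creating a hypothetical topic called donuts with a single partition and replica:

docker exec broker kafka-topics --zookeeper zookeeper:2181 --create --topic donuts --partitions 1 --replication-factor 1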
Our broker is now ready for action!
Summary
In this post we completed the setup of our message broker, Kafka.
Alongside it, we spun up several tools we will use in the next posts, briefly discussing their Docker configuration.
We finished by exploring the basics of Confluent Control Center and creating a topic with default settings.