Kafka Donuts - 2 - Donut Baker
This is part of a series - check out the intro and index here:
Kafka Donuts
Game Plan
In this post we will be creating donuts.
We will create a simulator in .Net Core, C#, that will create donuts and send them to Kafka. We will publish directly into Kafka from our app using the Confluent C# Client.
We will create a Docker image for our .Net Core app and spin it up in our local Docker environment.
The Tools
.Net Core
C#
Confluent Kafka Client
Docker
Time To Code
DotNet Core
First of all, we need to make sure .Net Core is installed on our machine.
Open up a command prompt and run dotnet --info
.
If you see dotnet core version info, you can move on to the next part as dotnet core is ready.
If you do not see version information, Download the .Net Core SDK
After downloading you can install it and you should be good to go!
Create a .Net Core App
Make sure that you are in your project root folder. Then create a subfolder called kd1_DonutBaker.
mkdir kd1_DonutBaker
.
In this directory, type the following to create a dotnet core console application:
dotnet new console
Now we need to add the Confluent Kafka packages to be able to use the Producer client. In your project folder, run:
dotnet add package Confluent.Kafka
Also in your main project directory, create file called DonutBaker.cs.
Let's add some code, shall we?
We start by bringing in all of our dependencies:
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
Now we define our class, DonutBaker
and set some string values that we will use to connect. brokerList
points to our Kafka cluster defined in Kafka Donuts - 1, and topicName
refers to the topic we will publish in to.
public class DonutBaker
{
string brokerList = "broker:9092";
string topicName = "Donuts_Baked";
int testCount = 100000;
Producer<string, string> producer = null;
Dictionary<string, object> config = null;
Our Start
function simply manages our program flow.
public void Start()
{
ConfigureProducer();
StartProducer();
GenerateMessages();
StopProducer();
}
Then we add our configuration for connecting to our Kafka cluster. We add in the broker list as bootstrap.servers
and also specify that we would like to gzip our messages in transit, which means we will save on networking bandwidth.
public void ConfigureProducer()
{
config = new Dictionary<string, object> {
{ "bootstrap.servers", brokerList },
{"socket.blocking.max.ms", "1"}, //min=1
{"queue.buffering.max.ms", "1"},
{"compression.type","gzip"},
{"queue.buffering.max.messages","20000"}
};
}
Start and Stop functions are pretty straight forward. Note that we are flushing our producer and giving some time for it to finish off processing any messages in the pipe before disposing it.
public void StartProducer()
{
producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
producer.OnError += (_, err) =>
{
Console.WriteLine("Producer.OnError: " + err.Reason);
};
producer.OnLog += (_, lg) =>
{
Console.WriteLine("Producer.OnLog: " + lg.Message);
};
}
public void StopProducer()
{
producer.Flush(10000);
producer.Dispose();
}
Probably the most critical part - we send a message! We can send async or sync.
We get back a delivery report which contains rich informative messages about the state of our request. This is the part where you will handle what happens with messages if transmission fails. It is also the part where you can harvest the offset for your published message and save that back to link it up later for consolidation, if you are into that kinda thing...
public void SendMessage(string topicName, string key, string val)
{
var deliveryReport = producer.ProduceAsync(topicName, key, val);
if (deliveryReport.IsFaulted)
{
Console.WriteLine("ASYNC SEND ERROR: " + deliveryReport.Result.Error.Reason);
}
else
{
if (deliveryReport.Result.Offset % 1000 == 0)
{
Console.WriteLine($"Partition: {deliveryReport.Result.Partition}, Offset: {deliveryReport.Result.Offset}");
}
}
}
Our GenerateMessages
function simply loops and generates donuts, calling send to ship them off to Kafka.
public void GenerateMessages()
{
try
{
var cancelled = false;
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancelled = true;
};
int counter = 0;
while (!cancelled)
{
counter++;
if (counter >= testCount)
{
cancelled = true;
}
string text;
try
{
text = $"DID:{counter}";
if (counter % 1000 == 0)
{
System.Threading.Thread.Sleep(new TimeSpan(0, 0, 0, 10));
}
}
catch (IOException ioex)
{
Console.WriteLine("IO Error:" + ioex.Message);
break;
}
if (text == null)
{
break;
}
string key = null;
string val = "{" + $"DID:{counter}, EventCreatedDate:{DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ssfffzzz")}" + ", DonutType:Chocolate Ring}";
try
{
SendMessage(topicName, key, val);
}
catch (Exception exSendMessage)
{
Console.WriteLine("Error:" + exSendMessage.Message);
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
That completes our DonutBaker class. Now, change your Program.cs file so that the main method looks as follows - using what we have just created:
static void Main(string[] args)
{
DonutBaker donutBaker = new DonutBaker();
donutBaker.Start();
}
Build, Package and Run
We now have an application that will - in theory - create donuts and send them to Kafka.
Next up, we build our app. Run this in the command prompt:
dotnet build -c Release
Then we add a Dockerfile to wrap it in a Docker image. Create a file called Dockerfile with the following content:
FROM microsoft/dotnet:sdk AS build-env
WORKDIR /app
# Copy csproj and restore as distinct layers
COPY *.csproj ./
RUN dotnet restore
# Copy everything else and build
COPY . ./
RUN dotnet publish -c Release -o out
# Build runtime image
FROM microsoft/dotnet:aspnetcore-runtime
WORKDIR /app
COPY --from=build-env /app/out .
ENTRYPOINT ["dotnet", "kd1_DonutBaker.dll"]
With that ready, we run our Docker build to create a Docker image, using . to point to the current directory, with our application running on a dotnet aspnetcore runtime:
docker build -t kd1_donut_baker .
Our image is now ready to use locally, or to add to any local docker compose file:
kd1_donut_baker:
image: "kd1_donut_baker:latest"
hostname: kd1_donut_baker
container_name: kd1_donut_baker
After adding the text above to your dc_kafkadonuts.yml file created in post 1, add the new component by running the up command again as follows:
docker-compose -f dc_kafkadonuts.yml up -d
If we open up our Control Center, we can quickly see our new Topic automatically created. Thanks to the cool people over at Confluent, we can actually inspect our data in real time! Donuts are being baked!!!
Open Confluent Control Center
And then, let's see how it looks!...
Summary
In this post we created a .Net Core C# application that publishes data into our Kafka cluster using the Confluent Publisher Client.
We showed how to build a Docker image from our project and host our .Net Core application in Docker.
We then touched on inspecting our data flowing into Kafka in Control Center and even saw the raw data as it was passing through.