
Kafka Donuts - 2 - Donut Baker

This is part of a series - check out the intro and index here:
Kafka Donuts

Game Plan

In this post we will be creating donuts.

We will create a simulator in C# on .Net Core that bakes donuts and sends them to Kafka, publishing directly from our app using the Confluent C# Client.

We will create a Docker image for our .Net Core app and spin it up in our local Docker environment.

The Tools

.Net Core
C#
Confluent Kafka Client
Docker

Time To Code

DotNet Core

First of all, we need to make sure .Net Core is installed on our machine.

Open up a command prompt and run dotnet --info.
If you see .Net Core version info, you can move on to the next part - .Net Core is ready.

If you do not see version information, download the .Net Core SDK.

After downloading, install it and you should be good to go!

Create a .Net Core App

Make sure that you are in your project root folder. Then create a subfolder called kd1_DonutBaker and move into it:
mkdir kd1_DonutBaker
cd kd1_DonutBaker

In this directory, type the following to create a dotnet core console application:
dotnet new console

Now we need to add the Confluent.Kafka package so that we can use the Producer client. In your project folder, run:
dotnet add package Confluent.Kafka
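
One note here: the code in this post uses the 0.x client API (the Confluent.Kafka.Serialization namespace and a Producer constructor that takes serializers). The 1.x client changed that API, so if the command above pulls in a newer version, you may need to pin an older one, for example:
dotnet add package Confluent.Kafka --version 0.11.6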

Also in your main project directory, create a file called DonutBaker.cs.

Let's add some code, shall we?
We start by bringing in all of our dependencies:

using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

Now we define our class, DonutBaker, and set some string values that we will use to connect. brokerList points to our Kafka cluster defined in Kafka Donuts - 1, and topicName refers to the topic we will publish to.

public class DonutBaker
{
    string brokerList = "broker:9092";
    string topicName = "Donuts_Baked";
    int testCount = 100000;
    Producer<string, string> producer = null;

    Dictionary<string, object> config = null;

Our Start function simply manages our program flow.

    public void Start()
    {
        ConfigureProducer();
        StartProducer();
        GenerateMessages();
        StopProducer();
    }

Then we add our configuration for connecting to our Kafka cluster. We add the broker list as bootstrap.servers and specify gzip compression for our messages in transit, saving network bandwidth. The remaining settings tune the client's internal buffering for low latency and cap how many messages can queue up locally.

    public void ConfigureProducer()
    {
        config = new Dictionary<string, object> {
            { "bootstrap.servers", brokerList },
            { "socket.blocking.max.ms", "1" }, // min = 1
            { "queue.buffering.max.ms", "1" },
            { "compression.type", "gzip" },
            { "queue.buffering.max.messages", "20000" }
        };
    }

The Start and Stop functions are pretty straightforward. Note that we flush the producer, giving it some time to finish processing any messages in the pipe, before disposing of it.

    public void StartProducer()
    {
        producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));

        producer.OnError += (_, err) =>
        {
            Console.WriteLine("Producer.OnError: " + err.Reason);
        };

        producer.OnLog += (_, lg) =>
        {
            Console.WriteLine("Producer.OnLog: " + lg.Message);
        };
    }

    public void StopProducer()
    {
        // Give the producer up to 10 seconds to deliver anything still in flight.
        producer.Flush(10000);
        producer.Dispose();
    }

Probably the most critical part - we send a message! We can send async or sync.
We get back a delivery report that contains rich information about the state of our request. This is where you handle what happens to messages when transmission fails. It is also where you can harvest the offset of your published message and save it back to link things up later for consolidation, if you are into that kinda thing...

    public void SendMessage(string topicName, string key, string val)
    {
        // Block on the task so the delivery report is complete before we
        // inspect it - checking IsFaulted on a still-running task would
        // always report success.
        var deliveryReport = producer.ProduceAsync(topicName, key, val).Result;

        if (deliveryReport.Error.HasError)
        {
            Console.WriteLine("SEND ERROR: " + deliveryReport.Error.Reason);
        }
        else if (deliveryReport.Offset % 1000 == 0)
        {
            // Log a progress marker every 1000 messages.
            Console.WriteLine($"Partition: {deliveryReport.Partition}, Offset: {deliveryReport.Offset}");
        }
    }
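
The method above blocks on Result, which effectively makes it a synchronous send. If you would rather not block the producing thread, here is a minimal fire-and-forget sketch - the SendMessageAsync name is ours, not part of the original sample - that handles the delivery report on a continuation instead:

    public void SendMessageAsync(string topicName, string key, string val)
    {
        // Hand the delivery report to a continuation so the producing
        // thread never waits on the broker.
        producer.ProduceAsync(topicName, key, val).ContinueWith(task =>
        {
            if (task.IsFaulted)
            {
                Console.WriteLine("ASYNC SEND ERROR: " + task.Exception.GetBaseException().Message);
            }
            else if (task.Result.Error.HasError)
            {
                Console.WriteLine("DELIVERY ERROR: " + task.Result.Error.Reason);
            }
        });
    }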

Our GenerateMessages function simply loops and generates donuts, calling send to ship them off to Kafka.

    public void GenerateMessages()
    {
        try
        {
            var cancelled = false;

            // Let Ctrl+C stop the loop gracefully instead of killing the process.
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                cancelled = true;
            };

            int counter = 0;

            while (!cancelled)
            {
                counter++;
                if (counter >= testCount)
                {
                    cancelled = true;
                }

                // Pause for 10 seconds after every 1000 donuts to throttle output.
                if (counter % 1000 == 0)
                {
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
                }

                string key = null;

                // Build a simple JSON-style payload for the baked donut.
                string val = "{" + $"DID:{counter}, EventCreatedDate:{DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ssfffzzz")}" + ", DonutType:Chocolate Ring}";

                try
                {
                    SendMessage(topicName, key, val);
                }
                catch (Exception exSendMessage)
                {
                    Console.WriteLine("Error:" + exSendMessage.Message);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

That completes our DonutBaker class. Now, change your Program.cs file so that the Main method looks as follows, using what we have just created:

static void Main(string[] args)
{
    DonutBaker donutBaker = new DonutBaker();
    donutBaker.Start();
}

Build, Package and Run

We now have an application that will - in theory - create donuts and send them to Kafka.

Next up, we build our app. Run this in the command prompt:

dotnet build -c Release

Then we add a Dockerfile to wrap it in a Docker image. Create a file called Dockerfile with the following content:

FROM microsoft/dotnet:sdk AS build-env
WORKDIR /app

# Copy csproj and restore as distinct layers
COPY *.csproj ./
RUN dotnet restore

# Copy everything else and build
COPY . ./
RUN dotnet publish -c Release -o out

# Build runtime image
FROM microsoft/dotnet:aspnetcore-runtime
WORKDIR /app
COPY --from=build-env /app/out .
ENTRYPOINT ["dotnet", "kd1_DonutBaker.dll"]

With that ready, we run our Docker build to create a Docker image, passing . to use the current directory as the build context. Our application will run on the dotnet aspnetcore runtime:

docker build -t kd1_donut_baker .
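
If you want to give the image a quick spin before wiring it into compose, you can run it on the same Docker network as your Kafka cluster. The network name below is a guess - compose usually names the network after your project folder, so run docker network ls to find yours:
docker run --rm --network kafkadonuts_default kd1_donut_baker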

Our image is now ready to use locally, or to add to any local docker compose file:

  kd1_donut_baker:
    image: "kd1_donut_baker:latest"
    hostname: kd1_donut_baker
    container_name: kd1_donut_baker

After adding the text above to your dc_kafkadonuts.yml file created in post 1, add the new component by running the up command again as follows:
docker-compose -f dc_kafkadonuts.yml up -d
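
Before opening a UI, you can sanity-check the stream from the command line. This assumes the broker container from post 1 is named broker and includes the Kafka CLI tools (the Confluent images do):
docker exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic Donuts_Baked --from-beginning --max-messages 5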

If we open up our Control Center, we can quickly see our new topic automatically created. Thanks to the cool people over at Confluent, we can actually inspect our data in real time! Donuts are being baked!!!
Open Confluent Control Center (by default at http://localhost:9021).

[Image: Control Center showing the newly created Donuts_Baked topic]

And then, let's see how it looks!...
[Image: inspecting the Donuts_Baked messages in Control Center]

Summary

In this post we created a .Net Core C# application that publishes data into our Kafka cluster using the Confluent Producer client.

We showed how to build a Docker image from our project and host our .Net Core application in Docker.

We then touched on inspecting our data flowing into Kafka in Control Center and even saw the raw data as it was passing through.
