
Kafka Donuts - on KSQLDB - #1

Confluent recently launched KSQLDB - a processing layer/tool/thingamajig that turns Kafka into a streaming database of sorts.

It's ultra cool! BUT... it's not the coolest thing on the planet per se.
What would happen though, if we combined this ultra cool new tool with THE COOLEST THING ON EARTH?

Well, that's what we are about to do...

Enter the coolest thing on earth since 1809 ....>>> DONUTS!

It just so happens that I am a big fan of the coolest thing on earth. But to make many of them, we need a shop - a donut shop to be specific.

Today we'll see how much of our donut shop we can run on Kafka, but this time using the fluffy, shiny, sparkly, brand new KSQLDB.

You can read up about this very cool new upgrade to KSQL over here: http://ksqldb.io

Enough chit-chat, let's get building... We will start with a blank slate and build up a virtual donut shop's IT backend.

Step 1: Switch on the oven

First we need to get you up and running. This post is not a guide to installing KSQLDB or Kafka - the easiest way to get started is the Docker-based quickstart, which can be found here: https://ksqldb.io/quickstart.html
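
A minimal sketch of bringing the stack up, assuming you have saved the quickstart's docker-compose.yml in your current directory:

docker-compose up -d

docker-compose ps

The -d flag runs the containers in the background, and docker-compose ps lets you check that the KSQLDB server and CLI containers are running.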

After spinning up your Docker containers with docker-compose up, you can run this in a shell/terminal to connect to the KSQLDB CLI:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Step 2: Baking Donuts

Ok, first we need to have a registry or store for our available donuts.
In the real world, this would be captured by a frontend, some fancy AI cameras, an IoT DonutCounter 3000 (no, it probably does not exist) or some batch job from a mainframe hidden in the Arctic Circle. The data would either be sent straight into a Kafka topic, or we could use the KSQLDB REST API to insert it into the stream.
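
As a taste of the API route (a rough sketch - we won't need it for this post): assuming the quickstart's server is reachable on localhost:8088 and the baked_donuts stream from the next step already exists, a donut event could be pushed in over the REST API roughly like this. The barcode '00009' is just a made-up value:

curl -X POST http://localhost:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d "{\"ksql\": \"INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (1, '00009', 'DONUT_CHOC_RING_SPRINKLES', '2019-11-23T10:10:10.009');\", \"streamsProperties\": {}}"

For this post, though, we will stick to the CLI.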

We will simulate this for our virtual donut shop by inserting the events into the stream via the CLI.

If you have not done so already, connect to the KSQLDB CLI:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Let's create the stream for recording donuts as they are baked. Enter the below after the KSQL> prompt:

CREATE STREAM baked_donuts(product_id DOUBLE, donut_barcode VARCHAR, product_code VARCHAR, manufactured_date VARCHAR) WITH (kafka_topic='baked_donuts', key='product_id', value_format='json', partitions=3, timestamp='manufactured_date', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSS');


We are creating a stream called baked_donuts which will be a log of every single donut to enter the system. We have a product_id which defines the type of donut. Also note that we have included a manufactured_date field, which we have set as the record timestamp and for which we have provided a format. The stream expects its data in JSON format, and its backing topic will have 3 partitions.
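
For reference, a single baked donut event on the underlying Kafka topic would look roughly like this (the values mirror the inserts we run below):

{"product_id": 1, "donut_barcode": "00001", "product_code": "DONUT_CHOC_RING_SPRINKLES", "manufactured_date": "2019-11-23T10:10:10.001"}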

We can verify that the stream was created successfully by running the following statement to describe the stream:

DESCRIBE EXTENDED baked_donuts;


Now, we will run a continuous query (also known as a standing, long-running or listening query). These types of queries "listen" to changes on the stream in real time and emit a result whenever the query is satisfied. Each result is itself presented as an event.

SELECT * FROM baked_donuts EMIT CHANGES;

This tells KSQLDB to listen to changes on the baked_donuts stream and emit them. As we are doing this in the CLI, the result is streamed to the CLI output - our terminal. Later on we will see how we can stream the result into a new stream.
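
As a sneak peek (you don't need to run this now), persisting a query's output into a new stream is just a CREATE STREAM ... AS SELECT. For example, a hypothetical choc_ring_donuts stream containing only the chocolate ring donuts would look roughly like this:

CREATE STREAM choc_ring_donuts AS SELECT * FROM baked_donuts WHERE product_code = 'DONUT_CHOC_RING_SPRINKLES' EMIT CHANGES;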

The query is running and will output any changes. Let's give it a try. Open a new terminal and start another CLI instance:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Now we will bake three donuts. (We will fake it for now...)

INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (1,'00001','DONUT_CHOC_RING_SPRINKLES','2019-11-23T10:10:10.001');

INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (1,'00002','DONUT_CHOC_RING_SPRINKLES','2019-11-23T10:10:10.002');

INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (2,'00003','DONUT_CUSTARD_FILLED','2019-11-23T10:10:10.003');

Switch back to the terminal where your query is running, and you should see the three new events appear in the output.

Our query is working and emitting results as changes happen to our stream!

If your query is still running, press ctrl+c to kill it.

Step 3: Taking Stock

Groovy, so we are able to bake donuts and record them into our system.
It would be cool to have an inventory of available stock.

We will create a table in KSQLDB to host this stock aggregation. We will increase the stock count when donuts are baked and decrease it when donuts are sold.

To make it as simple as possible, we will create a single stream to record any stock updates. Baked donuts, sales, or future features such as expired donuts can then simply send an event into this stream to update the stock inventory (an illustrative example follows the stream definition below).

Let us thus create a stock_updates stream:

CREATE STREAM stock_updates (product_id DOUBLE, product_code VARCHAR, stock_value DOUBLE, stock_reason VARCHAR, transaction_code VARCHAR) WITH (kafka_topic='stock_updates', key='product_id', value_format='json', partitions=3);
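
As an aside (purely illustrative - no need to run it now), a future expired-donuts feature could decrement stock by sending an event like this into the very same stream. The transaction_code 'exp00001' is just a made-up value:

INSERT INTO stock_updates (product_id, product_code, stock_value, stock_reason, transaction_code) VALUES (1, 'DONUT_CHOC_RING_SPRINKLES', -1.00, 'EXPIRED DONUT', 'exp00001');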


Now we will create the actual inventory.
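
One optional note first: persistent queries in KSQLDB read from the latest offset by default, so events written before a query was created may be skipped. If you want the aggregations below to include everything already on the topics, set the offset reset policy for your CLI session before creating them:

SET 'auto.offset.reset' = 'earliest';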

Enter the below to create a table which is updated by a persistent query, aggregating the stock count per product:

CREATE TABLE stock_inventory WITH (kafka_topic='stock_inventory', value_format='json', partitions=3) AS SELECT product_id, product_code, sum(stock_value) AS available_stock FROM stock_updates GROUP BY product_id, product_code EMIT CHANGES;


Next up, we will create stock_updates events from any baked_donuts events, which in turn causes the aggregating query to increase the stock in stock_inventory by 1.

INSERT INTO stock_updates SELECT product_id, product_code, 1.00 AS stock_value, 'BAKED DONUT' AS stock_reason, 'qqqqq' AS transaction_code FROM baked_donuts EMIT CHANGES;


Now we can run a listening query on our table to show what happens when we insert some more donuts into our baked_donuts stream:

SELECT * FROM stock_inventory EMIT CHANGES;

In another KSQLDB CLI session, run the following commands to insert some more donuts, and keep an eye on your SELECT window - you will see the stock_inventory being updated in real time!

INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (1,'00004','DONUT_CHOC_RING_SPRINKLES','2019-11-23T10:10:10.004');

INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (1,'00005','DONUT_CHOC_RING_SPRINKLES','2019-11-23T10:10:10.005');

INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (2,'00006','DONUT_CUSTARD_FILLED','2019-11-23T10:10:10.006');

INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (2,'00007','DONUT_CUSTARD_FILLED','2019-11-23T10:10:10.007');

INSERT INTO baked_donuts (product_id, donut_barcode, product_code, manufactured_date) VALUES (2,'00008','DONUT_CUSTARD_FILLED','2019-11-23T10:10:10.008');

Taking a look at your final results, you will see that the current AVAILABLE_STOCK is exactly what it should be for each rowkey.


Leave your SELECT window running - we will run the next set of commands in your other CLI window, and you can monitor the results in this one.

Step 4: Selling Donuts

Selling donuts is the easy part. People automatically buy donuts; it's just a fact of life. But we still have to update the system with each sale.

We will follow the same basic process we did for our baked donuts, as it has worked pretty well thus far.

Let's create the stream for recording donut sales. Enter the below after the KSQL> prompt in the CLI session that is not running the SELECT:

CREATE STREAM sales(product_id DOUBLE, donut_barcode VARCHAR, product_code VARCHAR, transaction_date VARCHAR, transaction_amount DOUBLE, amount DOUBLE) WITH (kafka_topic='sales', key='product_id', value_format='json', partitions=3, timestamp='transaction_date', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSS');


Next up, we will create stock_updates events from any sales events, which in turn causes the aggregating query to decrease the stock in stock_inventory by 1.

INSERT INTO stock_updates SELECT product_id, product_code, -1.00 AS stock_value, 'DONUT SALE' AS stock_reason, 'aaaa' AS transaction_code FROM sales EMIT CHANGES;


Now we will record some sales into our sales stream:

INSERT INTO sales (product_id, donut_barcode, product_code, transaction_date, transaction_amount) VALUES (1,'00001','DONUT_CHOC_RING_SPRINKLES','2019-11-23T10:11:10.001', 2.50);

INSERT INTO sales (product_id, donut_barcode, product_code, transaction_date, transaction_amount) VALUES (2,'00002','DONUT_CUSTARD_FILLED','2019-11-23T10:11:10.002', 3.00);

INSERT INTO sales (product_id, donut_barcode, product_code, transaction_date, transaction_amount) VALUES (2,'00003','DONUT_CUSTARD_FILLED','2019-11-23T10:11:10.003', 3.00);

If you take a look at your SELECT window again, you will see that the inventory has decreased for the product_id's where we had sales.

Step 5: Checking Stock

Most of what we have done thus far we could do on plain KSQL as well, but now, enter some new, shiny GROOVYNESS!

Let us query our stock levels for Custard Filled donuts.

There was a way to do this with KSQL - we could create a join against the table and send in an event carrying the id of the aggregation grouping we wanted to query - but this was a bit cumbersome, and not very SQL-like.

In KSQLDB we can query the current state of a table directly with a pull query! Enter the below query after the KSQL prompt:

SELECT * FROM stock_inventory WHERE ROWKEY = '2.0|+|DONUT_CUSTARD_FILLED';

You will immediately receive a result back, showing the stock level you are interested in.

This is a major advance in integrating the world of stream processing with legacy systems, as it adds a familiar request/response style of lookup (pull queries) to our toolset. This proves very valuable when you are upgrading systems and edging them into the world of real-time stream processing.
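
To illustrate, a legacy system could issue the same pull query over the KSQLDB REST API (a rough sketch, assuming the quickstart's server on localhost:8088):

curl -X POST http://localhost:8088/query \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d "{\"ksql\": \"SELECT * FROM stock_inventory WHERE ROWKEY = '2.0|+|DONUT_CUSTARD_FILLED';\", \"streamsProperties\": {}}"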

Closing Shop - Summary

We have completed the first part of our online donut shop. We have a working inventory, the basis of any good shop - and all of this based on real time event streaming using Kafka and KSQLDB!

That's all, folks!
