
Kafka Donuts - on KSQLDB - #2

In the last post we played around with KSQLDB and started to build a virtual donut shop. You can view that post here: Kafka Donuts - on KSQLDB - #1

You can read up about KSQLDB, the very cool new upgrade to KSQL over here: http://ksqldb.io

In this post we continue diving into KSQLDB a bit deeper to uncover some of its coolness and see how it can aid us in building the best virtual donut shop in town!

We will specifically look at using the CASE statement with some JOINS to manage our inventory better with both newly baked donuts as well as validated sales.

But first... a poem about the coolest thing on earth:

O donut so round
You taste nothing like ground
Your chocolate dripping so wildly
nothing about you is mildly
In oil you were casted
but not long you lasted
For you were so delicious
even if not...
completely nutritious.

The End.

OK, enough weirdness - let's focus.


You can follow the link above to read through the first post if you have not done so, but to recap, this is where we left off:

  • We created a stream baked_donuts to which we stream any baked donuts
  • We created a stream stock_updates to stream both increases and decreases in our stock
  • We created a stock_inventory table to keep state of the actual stock levels
  • We created a sales stream to record sales
  • We performed a pull query from stock_inventory which showed us in real-time how much stock we have

Our first Donut Shop bug!

All of this was working pretty well... but we have been receiving some complaints from clients saying they had ordered donuts successfully, only for the shop to say there is no stock available afterwards!

After some research, we have located the issue. We are allowing donut sales without checking the stock first.

Luckily for us, this is not too difficult to fix.


Let us talk through the requirement:
When a customer orders donuts, we should first check if there is stock available for that type of donut. If the required amount of stock is available, we should subtract the stock and mark the sale as successful.

If there is not enough stock to satisfy the request, we should not alter the stock and return a message stating that the sale failed because of stock not being available.

Transactions and State

We are tiptoeing around a very common problem here, related to transactions, state and ACID properties. It's fun to look at these age-old cornerstones of data processing with new glasses on every now and then, so let us explore how we can handle this in an event streaming world.

I am not about to dive into a full-blown write-up about transactions in event streaming... yet. There are some very cool posts about that already. Instead, we will explore what this looks like in a basic inventory scenario in KSQLDB.

Mix the icing

We will create all of the topics, streams, tables and queries required with different names so as not to clash with the first post. You will see some minor differences in the fields. We will prefix all topics, streams and tables with kd2_.

First let us create our baked donuts stream, and the topic behind it, for sending newly baked donuts into:

CREATE STREAM kd2_baked_donuts (
  product_id VARCHAR,
  manufactured_date VARCHAR
) WITH (
  kafka_topic='kd2_baked_donuts',
  key='product_id',
  value_format='json',
  partitions=3,
  timestamp='manufactured_date',
  timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSS'
);


Now we will create a stream to transfer and host any stock updates, negative or positive:

CREATE STREAM kd2_stock_updates (
  product_id VARCHAR,
  stock_value DOUBLE,
  stock_reason VARCHAR
) WITH (
  kafka_topic='kd2_stock_updates',
  key='product_id',
  value_format='json',
  partitions=3
);


Next we start a persistent query that inserts every baked donut received into our stock update stream with a positive increment of 1. Note that this could be an amount field instead, if your source stream carries an amount rather than one event per donut:

INSERT INTO kd2_stock_updates
SELECT
  product_id,
  1.00 AS stock_value,
  'BAKED DONUT' AS stock_reason
FROM kd2_baked_donuts
EMIT CHANGES;


We can create our kd2_sales stream which we can use to send sales events into:

CREATE STREAM kd2_sales (
  product_id VARCHAR,
  transaction_date VARCHAR,
  transaction_amount DOUBLE
) WITH (
  kafka_topic='kd2_sales',
  key='product_id',
  value_format='json',
  partitions=3,
  timestamp='transaction_date',
  timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSS'
);


Now we can create an inventory table that will be updated by any event that comes into the kd2_stock_updates stream:

CREATE TABLE kd2_stock_inventory AS
SELECT
  product_id,
  SUM(stock_value) AS available_stock
FROM kd2_stock_updates
GROUP BY product_id
EMIT CHANGES;
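To get a feel for what this table holds, here is a minimal plain-Python sketch of the same idea: a running SUM of stock_value, grouped by product_id. It is only a model of the aggregation, not KSQLDB code. The function name build_inventory and the VANILLA product are made up for illustration.

```python
from collections import defaultdict


def build_inventory(stock_updates):
    """Fold (product_id, stock_value) updates into a running total per product.

    Hypothetical helper that mimics SUM(stock_value) ... GROUP BY product_id.
    """
    inventory = defaultdict(float)
    for product_id, stock_value in stock_updates:
        inventory[product_id] += stock_value
    return dict(inventory)


# Positive values are baked donuts, negative values are sales,
# and zero is a failed sale that leaves the total untouched.
updates = [
    ("CHOC", 1.0),
    ("CHOC", 1.0),
    ("VANILLA", 1.0),  # hypothetical second product
    ("CHOC", -1.0),
    ("CHOC", 0.0),
]
inventory = build_inventory(updates)
print(inventory)  # {'CHOC': 1.0, 'VANILLA': 1.0}
```

The point to take away is that the table never stores individual events, only the latest total per key, which is why a zero-valued update is harmless.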


Checking stock - Donut_Counter_3000

With all the prep work done and base streams flowing, we can now write a listening query on the sales stream to verify that there is enough stock, before allowing the sale.

INSERT INTO kd2_stock_updates
SELECT
  SA.product_id AS product_id,
  CASE
    WHEN available_stock >= transaction_amount THEN -1.00 * transaction_amount
    WHEN available_stock < transaction_amount THEN 0.00
  END AS stock_value,
  CASE
    WHEN available_stock >= transaction_amount THEN 'SALE COMPLETED'
    WHEN available_stock < transaction_amount THEN 'FAILED - NO STOCK'
  END AS stock_reason
FROM kd2_sales AS SA
  INNER JOIN kd2_stock_inventory AS SI ON SA.product_id = SI.product_id
EMIT CHANGES;


Let's melt the chocolate - Breaking it down

We are using CASE expressions to set the stock_value as well as the stock_reason for the kd2_stock_updates stream. We are joining to the kd2_stock_inventory table to get the current stock levels.

If the available_stock is greater than or equal to the transaction_amount, we allow the sale, setting the stock_reason to "SALE COMPLETED" and the stock_value to the negative of the transaction_amount. We do this because the SUM calculating our kd2_stock_inventory will then subtract the sold amount.

If the available_stock is less than the transaction_amount, we do not allow the sale, setting the stock_reason to "FAILED - NO STOCK" and the stock_value to zero. This leaves the SUM for kd2_stock_inventory intact.
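The decision made by the two CASE expressions can be sketched in plain Python as a single function. This is a model of the logic only, under the assumption that the join found an inventory row for the product. With the INNER JOIN, a sale for a product that has no row in kd2_stock_inventory produces no output event at all. The function name stock_update_for_sale is hypothetical.

```python
def stock_update_for_sale(product_id, transaction_amount, available_stock):
    """Return the (product_id, stock_value, stock_reason) update for one sale.

    Hypothetical helper mirroring the two CASE expressions in the query.
    """
    if available_stock >= transaction_amount:
        # Enough stock: emit a negative value so the SUM goes down.
        return (product_id, -1.0 * transaction_amount, "SALE COMPLETED")
    # Not enough stock: emit zero so the SUM stays where it is.
    return (product_id, 0.0, "FAILED - NO STOCK")


print(stock_update_for_sale("CHOC", 1, 5.0))    # ('CHOC', -1.0, 'SALE COMPLETED')
print(stock_update_for_sale("CHOC", 120, 4.0))  # ('CHOC', 0.0, 'FAILED - NO STOCK')
```

Both branches emit an event, which is what makes the failed sale visible downstream.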

Time to test

Open up another KSQLDB prompt and enter the following to watch your stock levels in real-time. We will refer to this window as the INVENTORY_WATCH:

SELECT * FROM kd2_stock_inventory EMIT CHANGES;

Open up yet another KSQLDB prompt (yes, you should have three KSQLDB prompts open now) and enter the following to receive the kd2_stock_updates stream events. We will refer to this window as the STOCK_UPDATES_WATCH:

SELECT * FROM kd2_stock_updates EMIT CHANGES;

Now in your original KSQLDB prompt, perform the following actions and keep an eye out for changes in your INVENTORY_WATCH...

First let us bake some donuts:

INSERT INTO kd2_baked_donuts (product_id, manufactured_date) VALUES ('CHOC','2019-11-23T10:10:10.001');
INSERT INTO kd2_baked_donuts (product_id, manufactured_date) VALUES ('CHOC','2019-11-23T10:10:10.002');
INSERT INTO kd2_baked_donuts (product_id, manufactured_date) VALUES ('CHOC','2019-11-23T10:10:10.003');
INSERT INTO kd2_baked_donuts (product_id, manufactured_date) VALUES ('CHOC','2019-11-23T10:10:10.004');
INSERT INTO kd2_baked_donuts (product_id, manufactured_date) VALUES ('CHOC','2019-11-23T10:10:10.005');

Notice how the stock level climbs in your INVENTORY_WATCH with each donut baked.

Now let's buy a single donut, in your KSQLDB prompt, knowing that stock is available:

INSERT INTO kd2_sales (product_id, transaction_date, transaction_amount) VALUES ('CHOC', '2019-11-23T10:11:10.001', 1);

If you look at your INVENTORY_WATCH - your stock has been reduced by one donut! Perfect!

Looking at your STOCK_UPDATES_WATCH you will see that the sale was a success. You have received an event, an event that can be used to feed multiple other systems to kick-off promotions, do KYC checks and so forth.

Now we will attempt to buy more than the available amount of CHOC donuts left, 120 to be exact:

INSERT INTO kd2_sales (product_id, transaction_date, transaction_amount) VALUES ('CHOC', '2019-11-23T10:11:10.002', 120);

If you look at your INVENTORY_WATCH now, you will see that a new update was created. The event (attempted sale) was acknowledged, but it did not change the state of the inventory.

Furthermore, glancing over at your STOCK_UPDATES_WATCH you will see that you have received an event for your failed sale.

It's referable, actionable and traceable. Even though it did not make you money or affect your stock, it is a valuable business event as it can provide insight into your failed sales.
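If you do not have a KSQLDB prompt at hand, the test run above can be replayed in a small plain-Python sketch. It assumes events are processed strictly one at a time and in order, which is a simplification of how the stream and table are actually updated. The function name replay is made up for illustration.

```python
def replay(baked, sales):
    """Replay baked-donut events, then sale events, one at a time.

    Hypothetical model: `inventory` stands in for kd2_stock_inventory and
    `updates` stands in for kd2_stock_updates.
    """
    inventory = {}
    updates = []
    for product_id in baked:
        updates.append((product_id, 1.0, "BAKED DONUT"))
        inventory[product_id] = inventory.get(product_id, 0.0) + 1.0
    for product_id, amount in sales:
        if product_id not in inventory:
            continue  # inner join: no inventory row means no output event
        available = inventory[product_id]
        if available >= amount:
            value, reason = -1.0 * amount, "SALE COMPLETED"
        else:
            value, reason = 0.0, "FAILED - NO STOCK"
        updates.append((product_id, value, reason))
        inventory[product_id] = available + value
    return inventory, updates


# Five CHOC donuts baked, then a sale of 1 and an attempted sale of 120.
inventory, updates = replay(["CHOC"] * 5, [("CHOC", 1), ("CHOC", 120)])
print(inventory)  # {'CHOC': 4.0}
for update in updates[-2:]:
    print(update)
# ('CHOC', -1.0, 'SALE COMPLETED')
# ('CHOC', 0.0, 'FAILED - NO STOCK')
```

The stock ends at 4, and both the successful and the failed sale show up as events.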

Acknowledging our failures - Bake more donuts

It might seem a bit odd to see a failure as part of a business flow. We are so used to a transaction encapsulating the stock check and updates, and historically we treat the failure as a failure.

But a race condition to buy the last donuts is actually not a failure. It's a normal and known time-bound race condition where the winner buys the last donuts, and the losers are sad, because donuts are so delicious!

In event streaming we should treat this as a first-class citizen, because it empowers us to build a better business. Having a business stream of failed sales means we can make that part of our data analysis. We can add the transaction amount in and see if there's any correlation between the amount ordered and the failure count.

Just looking at that we might find that we have orders every single day that fail because the order amounts are more than we ever have in stock for a specific product. Knowing that, we can engage with customers, bake more of that product and so open up a new sales quadrant.
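As a rough idea of what such an analysis could look like, here is a small plain-Python sketch. Everything in it is assumed for illustration: the failed-sale events carry the requested amount (the stream in this post does not, yet), and the sample data, the GLAZED product and the name summarize_failures are all made up.

```python
from collections import Counter

# Hypothetical failed-sale events: (product_id, requested_amount).
failed_sales = [("CHOC", 120), ("CHOC", 150), ("GLAZED", 3), ("CHOC", 130)]

# Hypothetical: the most stock we have ever held per product.
max_stock_seen = {"CHOC": 5, "GLAZED": 12}


def summarize_failures(failed_sales, max_stock_seen):
    """Per product, count all failures and those larger than we ever stock."""
    failure_count = Counter()
    oversized_count = Counter()
    for product_id, requested in failed_sales:
        failure_count[product_id] += 1
        if requested > max_stock_seen.get(product_id, 0):
            oversized_count[product_id] += 1
    return {p: (failure_count[p], oversized_count[p]) for p in failure_count}


summary = summarize_failures(failed_sales, max_stock_seen)
print(summary)  # {'CHOC': (3, 3), 'GLAZED': (1, 0)}
```

In this made-up data every CHOC failure is an order bigger than we have ever had in stock, which is exactly the kind of signal that says "bake more CHOC".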

Closing Shop - Summary

In this short post we looked at how we could use the CASE and JOIN clauses to build a validated working inventory. We also discussed how we should embrace making historically ignored events or failures part of the core business data flow, as they might have very important benefits for our business.

