Utilizing Kafka Streams’ Processor API and implementing a custom aggregator
Hello, today I’m going to talk about a pretty complex topic: Apache Kafka Streams’ Processor API (https://docs.confluent.io/current/streams/developer-guide/processor-api.html).
Developers turn to the Processor API when Apache Kafka Streams’ toolbox doesn’t have the right tool for their needs, or when they need finer control over their data. Kafka Streams is a relatively young project that lacks many features that, for example, already exist in Apache Storm (not directly comparable, but oh well). That’s why I also became a contributor to Kafka Streams: to help the other maintainers advance this amazing piece of software.
Personally, I came to the Processor API when I needed a custom count-based aggregation. I also didn’t like the fact that Kafka Streams would create many internal topics that I didn’t really need and that were always empty (possibly due to my own silliness). And I really liked the Processor API! I was deciding how and what went into the internal topic(s), and I had better control over my data overall.
But let’s get started. I’ll be building my custom Kafka Streams aggregator using the Processor API on top of the Spring Framework with Spring Cloud (why? Because I can!). It should be useful both to people who work with Kafka Streams and to people who are integrating Kafka Streams into their Spring applications.
Step 1: Configuring Kafka Streams within Spring application
Here is the list of our gradle dependencies (I uploaded a completely working project to my Github, the link is posted at the end of this article):
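A rough sketch of the dependency block; the exact artifacts and versions live in the repo’s build.gradle, but it boils down to the Spring Cloud Stream Kafka Streams binder plus Kafka Streams itself:

```groovy
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.apache.kafka:kafka-streams'
}
```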
Once all the dependencies are imported, let’s create a message binding interface:
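Something along these lines; the interface name KafkaListenerBinding and the binding name "input" are my choices here and may differ slightly in the repo:

```java
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;

public interface KafkaListenerBinding {

    // Binds the "input" channel to a KStream of String keys and String values
    @Input("input")
    KStream<String, String> inputStream();
}
```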
Then, assuming that you have a Kafka broker running at localhost:9092, define the following properties in application.properties:
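Something like the following (a sketch; the exact property set is in the repo’s application.properties, but these are the standard Kafka Streams binder keys):

```properties
spring.cloud.stream.kafka.streams.binder.brokers=localhost:9092
spring.cloud.stream.kafka.streams.binder.applicationId=custom-listener
spring.cloud.stream.bindings.input.destination=input-topic
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
```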
Should be pretty self-descriptive, but let me explain the main parts:
- custom-listener is the application-id of your Kafka Streams listener, very similar to a group-id. Essentially, it is used when you have multiple instances of your application and the incoming messages need to be received only once by one of the instances (also a pretty complex topic; look up StreamPartitioner and Kafka’s groupId).
- input-topic is the Kafka topic from which we will be receiving our messages.
Let’s enable the binding and create a simple stream listener that prints incoming messages:
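A minimal version could look like this. I’m sketching it as a nested class, which matches the DemoApplication$KafkaStreamConsumer name you will see in the logs later:

```java
@Component
@EnableBinding(KafkaListenerBinding.class)
public static class KafkaStreamConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaStreamConsumer.class);

    @StreamListener
    public void requestListener(@Input("input") KStream<String, String> stream) {
        // For now, simply print every incoming message
        stream.foreach((key, value) -> log.info(value));
    }
}
```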
So far, so good! Now you can start the application and send some messages, and you should see that the messages are being received by our Kafka Streams listener.
Step 2: Create a custom Transformer using Processor API
You can create either stateless or stateful transformers. Here is the difference between them in simple terms: stateless transformers don’t leave any memory or network footprint on the broker’s side; the transformation happens on the client side, i.e. the data is not sent (round-tripped) to any internal Kafka topic.
Stateless transformations are used to modify data, like mapping or filtering out some values from a stream. Here is a caveat that you might understand only after working with Kafka Streams for a while: since the transformation is stateless, it lives on the receiving instance, i.e. if that instance goes down, the transformation work will not get rebalanced among the other listening instances from the same group; only the original (pre-transform) data will. This deserves a whole new article; it is also a pretty complex and interesting topic.
Stateful transformations, on the other hand, perform a round trip to the Kafka broker(s) to persist data transformations as they flow. This has its pros and cons.
Pros: when you run a continuous transformation and your instance goes down, another instance (or the same one after a restart) will pick up the work where it left off. Moreover, you can distribute (balance) the transformation work among instances to reduce the workload.
Cons: you will have to sacrifice some space on the Kafka brokers’ side and some network traffic. This becomes a problem when you have a considerable amount of data flowing through your transformer and a Kafka cluster that is not big enough.
Today, we will implement a stateful transformer, so that we can exercise as many of the available features as possible. Let’s create a custom, stateful transformer that aggregates certain letters and, as soon as a letter’s count reaches a certain cap, “flushes” the aggregated value down the stream.
For example, say we receive 4 messages, aaabbb, bbbccc, bbbccc, cccaaa, with the cap set to 7. The transformer will aggregate them as a:6, b:9, c:9; then, since b and c have reached the cap, it will flush them down the stream. Notice that we flush only the two records b:9 and c:9, while the record a:6 stays in the state store of our transformer until more messages arrive.
You are probably wondering where the data “sits” and what a state store is. The state store is a simple key-value store that uses RocksDB and that also (by default) persists the data in an “internal” Kafka topic. This means that if you restart your application, it will re-read the topic from the beginning and re-populate the state store (there are certain techniques that can help optimize this process, but they are outside the scope of this article); from then on, it keeps the state store and the Kafka topic in sync.
Let’s create a class CustomProcessor that implements Transformer<K, V, R>, where K and V are the types of your input key-value entries, and R is the result type of your transformer. In our case, we will do the following:
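In sketch form, with String keys and values coming in and KeyValue<String, Long> pairs coming out:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class CustomProcessor implements Transformer<String, String, KeyValue<String, Long>> {
    // implementation follows below
}
```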
It will ask you to implement 3 methods from the Transformer interface:
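The empty skeleton:

```java
@Override
public void init(ProcessorContext context) {
    // called once before any records arrive; we set up our state here
}

@Override
public KeyValue<String, Long> transform(String key, String value) {
    // called for every incoming record; our aggregation will live here
    return null;
}

@Override
public void close() {
    // nothing to clean up in our case
}
```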
We should implement init(ProcessorContext context) and keep the context; furthermore, we should also get a state store out of it. The state store itself will be created before we initialize our CustomProcessor; all we need is to pass its stateStoreName in during initialization (more about this later). Let’s also pass our counter cap while we are at it:
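Roughly like this (the constructor parameter names are my choice):

```java
private final String stateStoreName;
private final long cap;

private ProcessorContext context;
private KeyValueStore<String, Long> kvStore;

public CustomProcessor(String stateStoreName, long cap) {
    this.stateStoreName = stateStoreName;
    this.cap = cap;
}

@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    this.context = context;
    // The store is created and registered before init() runs (see Step 3),
    // so here we simply look it up by name
    this.kvStore = (KeyValueStore<String, Long>) context.getStateStore(stateStoreName);
}
```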
The transform method will be receiving the key-value pairs that we need to aggregate (in our case, the values will be the messages from the earlier example: aaabbb, bbbccc, bbbccc, cccaaa). We will have to split them into characters (unfortunately, there is no Character (de)serializer, so I have to store them as one-character strings), aggregate them, and put them into the state store:
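A sketch of the aggregation loop:

```java
@Override
public KeyValue<String, Long> transform(String key, String value) {
    for (char character : value.toCharArray()) {
        // No Character serde, so each character becomes a one-character String key
        String charKey = String.valueOf(character);
        Long count = kvStore.get(charKey);
        kvStore.put(charKey, count == null ? 1L : count + 1);
    }
    return null; // see the note about forwarding below
}
```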
Pretty simple, right? It could also be improved with an intermediate Map that accumulates the characters and, once the loop is finished, dumps the values into kvStore, but for now I’ll go the simpler way.
We should also implement the logic for reaching the cap and flushing the changes. Let’s add another method called findAndFlushCandidates:
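Something like this (a sketch; the iterator and entry names are mine):

```java
private void findAndFlushCandidates() {
    // Iterate over the whole store; forward and evict every pair that reached the cap
    try (KeyValueIterator<String, Long> iterator = kvStore.all()) {
        while (iterator.hasNext()) {
            KeyValue<String, Long> entry = iterator.next();
            if (entry.value >= cap) {
                this.context.forward(entry.key, entry.value);
                kvStore.delete(entry.key);
            }
        }
    }
}
```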
When we call findAndFlushCandidates, it iterates over our state store, checks whether the cap for a pair has been reached, flushes the pair using the this.context.forward(key, value) call, and deletes the pair from the state store.
We simply need to call this function in our transform method right after the loop is done:
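The final transform then looks like this:

```java
@Override
public KeyValue<String, Long> transform(String key, String value) {
    for (char character : value.toCharArray()) {
        String charKey = String.valueOf(character);
        Long count = kvStore.get(charKey);
        kvStore.put(charKey, count == null ? 1L : count + 1);
    }
    findAndFlushCandidates(); // flush every pair that reached the cap
    return null;
}
```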
You are probably wondering why transform returns null. Well, I didn’t tell you the whole story. You can flush key-value pairs in two ways: by using the previously mentioned this.context.forward(key, value) call, or by returning the pair from the transform method. When we return null from the method, nothing extra gets flushed.
Seems like we are done with our CustomProcessor (the GitHub link to the repo is at the end of this article).
Step 3: Create a state store by intercepting a streams builder
In order to make our CustomProcessor work, we need to pre-create our state store. This is a little tricky right now in the Spring Framework (I hope they improve it later), but here is what I came up with.
Since stream building is happening under Spring’s hood, we need to intercept it in order to create our state store:
1. We need to autowire the ApplicationContext to intercept the StreamsBuilder; let’s also hardcode our stateStoreName and our cap while we are at it:
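For example (the store name matches the one in the error message you will see below, and the cap matches our earlier example):

```java
@Autowired
private ApplicationContext applicationContext;

private final String stateStoreName = "counterKeyValueStore";
private final long cap = 7L;
```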
2. Let’s define a method initializeStateStores where we will intercept the builder and create our desired state store:
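A sketch, written as one statement per “line” so it matches the explanation below:

```java
void initializeStateStores() throws Exception {
    StreamsBuilderFactoryBean factoryBean = applicationContext.getBean("&stream-builder-requestListener", StreamsBuilderFactoryBean.class);
    StreamsBuilder streamsBuilder = factoryBean.getObject();
    StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), Serdes.String(), Serdes.Long());
    streamsBuilder.addStateStore(storeBuilder);
}
```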
Woah, woah, let’s slow down! I’ll explain what we are doing line by line:
Line 1: Get the StreamsBuilderFactoryBean from the ApplicationContext by name. You probably noticed the weird name here: &stream-builder-requestListener. Surprisingly, it comes from the name of our method annotated with @StreamListener, i.e. &stream-builder-${stream-listener-method-name}:
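```java
@StreamListener
public void requestListener(@Input("input") KStream<String, String> stream) {
    // ... hence the bean name &stream-builder-requestListener
}
```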
More about this at https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.2.0.RC1/spring-cloud-stream-binder-kafka.html#_accessing_the_underlying_kafkastreams_object
Line 2: Get the actual StreamsBuilder from our factory bean.
Line 3: Create a StoreBuilder that builds a KeyValueStore with a String serde for its keys and a Long serde for its values.
Line 4: Add our newly created StoreBuilder to the StreamsBuilder.
Done, Phew!
3. Call the initializeStateStores method from our requestListener:
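Like so:

```java
@StreamListener
public void requestListener(@Input("input") KStream<String, String> stream) throws Exception {
    initializeStateStores();
    // the stream wiring continues in the next step
}
```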
Step 4: Initialize CustomProcessor and integrate it into KStream flow
We need to initialize our CustomProcessor inside the KStream: we call the transform method on the KStream and initialize our CustomProcessor in there.
It should look something like this:
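A sketch; note that stateStoreName is passed both to the processor and to the transform call, and the foreach simply logs whatever gets flushed:

```java
@StreamListener
public void requestListener(@Input("input") KStream<String, String> stream) throws Exception {
    initializeStateStores();
    stream.transform(() -> new CustomProcessor(stateStoreName, cap), stateStoreName)
          .foreach((key, value) -> log.info(key + ":" + value));
}
```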
We need to provide the stateStoreName to our CustomProcessor, and also to the transform method call. Otherwise, it will throw something along the lines of:
Processor KSTREAM-TRANSFORM-... has no access to StateStore counterKeyValueStore as the store is not connected to the processor...
Ooof. I think we are done here! If you start the application, everything should boot up correctly with no errors. But wait! We also want to test it, right?
Step 5 (optional): Create KafkaProducer to send sample messages to our Kafka Streams listener
This should be pretty simple. Let’s define a CommandLineRunner in which we will initialize a simple KafkaProducer and send some messages to our Kafka Streams listener:
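A sketch of such a runner (the bean name is my choice; the topic name assumes the configuration from Step 1):

```java
@Bean
public CommandLineRunner demoProducer() {
    return args -> {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The payloads from our earlier example
            for (String payload : new String[]{"aaabbb", "bbbccc", "bbbccc", "cccaaa"}) {
                producer.send(new ProducerRecord<>("input-topic", payload));
            }
        }
    };
}
```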
Then, if you start your application, you should see the following logs in your console:
<date-time> INFO 51760 --- [-StreamThread-1] c.p.DemoApplication$KafkaStreamConsumer : b:9
<date-time> INFO 51760 --- [-StreamThread-1] c.p.DemoApplication$KafkaStreamConsumer : c:9
As expected, it aggregated and flushed the characters b and c, while a:6 is still waiting in the state store for more messages.
Feel free to play around with the code: add more payloads, or modify the aggregation logic. After all, the Processor API is not as scary as it appears to be.
Thank you for reading this article!
I’ll try to post more interesting stuff I’m working on.
GitHub: https://github.com/yeralin/custom-kafka-streams-transformer-demo