Deep Dive into Joining in Kafka Streams


Joining is one of the most useful capabilities we have when working with Kafka Streams, but it takes some effort to wrap your head around.

I'll start off by saying that streaming applications are not relational databases. As cool as KSQL is, don't get fooled into thinking this is exactly like joining two tables in Postgres. It's similar, but now time is a factor.

To start off, we're going to review a few non-obvious concepts of Kafka & Kafka Streams.

The stream table duality

This is the realization that a stream and a table are two sides of the same coin. When you have a table, underneath it is the changelog—a stream. Even a normal database like Postgres is implemented with a Write-Ahead Log.

Similarly, if you have a stream, if you want a snapshot of the current state at any given time—that snapshot would be a table.

Within the context of Kafka Streams, this means we can go back and forth between a KStream and a KTable. Because a KStream can represent the changelog on a KTable, and a KTable can be a snapshot of the current state of the stream.

Kafka exploits this internally, and we can too when we build our streaming applications.
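To make the duality concrete, here's a minimal pure-Kotlin sketch (my illustration, not the Kafka Streams API) of the stream-to-table direction: folding a changelog and keeping the latest value per key materializes a table.

```kotlin
// A minimal sketch of the stream -> table direction of the duality:
// replaying the changelog and keeping the latest value per key
// materializes the table.
data class Change(val key: String, val value: String)

fun materialize(changelog: List<Change>): Map<String, String> =
    changelog.associate { it.key to it.value } // later entries overwrite earlier ones

val changelog = listOf(
    Change("user-1", "logged_in"),
    Change("user-2", "logged_in"),
    Change("user-1", "logged_out"),
)

materialize(changelog) // {user-1=logged_out, user-2=logged_in}
```

Going the other direction, the sequence of updates applied to the map over time is itself a changelog stream.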

Keys & Partitions Matter

If things are set up properly, you can spend a lot of time developing a topology without thinking about the message keys. If things are set up properly...

Until recently (like, very recently) the only way to join streams was based on message keys. There was even the further requirement that the two input topics have the same keys & the same number of partitions.

It's understandable why you could only join by key. How a join is implemented under the hood is pretty complex, and joining on anything other than the message key is far more expensive.

Given that we have to join by keys, why would the data have to be co-partitioned?

Well, a KTable is partitioned, meaning each instance of it only handles a subset of a topic's partitions. (If you want a KTable that sees all of the partitions in a topic, you can use a GlobalKTable.) So if a KTable is only getting a subset of partitions, and the destination partition of a message is determined by its key plus the number of partitions, then the topics have to be equivalently keyed and partitioned in order to join.
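To see why co-partitioning matters, here's a simplified sketch of key-based partition assignment (my illustration; Kafka's default partitioner hashes keys with murmur2 rather than hashCode, but the principle is identical): the destination partition is a pure function of the key and the partition count.

```kotlin
// Simplified partition assignment: the destination partition is a
// pure function of the key and the number of partitions.
fun partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

// Same key and same partition count on both topics -> same partition,
// so the task that owns that partition sees both sides of the join.
partitionFor("user-42", 6) == partitionFor("user-42", 6) // true
```

With different partition counts on the two topics, the same key can land on different partition numbers, and no single task would see both sides.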

Windowing

This is where the idea of time really comes into play, and we start to see how different these operations are from a traditional database join.

Windowing comes off as very simple, but there are four different types of windows we can use in a Kafka Streams app.

| Name | Description | Example |
|------|-------------|---------|
| Tumbling time window | Fixed-size, non-overlapping windows | PagerDuty: did a PS ticket occur during your window? |
| Hopping time window | Fixed-size, overlapping windows | The S&P 500 is often calculated as a rolling average to smooth out variability; that would be done using a hopping time window. |
| Sliding time window | Fixed-size, overlapping windows that work on differences between record timestamps | These are only used for joins in Kafka Streams. The window slides continuously along the time axis: if two records are within X seconds of each other, they can be joined. |
| Session window | Dynamically-sized, non-overlapping, data-driven windows | Mostly used for user behaviour analysis. |

Windowing will come back up for the KStream <> KStream join, sliding time windows in particular.
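To make the tumbling/hopping distinction concrete, here's a small sketch (my illustration, not the Kafka Streams API) that computes which window start times contain a given event timestamp. With a tumbling window the advance equals the size, so each event lands in exactly one window; with a hopping window the advance is smaller, so windows overlap.

```kotlin
// Windows start at multiples of `advanceMs` and span [start, start + sizeMs).
// Returns every window start whose window contains `ts`.
fun windowStartsFor(ts: Long, sizeMs: Long, advanceMs: Long): List<Long> {
    val lastStart = (ts / advanceMs) * advanceMs // latest window that could contain ts
    val starts = mutableListOf<Long>()
    var start = lastStart
    while (start >= 0 && start + sizeMs > ts) {
        starts.add(start)
        start -= advanceMs
    }
    return starts.sorted()
}

// Tumbling (advance == size): an event at t=130s falls in exactly one 60s window
windowStartsFor(130_000, 60_000, 60_000) // [120000]
// Hopping (advance < size): the same event falls in three overlapping windows
windowStartsFor(130_000, 60_000, 20_000) // [80000, 100000, 120000]
```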

Inner, Left, and Outer Joins

This is one of those topics that actually does remain the same between databases and streaming, woohoo! To review:

  • An inner join only includes records when there is a corresponding record on both sides.
  • A left join includes all records in the left table (or stream), regardless of whether they exist on the right.
  • An outer join includes all records that appear in either table, irrespective of whether they exist in the other one.

Now, in order to understand the specific behaviour of the different types of joins in Kafka Streams, we're going to run an experiment.

Experiment

This is somewhat inspired by the Kafka Streams Join Semantics Confluence page. I did this because we had a pretty particular situation at work that wasn't quickly resolved by just looking at that page.

The goal of this experiment was to understand a bit better how the different types of joins act in different situations. We're only going to deal with inner joins due to the number of permutations, but other types of joins can be useful in Kafka Streams, particularly the left join.

Problem & Setup

Imagine we have two topics, left-topic and right-topic. The main scenario we're going to use for testing I'm calling "Many Left Events".

| ts | left-topic | right-topic |
|----|------------|-------------|
| 1  | a          |             |
| 2  | b          |             |
| 3  |            | A           |
| 4  | c          |             |
| 5  | d          |             |
| 6  |            | B           |

I chose this situation for the following reasons:

  1. The left-topic has multiple events occurring before the right-topic has its first event. This is the subtlety that led to this rabbit hole in the first place. As you'll see in the results, it very much affects the behaviour. It's also not addressed in the documentation.
  2. While the left-topic does have a lot of events, I wanted to include two events in right-topic in case there was some kind of multiplicity that occurs. Sneak peek: there is, in a KStream <> KStream join.

Using Strings

A key choice in this experiment is using strings. Integers would've been fine too — I just wanted to avoid anything related to Avro to avoid an unrelated area of complexity. The goal here is to analyze joining in its simplest form.

The Value Joiner

In this experiment, we're going to use the exact same ValueJoiner as the one they used on the Confluence page linked in the intro.

In practice, it looks something like this.

val valueJoiner: (String?, String?) -> String = { leftValue, rightValue ->
    "$leftValue - $rightValue"
}

valueJoiner("a", "b") // "a - b"

Results

We'll start with the KStream <> KStream join. The main thing you need to remember about a KStream <> KStream join is that it's windowed. For the sake of this experiment, I'm making the window sufficiently large to contain all of these events.

KStream <> KStream Join

| ts | left-topic | right-topic | Result |
|----|------------|-------------|--------|
| 1  | a          |             |        |
| 2  | b          |             |        |
| 3  |            | A           | a - A, b - A |
| 4  | c          |             | c - A  |
| 5  | d          |             | d - A  |
| 6  |            | B           | a - B, b - B, c - B, d - B |

There are two key things to remember about a KStream <> KStream join:

  1. It's windowed. Only events that fall within the same window will ever be joined. For a more thorough overview of the different types of windows, see the Windowing section above.
  2. Within the window, it acts as a cartesian product.

Quick Review of Cartesian Product

A cartesian product is a set operation. It's clearest through an example. A standard deck of cards is the cartesian product of two sets:

val values = listOf("Ace", "King", "Queen", "Jack", "10", "9", "8", "7", "6", "5", "4", "3", "2")
val suits = listOf("♠", "♥", "♦", "♣")

val deck = mutableListOf<String>()

// this is a cartesian product
suits.forEach { suit ->
    values.forEach { value ->
        deck.add("$value of $suit")
    }
}

// [Ace of ♠, King of ♠, Queen of ♠, Jack of ♠, 10 of ♠, 9 of ♠, 8 of ♠, 7 of ♠, 6 of ♠, 5 of ♠, 4 of ♠, 3 of ♠, 2 of ♠, Ace of ♥, King of ♥, Queen of ♥, Jack of ♥, 10 of ♥, 9 of ♥, 8 of ♥, 7 of ♥, 6 of ♥, 5 of ♥, 4 of ♥, 3 of ♥, 2 of ♥, Ace of ♦, King of ♦, Queen of ♦, Jack of ♦, 10 of ♦, 9 of ♦, 8 of ♦, 7 of ♦, 6 of ♦, 5 of ♦, 4 of ♦, 3 of ♦, 2 of ♦, Ace of ♣, King of ♣, Queen of ♣, Jack of ♣, 10 of ♣, 9 of ♣, 8 of ♣, 7 of ♣, 6 of ♣, 5 of ♣, 4 of ♣, 3 of ♣, 2 of ♣]

The fact that a KStream <> KStream join is a cartesian product gives some insight as to why it must be windowed: for each new record, it emits an event for every matching record from the other topic within the window. This means it has to keep all the events during that window in memory. Given an infinite window, that would mean every message ever published to the two input topics.
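The whole KStream <> KStream result table above can be reproduced with a small pure-Kotlin simulation (my illustration, not how Kafka Streams is implemented): buffer both sides, and on each arrival join against every buffered event from the other side that falls within the window.

```kotlin
import kotlin.math.abs

// Windowed stream-stream inner join for records sharing one key:
// each arrival joins with every buffered record from the other side
// whose timestamp is within the window.
data class Event(val ts: Long, val value: String)

fun streamStreamJoin(
    left: List<Event>,
    right: List<Event>,
    windowMs: Long,
    joiner: (String, String) -> String,
): List<String> {
    val results = mutableListOf<String>()
    val leftBuffer = mutableListOf<Event>()
    val rightBuffer = mutableListOf<Event>()
    // process both streams merged in timestamp order
    val merged = (left.map { it to true } + right.map { it to false }).sortedBy { it.first.ts }
    for ((event, isLeft) in merged) {
        if (isLeft) {
            leftBuffer.add(event)
            rightBuffer.filter { abs(it.ts - event.ts) <= windowMs }
                .forEach { results.add(joiner(event.value, it.value)) }
        } else {
            rightBuffer.add(event)
            leftBuffer.filter { abs(it.ts - event.ts) <= windowMs }
                .forEach { results.add(joiner(it.value, event.value)) }
        }
    }
    return results
}

val left = listOf(Event(1, "a"), Event(2, "b"), Event(4, "c"), Event(5, "d"))
val right = listOf(Event(3, "A"), Event(6, "B"))

streamStreamJoin(left, right, windowMs = 10) { l, r -> "$l - $r" }
// [a - A, b - A, c - A, d - A, a - B, b - B, c - B, d - B]
```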

KTable <> KTable Join

Now let's move on to the other symmetric join type - a KTable <> KTable join. KTable <> KTable joins are designed to be consistent with joins in relational databases.

KTable <> KTable

| ts | left-topic | right-topic | Result |
|----|------------|-------------|--------|
| 1  | a          |             |        |
| 2  | b          |             |        |
| 3  |            | A           | b - A  |
| 4  | c          |             | c - A  |
| 5  | d          |             | d - A  |
| 6  |            | B           | d - B  |

Key things to remember about a KTable <> KTable join

  • It's not windowed
  • The result is a KTable, but recall the stream-table duality. This means you can just do .toStream() on the result, and you're effectively looking at the changelog stream of the join table.
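Here's a pure-Kotlin simulation (my illustration, not the Kafka Streams API) of the KTable <> KTable inner join for a single key: every update to either side joins against the current value of the other side, reproducing the changelog from the results table above.

```kotlin
// Table-table inner join for a single key: each update to either side
// produces a changelog entry joined with the *current* value of the
// other side, once both sides have a value.
fun tableTableJoin(
    updates: List<Pair<Char, String>>, // ('L', value) updates the left table, ('R', value) the right
    joiner: (String, String) -> String,
): List<String> {
    var leftState: String? = null
    var rightState: String? = null
    val changelog = mutableListOf<String>()
    for ((side, value) in updates) {
        if (side == 'L') leftState = value else rightState = value
        val l = leftState
        val r = rightState
        if (l != null && r != null) changelog.add(joiner(l, r))
    }
    return changelog
}

val updates = listOf('L' to "a", 'L' to "b", 'R' to "A", 'L' to "c", 'L' to "d", 'R' to "B")
tableTableJoin(updates) { l, r -> "$l - $r" } // [b - A, c - A, d - A, d - B]
```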

KStream <> KTable

In a KStream <> KTable join, the result is a KStream. You can think of this almost as if the KStream is data coming from a Kafka topic, and the KTable data just comes from a relational database and enriches the KStream messages. That's not far from what the idea of a KTable is—it's just a materialized table from a topic that lives in an application, not a table in an actual database.

Because of this, the KStream provides the actual events. The KTable is just there to enrich those events.

KStream <> KTable

| ts | left-topic | right-topic | Result |
|----|------------|-------------|--------|
| 1  | a          |             |        |
| 2  | b          |             |        |
| 3  |            | A           |        |
| 4  | c          |             | c - A  |
| 5  | d          |             | d - A  |
| 6  |            | B           |        |

That's probably far fewer resulting events than you imagined. There are two key facts we can conclude about a KStream <> KTable join.

  1. If no value is present in the KTable for the key of an incoming KStream event, no resulting event occurs.
  2. Changes to the KTable do not fire events in and of themselves.
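Those two facts can be captured in a pure-Kotlin simulation (my illustration, not the Kafka Streams API): table updates only mutate state, while stream events either join against the current table value or are dropped.

```kotlin
// Stream-table inner join for a single key: table updates fire nothing;
// stream events join against the current table value, or are dropped
// if no value exists yet.
fun streamTableJoin(
    records: List<Pair<Char, String>>, // ('S', value) is a stream event, ('T', value) a table update
    joiner: (String, String) -> String,
): List<String> {
    var tableState: String? = null
    val results = mutableListOf<String>()
    for ((side, value) in records) {
        if (side == 'T') {
            tableState = value // updating the table fires no event on its own
        } else {
            tableState?.let { results.add(joiner(value, it)) }
            // if tableState is null, the stream event is silently dropped (inner join)
        }
    }
    return results
}

val records = listOf('S' to "a", 'S' to "b", 'T' to "A", 'S' to "c", 'S' to "d", 'T' to "B")
streamTableJoin(records) { l, r -> "$l - $r" } // [c - A, d - A]
```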

Now, if it was critical that an event in the KStream caused an event to fire even when a message with that key wasn't present in the KTable, we could use a KStream.leftJoin(KTable).

If we wanted just an event for the last event in the left-topic (that would be b), we could use a KTable.join(KTable).toStream().

Recall that in the changelog of a KTable <> KTable join we get events fired for changes to either KTable, joined with the current state of the other.

What about KStream <> GlobalKTable?

Our situation really isn't designed to test this well. We would just get the same results we got in KStream <> KTable.

This type of join shines in a different set of circumstances, namely, when data isn't co-partitioned or when you want to get away from the joining requirements placed on message keys.

So what type of join should I use?

If you're stuck on this problem (like I was), I would encourage you to do the following:

Define the common scenarios for your use case

Our scenario was special because it had two messages published to left-topic before any arrived in right-topic, and that caused some behaviour we had to work around, particularly if we wanted to use the KStream <> KTable join.

Define the desired behaviour

I didn't say so at the outset of the experiment, but the behavior I was really looking for was this:

| ts | left-topic | right-topic | Result |
|----|------------|-------------|--------|
| 1  | a          |             |        |
| 2  | b          |             |        |
| 3  |            | A           | a - A, b - A |
| 4  | c          |             | c - A  |
| 5  | d          |             | d - A  |
| 6  |            | B           |        |

Now that I've run all the experiments and analyzed the behaviour of the join types, I see a few problems with this.

I wanted the events a - A and b - A to fire when A arrived, but nothing to happen when B arrived (no d - B). That's a contradiction: A and B are both right-side events, so any join type that fires on A's arrival will also fire on B's. So in my use case, we had to change the design a little bit to fit into what's possible with a join in Kafka Streams.

Conclusion

Joining streams is harder than joining tables, because they likely weren't produced with joining in mind, and time is involved.

Some of the key concepts are

  • The stream table duality
  • Windowing
  • Inner, left, & outer joins

And if you get stuck in a situation where you're not understanding the behaviour of a given join, or you're not sure which one to use, I suggest you run an experiment similar to the one we did in this article.

P.S. I learned in writing this that the joining of two rivers is called a confluence. Sound familiar?