Sensible Schema Evolution


Schema evolution is a critical part of building a system with Kafka, but it's often overlooked. Lately I've been thinking a lot about best practices for developing with Kafka, and this article is the result.

Compatibility

At its simplest, compatibility is two things being able to work together. We often separate types of compatibility based on time—backwards compatibility is when newer code can read data that was written by older code.

This means that as we change our consumer applications, they're still able to read data written months ago. Because we have things like compacted topics and long retention periods, our consumer applications must be backwards compatible.

Forwards compatibility, on the other hand, is when older code can read data that was written by newer code [1]. This means we can edit our producer applications and have older consumers still functioning seamlessly. This is important for building new features.

While this is a useful distinction, most Kafka systems need both kinds of compatibility. We need to build new features and have consumer applications that will continue to work with long-lived data.

Maintaining Compatibility in Practice

As you may recall, there are two interfaces we can use to consume Avro data in Kafka: SpecificRecord and GenericRecord. When we use SpecificRecord, Avro does something interesting. It deserializes the message using the schema it was written with (as always), but during that process it resolves the record to the schema specified in our generated class. (You can read more about how this works in my last article.)

We call the two schemas used in this process the writer's schema (the one used to write the data) and the reader's schema (the final deserialization target). In the Kafka world, the writer's schema comes from the producer when the message is produced, and the reader's schema lives in the consumer.

What's particularly interesting about schema resolution is that, because Avro offers it as a feature, its designers have spent a lot of time thinking about what makes two schemas compatible and, when they differ, how to get from one to the other. They've basically worked out the set of rules that we need to evolve data over time. Why is nobody talking about this?!

Quick Note on Avro IDL

In this post, we're going to be using Avro's Interface Definition Language (IDL). It's a simple and exact way to talk about schemas, without looking at the more network-friendly JSON format, or using some derived method like a programming language class. This is about all the intro you need:

// define a record
record Employee {
  union { null, string } name; // union type with null means nullable
  boolean active = true;       // true is the default value
  long salary;
}

Now let's take a look at any schema changes you'd want to make and how they affect the compatibility of the system.

| Symbol | Meaning |
|--------|---------|
| ✅ | Totally fine |
| 🚧 | Proceed with caution |
| ❌ | It will break! |

✅ Add a field to the writer's schema

// Writer's schema
record Task {
+   string id;   // adding this field
    union { null, string } name;
}

// Reader's schema
record Task {
    union { null, string } name;
}

If the writer's schema contains a field with a name not present in the reader's record, the writer's value for that field is ignored. - Avro spec

However, you should still be careful about adding fields to writers—because removing them is trickier.

🚧 Remove a field from the writer's schema

This depends on the reader's schema. All readers must either not contain the field, or if they do, have a default value specified. In fact, let's define that.

In Avro, a field is removable (from a writer) when each relevant reader schema has a default value specified for that field, or doesn't contain it at all.

And a field being nullable does not mean it has a default value.

// Writer's schema
record Task {
-   union { null, string } id; // removing this field
    union { null, string } name;
}

// Reader A ✅ has a default value
record Task {
    union { null, string } id = null;
    union { null, string } name;
}

// Reader B ✅ doesn't have that field
record Task {
    union { null, string } name;
}

// Reader C ❌ has the field, and no default value!
record Task {
    union { null, string } id;
    union { null, string } name;
}

Note that in Avro if the type is a union, the type of the default value must match the first type in the union.
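The removability rule can be turned into a small pre-flight check. The sketch below is a toy model in Python rather than anything from the Avro API: `blocking_readers`, `NO_DEFAULT`, and the dictionary layout are hypothetical, and the three readers mirror Reader A, B, and C above.

```python
# Toy check for the "removable field" rule; not part of the Avro library.
NO_DEFAULT = object()  # sentinel: the reader declares the field without a default


def blocking_readers(field: str, readers: dict) -> list:
    """Return the names of readers that would break if writers dropped `field`.

    `readers` maps a reader name to its fields; each field maps to its
    default value, or to NO_DEFAULT when no default is declared.
    """
    return [
        name
        for name, fields in readers.items()
        if field in fields and fields[field] is NO_DEFAULT
    ]


readers = {
    "A": {"id": None, "name": NO_DEFAULT},        # id has a default of null
    "B": {"name": NO_DEFAULT},                    # id is not declared at all
    "C": {"id": NO_DEFAULT, "name": NO_DEFAULT},  # id is nullable but has no default
}
blockers = blocking_readers("id", readers)
print(blockers)  # ['C']
```

The field is removable only when the returned list is empty. Note that a default of `None` (null) counts as a default, while a nullable field without one does not.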

🚧 Change a field type

In Avro there's the idea of promotion, where one data type can be promoted to another. Let's take a look at the Avro primitive types.

| Primitive type | Description | Promotable to |
|----------------|-------------|---------------|
| null | no value | |
| boolean | a binary value | |
| int | 32-bit signed integer | long, float, double |
| long | 64-bit signed integer | float, double |
| float | single precision (32-bit) IEEE 754 floating-point number | double |
| double | double precision (64-bit) IEEE 754 floating-point number | |
| bytes | sequence of 8-bit unsigned bytes | string |
| string | unicode character sequence | bytes |

But how does that tell us if we can change a field type? The constraint we have to follow is this:

The type of the writer's schema must be promotable to the type in the reader's schema.

Here's an example:

// Writer's schema
record Task {
    string id;
-   string name;
+   bytes name;
}

// Reader's schema ✅ bytes can be promoted to string
record Task {
    string id;
    string name;
}
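The promotion constraint is easy to encode as a lookup. This is a minimal Python sketch covering primitive types only; the `PROMOTABLE_TO` table and `can_read` function are illustrative names, not Avro API, and the table simply transcribes the promotions listed above.

```python
# Primitive promotions from the table above, as writer type -> allowed reader types.
PROMOTABLE_TO = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def can_read(writer_type: str, reader_type: str) -> bool:
    """True if data written as writer_type can be resolved to reader_type."""
    return writer_type == reader_type or reader_type in PROMOTABLE_TO.get(writer_type, set())


print(can_read("bytes", "string"))  # True: the Task example above
print(can_read("int", "double"))    # True: widening is allowed
print(can_read("long", "int"))      # False: narrowing is not a promotion
```

Promotion runs in one direction only: the question is always whether the writer's type can become the reader's type, never the other way around.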

❌ Change a field name

Changing a field's name is equivalent to adding a new field (which is fine) and removing one (which calls for caution). It's possible, but it's a bad idea.

What makes this an especially bad idea is that there is likely old data with the old field name, so your consumers will have to support two fields for the benefit of one. However, you do always have the option to change something just before it gets presented to your users.

Changing the reader's schema

Making changes in the reader's schema (the consumer application) instead of the writer's schema (the producer application) is simply the reverse of some writer action, in terms of compatibility.

They're different actions, but they can result in the same combination of compatibility dilemmas.

| Reader Action | Writer Action | Allowable? |
|---------------|---------------|------------|
| Adding Field to Reader | Removing Field from Writer | 🚧 |
| Removing Field from Reader | Adding Field to Writer | ✅ |
| Changing Data Type | Changing Data Type | 🚧 |
| Changing Field Name | Changing Field Name | ❌ |

Best Practices

Using the above rules, we can derive some best practices for moving fast without breaking things in our Kafka-based system.

Use default values in readers — whenever you can.

This gives you a leg up in forwards compatibility. It allows you to remove fields from producers, and the consumer applications will keep functioning until you get around to removing that field from them as well.

This actually follows one of my favorite software engineering concepts, Postel's Law:

Be liberal in what you accept, conservative in what you send.

Don't change field names

It's possible, but it's not worth it.

Change data types with care

Use the promotion table above as your reference when changing types. And remember Postel's law: if a writer's schema has been an int at some times and a long at others, putting double in the reader's schema would be a good idea, since both int and long can be promoted to it.
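One way to apply this advice is to list every reader type that accepts all the types a field has ever been written with. The following Python sketch is a toy helper under that assumption; `safe_reader_types` and the tables are hypothetical names, and only primitive types are considered.

```python
# Writer type -> reader types it can be promoted to (primitives only).
PROMOTABLE_TO = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}
PRIMITIVES = ["null", "boolean", "int", "long", "float", "double", "bytes", "string"]


def safe_reader_types(writer_types: set) -> list:
    """Reader types that every historical writer type equals or can be promoted to."""
    return sorted(
        candidate
        for candidate in PRIMITIVES
        if all(
            w == candidate or candidate in PROMOTABLE_TO.get(w, set())
            for w in writer_types
        )
    )


safe = safe_reader_types({"int", "long"})
print(safe)  # ['double', 'float', 'long']
```

For a field that has been both an int and a long, long is already enough, and double is the most liberal choice. The trade-off is precision: very large long values (beyond 2^53) cannot be represented exactly as a double.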

Questions I Had While Writing This

If a field is nullable, do I have to specify a default value?

Yes, you do. This goes back to the removability of a field: a nullable type says which values a field may hold, not what to use when the field is absent. In Avro, there is a difference between a record that lacks a field and a record that has the field set to null.

For example, here's a failing test.

package io.atomiccommits.avro

import com.sksamuel.avro4k.Avro
import com.sksamuel.avro4k.AvroName
import kotlinx.serialization.Serializable

import io.kotlintest.shouldBe
import io.kotlintest.specs.FunSpec

import org.apache.avro.SchemaCompatibility
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityResult.compatible

class AvroTests: FunSpec() {
   init {
      test("Is nullable a sufficient default value?") {
         @AvroName("Task")
         @Serializable
         data class Writer(val name: String?)

         @AvroName("Task")
         @Serializable
         data class Reader(val id: String?, val name: String?)

         val writerSchema = Avro.default.schema(Writer.serializer())
         val readerSchema = Avro.default.schema(Reader.serializer())

         val pairCompatibility = SchemaCompatibility.checkReaderWriterCompatibility(
            readerSchema,
            writerSchema
         )

         pairCompatibility.result shouldBe compatible()
      }
   }
}

To make them compatible, we can add a default value to the reader's schema.

// Also requires: import com.sksamuel.avro4k.AvroDefault
@AvroName("Task")
@Serializable
data class Reader(@AvroDefault(Avro.NULL) val id: String?, val name: String?)

Changing record types

Resolution of record types (think "objects") is done recursively, matching fields by name. This means you can't change the name of the record type without wreaking havoc, but you can edit it in a permissible way (e.g. add a field to it).

Conclusion

Maintaining and building upon a Kafka system is difficult. Schema evolution is one of those communication concerns that we'll never notice if we do it well, but that makes progress more painful if we don't. Fortunately, Avro does a lot for us, and we should take full advantage of that.

If you've got any suggestions as to how I can improve this article, or another best practice, feel free to email me.

Sources

  1. Martin Kleppmann, Designing Data-Intensive Applications
  2. Avro Specification