Kafka Streams in Scala with Schema Registry

Jendrik Poloczek 20 October, 2018 (3 min read)

Working with Confluent's schema registry solves several problems at once: type-safe Kafka topics, where to store schemas in a polyglot environment, how to keep track of versions, and how to evolve schemas over time. Further, you get automatic schemas in Kafka connectors or KSQL. If you don't need these advantages, you might as well just use a value type of Array[Byte] and serialize with Avro4s or avrohugger.

Circe and Avro4s to Create Schemas

In this example, I'll use the new Scala API released with Kafka 2.0, together with Confluent's schema registry and Avro4s, to convert a GenericRecord into a case class. Avro4s is also great for generating your Avro schema in JSON; with a small hack it's easy peasy:

import java.io.{File, PrintWriter}
import io.circe.generic.auto._
import io.circe.syntax._
import com.sksamuel.avro4s.AvroSchema

case class Block(hash: String, number: Long)
// The small hack: a wrapper so circe emits the {"schema": "..."} JSON the registry expects.
case class AvroSchemaImport(schema: String)

// Derive the Avro schema for Block and embed it as an escaped string.
val inner  = AvroSchema[Block].toString(false)
val schema = AvroSchemaImport(inner).asJson.noSpaces

val writer = new PrintWriter(new File("block.json"))
writer.write(schema)
writer.close()

You can then load the generated schema into the registry using its REST API, as sketched below. As for a Kafka Streams example with Avro serialization and the schema registry, I couldn't find anything on the net when working on this, so I might as well provide one here.
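For completeness, here's a minimal sketch of posting the generated block.json to the registry. The registry URL and the subject name blocks-value (the default <topic>-value naming for a topic called blocks) are assumptions; adjust them to your setup.

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Assumed registry location and subject name; adjust to your environment.
val registryUrl = "http://localhost:8081"
val subject     = "blocks-value"

// block.json already contains the {"schema": "..."} payload generated above.
val payload = new String(Files.readAllBytes(Paths.get("block.json")), StandardCharsets.UTF_8)

val conn = new URL(s"$registryUrl/subjects/$subject/versions")
  .openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json")
conn.setDoOutput(true)
conn.getOutputStream.write(payload.getBytes(StandardCharsets.UTF_8))

// A 200 response with {"id": <n>} means this schema version is now registered.
println(s"${conn.getResponseCode} ${scala.io.Source.fromInputStream(conn.getInputStream).mkString}")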

Add Dependencies

So, let's have a look at the crucial dependencies. In case you also want to create schemas as shown above, add "io.circe" %% "circe-generic" % "0.9.3" (or newer) as well.

resolvers += "confluent.io" at "http://packages.confluent.io/maven/"
libraryDependencies ++= 
    Seq("org.apache.kafka" %% "kafka-streams-scala" % "2.0.0",
        "io.confluent" % "kafka-streams-avro-serde" % "5.0.0",
        "com.sksamuel.avro4s" %% "avro4s-core" % "1.9.0")

Kafka Streams Example (using Scala API in Kafka 2.0)

When I searched the net for a proper setup of a Kafka Streams application with a schema registry using Avro the Scala way, I couldn't find anything. One quirk of integrating the GenericRecord is the need to manually specify the implicit Serde[GenericRecord] value, which is needed for all the implicit conversions that make it possible to write beautifully uncluttered code. Here's the code skeleton most people might want to use in some version or another:

import java.util.Properties
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import com.company.Schemas.Block
import com.typesafe.scalalogging.LazyLogging

import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer
import io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream.KStream

object Avro {
  type SchemaRegistryUrl = String

  // Builds a typed KStream from a topic of Avro GenericRecords.
  def stream[T: ToRecord : FromRecord]
    (builder: StreamsBuilder, topic: String)
    (implicit url: SchemaRegistryUrl): KStream[String, T] = {
    val config = Map("schema.registry.url" -> url).asJava

    implicit val stringSerde: Serde[String] = Serdes.String()
    implicit val genericAvroSerde: Serde[GenericRecord] = Serdes.serdeFrom({
      val ser = new GenericAvroSerializer()
      ser.configure(config, false) // isSerializerForRecordKeys = false
      ser
    }, {
      val de = new GenericAvroDeserializer()
      de.configure(config, false) // isDeserializerForRecordKeys = false
      de
    })

    builder
      .stream[String, GenericRecord](topic)
      .mapValues(v => RecordFormat[T].from(v))
  }
}
object Main extends App with LazyLogging {
  val config = new Properties()
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "application_id")
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")

  implicit val url: Avro.SchemaRegistryUrl = "http://localhost:8081"

  val builder = new StreamsBuilder()
  val blocks  = Avro.stream[Block](builder, "blocks")

  val streams = new KafkaStreams(builder.build(), config)
  streams.start()

  sys.ShutdownHookThread {
    streams.close(10, TimeUnit.SECONDS)
  }
}
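As a side note, RecordFormat works in both directions, so the same type class setup also covers the write path. Here's a minimal, self-contained sketch of the round trip; the Block case class and the sample values are repeated only so it compiles on its own.

import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord

case class Block(hash: String, number: Long)

val format = RecordFormat[Block]

// case class -> GenericRecord, e.g. before handing a value to a GenericAvroSerializer
val record: GenericRecord = format.to(Block("0xabc", 42L))

// GenericRecord -> case class, which is exactly what mapValues does in Avro.stream above
val block: Block = format.from(record)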

Kafka Streams Example (using Lightbend's Scala API)

In the first version of this article I was using Lightbend's Scala API. Matthias Sax of Confluent let me know that it has been integrated into Kafka 2.0, so I updated the code (see the example above). However, it also makes sense to keep the old version for reference, as it might still help you in some situations.

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Serdes

import com.company.schemas.Block
import com.lightbend.kafka.scala.streams.StreamsBuilderS
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
object Main extends App with LazyLogging {
  val config = new Properties()
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "application_id")
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
      Serdes.String().getClass.getName)
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
      classOf[GenericAvroSerde].getCanonicalName)
  config.put("schema.registry.url", "http://schemaregistry:8081")

  val builder = new StreamsBuilderS()
  builder
    .stream[String, GenericRecord]("blocks")
    .mapValues(v => RecordFormat[Block].from(v))

  val streams = new KafkaStreams(builder.build(), config)
  streams.start()

  sys.ShutdownHookThread {
    streams.close(10, TimeUnit.SECONDS)
  }
}

And remember, keep the types safe!

Originally published on www.madewithtea.com