Using Monix with Kafka, Avro and Schema Registry

Jendrik Poloczek 20 October, 2018 (2 min read)

Working with Confluent's schema registry solves problems such as where to store schemas in a polyglot environment, how to keep track of versions, and how to evolve schemas over time. Kafka connectors and KSQL also require schemas for your Kafka topics. If you need none of this, you might as well use Array[Byte] as the record type and serialize with Avro4s or avrohugger yourself.

So what's Monix? Monix is a high-performance Scala / Scala.js library for composing asynchronous, event-based programs. It builds on concepts from Scalaz's Task and is compatible with the Reactive Streams protocol out of the box. The developers behind Monix also created monix-kafka, a wrapper for Kafka that integrates easily with Monix-based services. A consumer can also be turned into an Observable; see also this comparison to Akka Actors, Akka Streams and FS2.

Add Dependencies

When using monix-kafka, you instantiate your Kafka producer by passing a configuration and type parameters that describe what your records look like:

val producerCfg = KafkaProducerConfig.default.copy(bootstrapServers = brokers.toList)
private val producer = KafkaProducer[Array[Byte], Array[Byte]](producerCfg, scheduler)

Both key and value use the supported Array[Byte] type here. However, let's serialize with Confluent's Avro serializer instead. For this, we first need to add the missing pieces: a new resolver URL and the serializer dependency. Be sure to add the version that matches the schema registry you're running.

resolvers += "confluent.io" at "http://packages.confluent.io/maven/"
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "5.0.0"
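
For orientation, a fuller build.sbt might look like the sketch below. The monix-kafka and avro4s coordinates and their version numbers are assumptions that are not given in this article, so check them against the releases that match your Kafka brokers and schema registry.

```scala
// build.sbt (sketch; artifact names and versions marked "assumed" are not from the article)
resolvers += "confluent.io" at "http://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  // monix-kafka build for Kafka 1.x clients (assumed artifact and version)
  "io.monix" %% "monix-kafka-1x" % "1.0.0-RC1",
  // Confluent's Avro serializer; should match your schema registry version
  "io.confluent" % "kafka-avro-serializer" % "5.0.0",
  // avro4s, for converting case classes to Avro records (assumed version)
  "com.sksamuel.avro4s" %% "avro4s-core" % "1.9.0"
)
```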

Extend Monix Kafka

Thanks to implicits, we can easily extend the capabilities of Monix's Kafka producer and consumer. Let's create an object with functions that build MonixSerializer and MonixDeserializer values, which we can later declare as implicits. Each function takes a serializer or deserializer configuration and a boolean flag that indicates whether it handles the record key (needed by Confluent's Kafka Avro serializer). The schema registry URL is passed in through this configuration.

import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import monix.kafka.{Serializer => MonixSerializer}
import monix.kafka.{Deserializer => MonixDeserializer}
import collection.JavaConverters._

object AvroSerializer {
  def serializer(cfg: Map[String, String], isKey: Boolean): MonixSerializer[Object] =
    MonixSerializer[Object](
      className = "io.confluent.kafka.serializers.KafkaAvroSerializer",
      classType = classOf[KafkaAvroSerializer],
      constructor = _ => {
        val serializer = new KafkaAvroSerializer()
        serializer.configure(cfg.asJava, isKey)
        serializer
      }
    )

  def deserializer(cfg: Map[String, String], isKey: Boolean): MonixDeserializer[Object] =
    MonixDeserializer[Object](
      className = "io.confluent.kafka.serializers.KafkaAvroDeserializer",
      classType = classOf[KafkaAvroDeserializer],
      constructor = _ => {
        val deserializer = new KafkaAvroDeserializer()
        deserializer.configure(cfg.asJava, isKey)
        deserializer
      }
    )
}
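
The isKey flag matters because the Confluent serializer uses it to pick the schema registry subject. With the default, topic-based subject naming, the subject is the topic name plus a -key or -value suffix. The helper below is purely illustrative (SubjectNaming and defaultSubject are hypothetical names, not library functions) and only mirrors that default naming rule:

```scala
// Illustrative only: mirrors the default topic-based subject naming used by
// Confluent's Avro serializer. These names are hypothetical, not a library API.
object SubjectNaming {
  def defaultSubject(topic: String, isKey: Boolean): String =
    if (isKey) s"$topic-key" else s"$topic-value"
}
```

So a serializer configured with isKey = false for a topic named events would register and look up its schema under the subject events-value.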

Monix Kafka Producer

Let's now use these implicits to instantiate a Kafka producer that serializes to Confluent's Avro format and uses the schema registry to look up the schema for a specific topic, which is great for type safety in Kafka topics. Here's what the rest of the code looks like. Using Avro for the key field or instantiating a Kafka consumer works analogously.

case class RecordValue(someInt: Int)
val serCfg = Map("schema.registry.url" -> "http://schemaregistry:8081")
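implicit val serializer: MonixSerializer[Object] = AvroSerializer.serializer(serCfg, false)
// RecordFormat comes from avro4s (import com.sksamuel.avro4s.RecordFormat);
// it converts between the case class and an Avro record
implicit val format = RecordFormat[RecordValue]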
val producerCfg = KafkaProducerConfig.default.copy(bootstrapServers = brokers.toList)
val producer = KafkaProducer[String, Object](producerCfg, scheduler)

val recordVal = format.to(RecordValue(1))
val record = new ProducerRecord[String, Object](topic, 0, "key", recordVal)
val task = producer.send(record)
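
The consumer side can be sketched in the same way. This is a minimal sketch, not code from the original post: it assumes the AvroSerializer object defined above, avro4s on the classpath, and placeholder values for the broker address, group id and topic name. It also assumes the deserializer returns generic Avro records, which is its default behaviour.

```scala
import com.sksamuel.avro4s.RecordFormat
import monix.kafka.{Deserializer => MonixDeserializer}
import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}
import monix.reactive.Observable
import org.apache.avro.generic.GenericRecord

// Same record type as on the producer side
case class RecordValue(someInt: Int)

object AvroConsumerSketch {
  // Placeholder values (assumptions); replace with your own settings
  val deserCfg = Map("schema.registry.url" -> "http://schemaregistry:8081")
  val topic = "some-topic"

  // Picked up implicitly for the value type Object;
  // String keys use the deserializer that monix-kafka ships with
  implicit val deserializer: MonixDeserializer[Object] =
    AvroSerializer.deserializer(deserCfg, isKey = false)

  // avro4s conversion between RecordValue and Avro records
  val format = RecordFormat[RecordValue]

  val consumerCfg = KafkaConsumerConfig.default.copy(
    bootstrapServers = List("localhost:9092"),
    groupId = "avro-sketch"
  )

  // Nothing is consumed until the Observable is subscribed to.
  // Each value is cast to a GenericRecord and mapped back to the case class.
  val values: Observable[RecordValue] =
    KafkaConsumerObservable[String, Object](consumerCfg, List(topic))
      .map(record => format.from(record.value().asInstanceOf[GenericRecord]))
}
```

Subscribing to values, for example with foreach and a Scheduler in scope, starts polling the topic.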

Originally published on www.madewithtea.com