Deduplicating high-throughput data-streams with Bloom Filters

Cristian Castiblanco

11 Jun 2018

7 min read

  • Data Engineering

TLDR: we run several applications consuming multiple real-time streams. These applications implement idempotence using Redis sets. In this post I describe our journey moving to bloom filters (using the ReBloom module), which brought down our memory usage by almost 10x.

Let's briefly define idempotence:

An operation is said to be idempotent when applying it multiple times has the same effect as applying it once.

There are several reasons why an application consuming a real-time stream must have an idempotence layer, e.g.:

  • If the application crashes or restarts (e.g., after a deploy), it might need to read parts of the stream that were processed already.
  • If the producer sends duplicate events into the stream (e.g., because of connectivity issues), the consumer should process them only once.

Why Redis?

Streaming systems support parallel reads by multiple consumers of the same application by partitioning the stream into shards. This makes it possible to distribute the work among multiple workers, but it also means there must be a shared data store to keep track of progress. Redis is a good candidate.

It is also necessary for the applications to survive both software and hardware crashes (i.e., the application, and the hardware it runs on, are assumed to be ephemeral), so implementing idempotence in memory or storing data locally is not an option.

A naïve approach to idempotence

The simplest way to provide an idempotence layer is to save all elements observed into a set. Before processing an event, one would check whether it is already there.

This approach is obviously memory-intensive and will break once the volume of data is large enough (say, a couple of million events per hour).
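To make this concrete, here is a minimal sketch of the set-based approach using redis-py (the key name seen_events, the consumer loop and the handle function are placeholders for illustration, not our actual code):

import redis

r = redis.Redis()

def seen_before(event_id: str) -> bool:
    # One global set holding every ID ever observed.
    return bool(r.sismember("seen_events", event_id))

def mark_seen(event_id: str) -> None:
    r.sadd("seen_events", event_id)

# Consumer loop (sketch): skip events we have already processed.
# for event in stream:
#     if not seen_before(event.id):
#         handle(event)
#         mark_seen(event.id)

The set grows without bound, which is exactly the problem described above.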

Our not-so-naïve approach

Before diving into the meat of this post, let's briefly see what our current implementation looks like, which we have been using for years and that we are replacing with bloom filters.

Every event in the stream is associated with an ID, which we hash with MD5 to shorten it. We then check/add (SISMEMBER / SADD) the ID in a set whose key name is derived from the current day plus the first two bytes of the MD5 hash (i.e., every day of idempotence uses 65536 sets). The set member we store is the hashed ID, minus its first two bytes, as raw bytes.

This is a nice trick that reduces storage a little: since all hashed IDs starting with the same two bytes go into the same set, we can drop that prefix when inserting them. Think of a single-level trie.

We also keep the previous day of idempotence around for 24 hours (using EXPIRE), and when checking whether an element has already been processed, we check both days.
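Putting those pieces together, a simplified version of the scheme might look like this (the key format, TTL handling and function names are assumptions for illustration; the real implementation differs in the details):

import hashlib
from datetime import datetime, timedelta

import redis

r = redis.Redis()

def _key_and_member(event_id: str, day: datetime):
    digest = hashlib.md5(event_id.encode()).digest()           # 16 raw bytes
    # One set per (day, first two bytes of the hash): 65536 sets per day.
    key = f"idem:{day:%Y%m%d}:{digest[:2].hex()}"
    return key, digest[2:]                                      # store the remaining 14 bytes

def already_processed(event_id: str, now: datetime) -> bool:
    # Check both today's and yesterday's sets.
    for day in (now, now - timedelta(days=1)):
        key, member = _key_and_member(event_id, day)
        if r.sismember(key, member):
            return True
    return False

def mark_processed(event_id: str, now: datetime) -> None:
    key, member = _key_and_member(event_id, now)
    r.sadd(key, member)
    r.expire(key, 2 * 24 * 3600)  # simplified: keep each day's sets around for roughly one extra day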

Enter bloom filters

A bloom filter is a space-efficient probabilistic data structure that allows you to check whether an item is a member of a set. There is a catch, though: a query returns either "definitely not in the set" or "possibly in the set" (i.e., false positives).

Another caveat is that you can't list the members of a set (like you would with a Redis set using SMEMBERS), because the items themselves are not saved, just their fingerprints. This is what makes bloom filters space-efficient.

bloomd: our first attempt at using bloom filters

We had already used bloom filters before to deduplicate events for our real-time dashboards. We tried implementing them using Lua on Redis, but the performance was not production-grade. So we turned our attention to bloomd, a bloom filter server written in C.

We used bloomd for a while, and its performance is OK. But it has some drawbacks:

  • It does not support clustering. If you want to distribute the load between different nodes, you have to provision them manually.
  • Lack of high availability: if your cluster dies, you are done. There is no built-in data redundancy, and the underlying data is easy to corrupt.
  • You have to maintain it yourself: we try to align with the "run less software" philosophy, so having yet another service to maintain, that is not directly part of our business, is a bummer.
  • The API is not the most flexible.
  • Almost abandonware: the author is happy with the current set of capabilities, and it's unlikely that this will evolve into something better.

ReBloom: bloom filters as a Redis module

In 2016 Salvatore announced the Redis Modules API at RedisConf; I remember fantasizing about implementing bloom filters as a module. But bloomd was already working, and the modules API still felt experimental, so we waited.

A few months ago we learned about ReBloom, a Redis module that provides two probabilistic data types: scalable bloom filters and cuckoo filters. It is developed by Redis Labs, our Redis provider, which is encouraging.

ReBloom brings to the table several benefits over bloomd. Since it sits on top of Redis, there are lots of things that come for free:

  • Clustering
  • Redundancy
  • Lower cognitive overhead (we use Redis extensively already)
  • A more powerful API
  • No need to maintain it, as we delegate that task to Redis Labs

Here's an example of the API usage:

# Assuming you are running a redis server with rebloom loaded...

# Use BF.RESERVE to create a filter with the specified
# false-positive probability (error rate) and capacity
> BF.RESERVE your_filter 0.00001 50000000
OK

# Use BF.ADD to add elements to the filter
> BF.ADD your_filter foo
1

# Use BF.EXISTS to check whether an element is in the filter
> BF.EXISTS your_filter foo
1
> BF.EXISTS your_filter bar
0

It is also possible to add or check multiple items in bulk. And using the BF.INSERT command you can add items and, at the same time, specify the error rate and capacity to use if the filter has not been created yet.
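As a rough redis-py sketch of those calls (the bulk commands BF.MADD / BF.MEXISTS and the BF.INSERT options shown are taken from the RedisBloom documentation, so double-check them against the module version you run; events_filter is a made-up key):

import redis

r = redis.Redis()

# Add / check several items in one round trip; both return a list of 0/1 flags.
r.execute_command("BF.MADD", "your_filter", "foo", "bar", "baz")
r.execute_command("BF.MEXISTS", "your_filter", "foo", "qux")

# BF.INSERT adds items and, if the filter does not exist yet,
# creates it with the given capacity and error rate.
r.execute_command(
    "BF.INSERT", "events_filter",
    "CAPACITY", "50000000", "ERROR", "0.00001",
    "ITEMS", "foo", "bar",
)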

Memory usage comparison

Our current ReBloom setup, which came to be after lots of manual tuning and analysis, is almost ten times more space-efficient than using sets at peak time.

[Graph: memory usage over time, Redis sets vs. bloom filters]

The first thing that stands out is that the space taken by the bloom filters is constant. The reason behind this is that the space used by a bloom filter depends on its initial configuration (capacity and error rate), and it is allocated at the moment of creation.

ReBloom implements scalable bloom filters: once a filter reaches its maximum capacity, a new sub-filter with a larger capacity is stacked on top of the original one.

In the graph above there was no scaling, and that is not a coincidence: we tuned the configuration to prevent the filters from scaling, as scaling has two undesirable effects (illustrated by the sketch after this list):

  • Scaling increases the size of the filter, usually by a factor of two.
  • When testing for membership of an element, ReBloom has to check every layer of the scaled filter, which increases latency.
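To make the layering concrete, here is a toy scalable bloom filter in Python. It is a conceptual sketch only (the expansion factor of 2 and the error-tightening ratio of 0.5 are typical defaults for this data structure, not values we verified against ReBloom's source), but it shows why scaling roughly doubles the size and why lookups get slower with each extra layer:

import hashlib
import math

class BloomLayer:
    """One fixed-size bloom filter layer (toy implementation)."""

    def __init__(self, capacity: int, error_rate: float):
        self.capacity, self.error_rate, self.count = capacity, error_rate, 0
        self.num_bits = math.ceil(capacity * math.log(1 / error_rate) / math.log(2) ** 2)
        self.num_hashes = math.ceil(math.log(1 / error_rate) / math.log(2))
        self.bits = bytearray(math.ceil(self.num_bits / 8))

    def _positions(self, item: str):
        # Double hashing: derive the k bit positions from two digests.
        h1 = int.from_bytes(hashlib.md5(item.encode()).digest(), "big")
        h2 = int.from_bytes(hashlib.sha1(item.encode()).digest(), "big")
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)
        self.count += 1

    def __contains__(self, item: str) -> bool:
        return all((self.bits[pos // 8] >> (pos % 8)) & 1 for pos in self._positions(item))


class ScalableBloom:
    """Conceptual sketch of a scalable bloom filter; not ReBloom's actual code."""

    def __init__(self, capacity: int, error_rate: float,
                 expansion: int = 2, tightening: float = 0.5):
        self.expansion, self.tightening = expansion, tightening
        self.layers = [BloomLayer(capacity, error_rate)]

    def add(self, item: str) -> None:
        top = self.layers[-1]
        if top.count >= top.capacity:
            # Scaling: stack a larger layer (roughly doubling the total size),
            # with a tighter error rate so the compound error stays bounded.
            top = BloomLayer(top.capacity * self.expansion,
                             top.error_rate * self.tightening)
            self.layers.append(top)
        top.add(item)

    def __contains__(self, item: str) -> bool:
        # A lookup has to consult every layer, which is what adds latency.
        return any(item in layer for layer in self.layers)

# sbf = ScalableBloom(capacity=1000, error_rate=0.001)
# sbf.add("event-1"); print("event-1" in sbf, "event-2" in sbf)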

Tuning the initial configuration of your filters is paramount to getting the most out of them. It requires lots of experimentation and a good understanding of your data.

Tuning ReBloom initial parameters

The parameters with which new filters are created, capacity and error rate, determine their initial size and are immutable.

If you are too conservative with the capacity, your filter will probably scale multiple times, and each time it will become twice as big, i.e., it grows exponentially. On the other hand, if you are too generous, odds are you will waste space unnecessarily, making this whole journey pointless.

To make things more complicated, it's not easy to predict what the initial size of the bloom filter will be. Here are some examples:

# creating filter with capacity of 25M and 1e-10 error rate
> BF.RESERVE foo 0.0000000001 25000000
OK
# let's inspect what we actually got...
> BF.DEBUG foo
1) "size:0"
2) "bytes:268435456 bits:2147483648 hashes:34 capacity:44808984 size:0 ratio:1e-10"

Interesting: the capacity is actually 44808984, way more than we wanted! What's going on? Well, it has to do with the math used to reserve the filter.

Let's try a different one, e.g.:

# creating filter with capacity of ~22.4M and 1e-10 error rate
> BF.RESERVE bar 0.0000000001 22404492
OK
> BF.DEBUG bar
1) "size:0"
2) "bytes:134217728 bits:1073741824 hashes:34 capacity:22404492 size:0 ratio:1e-10"

That looks more like what we want. Let's increase the error rate by a factor of 10:

# creating filter with capacity of ~22.4M and 1e-09 error rate
> BF.RESERVE qux 0.000000001 22404492
OK
> BF.DEBUG qux
1) "size:0"
2) "bytes:134217728 bits:1073741824 hashes:30 capacity:24893880 size:0 ratio:1e-09"

Even more interesting: the size in bytes is identical to the previous example, even though we relaxed the error rate (so fewer bits should be needed). There is a difference, though: our filter now has a more generous capacity (it went from 22404492 to 24893880) and uses fewer hashes. Again, this is due to the math used by the current implementation.
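These numbers line up with the textbook bloom filter formulas plus one implementation detail: the bit array appears to be rounded up to the next power of two, and the reported capacity is then recomputed from that rounded size. The sketch below reproduces the BF.DEBUG output above under that assumption (the rounding behaviour is inferred from the output, not read from the ReBloom source):

import math

def rebloom_sizing(capacity: int, error_rate: float):
    """Estimate bytes, hash count and effective capacity for a BF.RESERVE call."""
    ln_inv_p = math.log(1.0 / error_rate)
    hashes = math.ceil(ln_inv_p / math.log(2))                 # k = ceil(log2(1/p))
    bits_needed = capacity * ln_inv_p / math.log(2) ** 2       # m = n * ln(1/p) / ln(2)^2
    bits = 2 ** math.ceil(math.log2(bits_needed))              # assumption: rounded up to a power of two
    effective_capacity = int(bits * math.log(2) ** 2 / ln_inv_p)
    return bits // 8, hashes, effective_capacity

print(rebloom_sizing(25_000_000, 1e-10))   # -> (268435456, 34, ~44.8M), as in the first example
print(rebloom_sizing(22_404_492, 1e-10))   # -> (134217728, 34, ~22.4M)
print(rebloom_sizing(22_404_492, 1e-9))    # -> (134217728, 30, ~24.9M)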

Since changes to the initial capacity and error rate influence the resulting size so dramatically, I plotted it for different parameters in an interactive notebook, which you will find handy should you need to tune your filters:

notebook

The elephant in the room

Something to keep in mind is that you might lose data to false positives. This is a very important trade-off you need to make when using bloom filters, and it limits the scenarios in which it is feasible to use them.

If every event in your pipeline is critical, and losing even one would impact business, then bloom filters are not for you.

More often than not, though, pipelines contain all kinds of events, with different degrees of importance. And in some applications, dropping an event is not that big of a deal (e.g., an app feeding a real-time dashboard: if you are summing large numbers of events, dropping one or two would go unnoticed).

There are several ways to mitigate this, though. For instance, you could use bloom filters with different configurations depending on the kind of event passing through them: high-priority events could go to a filter with a minimal error probability (or even to plain old sets instead of a probabilistic data structure), while less important events could use a more forgiving configuration.
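As a rough sketch of that routing idea (the priority field, the key names and the two-tier split are hypothetical, purely to illustrate the point):

import redis

r = redis.Redis()

def already_seen(event: dict) -> bool:
    """Route the idempotence check by event priority (illustrative only)."""
    if event["priority"] == "critical":
        # Exact membership via a plain set: no false positives, more memory.
        return r.sadd("idem:critical", event["id"]) == 0
    # Everything else shares a more forgiving bloom filter; BF.ADD returns
    # 1 if the item was new and 0 if it (probably) was already there.
    return r.execute_command("BF.ADD", "idem:bulk", event["id"]) == 0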
