Distributed Queries in a Microservice Architecture

I wrote this up as an internal document at WeWork, and have found myself referring to it multiple times as I help guide teams to make decisions around this.

A microservice architecture provides great advantages. It provides the foundation for a loosely-coupled architecture that allows teams to work independently and deploy independently. It also allows us to have much more flexibility operationally. Some services can scale significantly while for others you only need a few instances. Some services need significant amounts of memory and others can run very lean.

However, once your system is decomposed into independent services, a new set of problems arises that needs to be addressed.

One of these is the problem of the distributed query. When you have all your data in a single monolithic database, you can use the power of the relational query to pull together information across domains. But once this monolith is split up into microservices, how do you support the need for these queries that span data owned by more than one service?

There are a number of possible approaches, which I’ll go through here.

Tl;dr

  • Use parallel queries to downstream services and merge in-memory if the merge logic is not too complicated, you can meet your latency requirements, and you are not dealing with very large data sets. Reactive libraries and GraphQL services such as Apollo can really help with this. Use non-blocking I/O if your framework and language support it.
  • If all the above conditions apply, but parallel queries are not possible, use serialized queries.
  • If your latency requirements are strict or your data sets are too large, maintain a local copy. Note that switching to a local copy introduces a whole new set of issues and complexity (which I describe towards the end of this article), so avoid jumping to this step unless you have to. For example, see if you can improve the performance of the systems you are calling, or reduce the data sizes you are consuming in a request.
  • If you need to use a local store, rather than write your own query logic, use a storage engine that supports rich queries, such as a relational database or Elasticsearch.
  • If you are using a local copy of data owned by another service, think carefully about correctness, ordering, idempotence, freshness, error scenarios, and how to detect and identify the cause of errors. Make sure there is a way to detect and recover when things get out of sync, and actually test these scenarios.

Serialized multiple queries

With this approach, the primary service queries its local store and serially calls each downstream service to gather the data it needs, and then merges, sorts, filters and aggregates the results as needed. A GraphQL engine like Apollo is becoming a very common way to implement this kind of solution.
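
The serial flow can be sketched roughly as follows. This is a minimal illustration, not a reference implementation: `fetch_member`, `fetch_location` and the sample data are hypothetical stand-ins for downstream HTTP calls, backed here by in-memory dicts so the example runs on its own.

```python
from typing import Dict, List

# Hypothetical stand-ins for downstream services; real code would make HTTP calls.
MEMBERS = {1: {"name": "Ana"}, 2: {"name": "Bo"}}
LOCATIONS = {10: {"city": "Austin"}, 20: {"city": "Berlin"}}

# Rows owned by the primary service (its local store).
BOOKINGS = [
    {"id": 100, "member_id": 2, "location_id": 10},
    {"id": 101, "member_id": 1, "location_id": 20},
]

def fetch_member(member_id: int) -> Dict:
    return MEMBERS[member_id]

def fetch_location(location_id: int) -> Dict:
    return LOCATIONS[location_id]

def bookings_report() -> List[Dict]:
    rows = []
    for booking in BOOKINGS:
        # One downstream call after another: the latency of each call adds up.
        member = fetch_member(booking["member_id"])
        location = fetch_location(booking["location_id"])
        rows.append({
            "booking_id": booking["id"],
            "member": member["name"],
            "city": location["city"],
        })
    # The merge and sort happen in service memory, not in a database.
    return sorted(rows, key=lambda r: r["member"])

report = bookings_report()
```

Note that the whole result set is held in memory before it can be sorted, which is the cost called out in the disadvantages.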

Advantages

  • The data from the other services are up-to-date, whereas with a caching solution your data may be stale

Disadvantages

  • Increases query load on downstream services
  • You are essentially building significant components of a database management system (merge/sort/filter/aggregate/group/etc.) in your service
  • Often you will have to load the full result set into memory before you can apply operations such as sort, filter, aggregate. This impacts both memory use and latency and can cause unexpected issues if the data size grows quickly.
  • Direct calls to services increase coupling and introduce points of failure. A slow or broken network, or a slow downstream service, not only increases the chance of the query failing but can also cause a cascading failure of all services in the request path, as resources are blocked up the entire chain.

When to use this

  • Downstream services are reliable and can safely handle increased load
  • The size of the result set is not significant
  • The merge/sort/filter/aggregate operations are not significantly complex

Parallel multiple queries

With this approach we try to address the latency issue by performing the queries in parallel. The ReactiveX programming paradigm makes it fairly easy to do this by modelling the results as an Observable over which you can apply a series of functions.
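
As a rough sketch of the parallel fan-out, the example below uses Python's `asyncio` rather than ReactiveX; the idea of issuing the calls concurrently and merging afterwards is the same. The functions `fetch_members` and `fetch_cities` are hypothetical and only simulate network latency.

```python
import asyncio
from typing import Dict, List

async def fetch_members(ids: List[int]) -> Dict[int, str]:
    await asyncio.sleep(0.01)  # simulates network latency without blocking a thread
    return {i: f"member-{i}" for i in ids}

async def fetch_cities(ids: List[int]) -> Dict[int, str]:
    await asyncio.sleep(0.01)
    return {i: f"city-{i}" for i in ids}

async def bookings_report(bookings: List[Dict]) -> List[Dict]:
    member_ids = [b["member_id"] for b in bookings]
    location_ids = [b["location_id"] for b in bookings]
    # Both calls are in flight at once; the wait is roughly that of the slower call.
    members, cities = await asyncio.gather(
        fetch_members(member_ids), fetch_cities(location_ids)
    )
    return [
        {
            "booking_id": b["id"],
            "member": members[b["member_id"]],
            "city": cities[b["location_id"]],
        }
        for b in bookings
    ]

bookings = [{"id": 100, "member_id": 2, "location_id": 10}]
report = asyncio.run(bookings_report(bookings))
```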

Advantages

  • You are guaranteed the data from the other services is up-to-date
  • If the components involved support it, the ReactiveX model supports a non-blocking I/O model. This means resources such as threads are not locked up waiting for a response if a downstream service is slow. This prevents a cascading failure of the request chain.

Disadvantages

  • You are essentially building significant components of a database management system (merge/sort/filter/aggregate/group/etc.) in your service
  • Generally you can not correctly process the result set until you have all the data, which means you will have to load the full result set into memory rather than provide pagination, increasing the risk of running out of memory
  • Increases query load on downstream services

When to use this

  • The size of the result set is not significant
  • The merge/sort/filter/aggregate operations are not significantly complex
  • Downstream services are reliable and can safely handle increased query load

Query and store

In this model, we get significant gains by copying all the dependent data we need from other services into tables in our main database, and then performing a query in the database.

You can do this using either a synchronous or asynchronous query model.

If at all possible, take advantage of a relational database’s support for temporary tables (for example see this tutorial for MySQL). These tables last only for the lifetime of a session (as in MySQL) or, in some databases, a transaction, but you can still do full SQL operations against them. Without these you have all sorts of overhead around managing your tables and avoiding conflicts between transactions trying to share the same table.
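
A minimal sketch of query-and-store, using SQLite's temporary tables through Python's built-in `sqlite3` module as a stand-in for your main database. The table names and the `fetched_members` rows are assumptions made for illustration; in practice the rows would come from a downstream service call.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE bookings (id INTEGER PRIMARY KEY, member_id INTEGER, amount INTEGER)"
)
conn.executemany(
    "INSERT INTO bookings VALUES (?, ?, ?)",
    [(100, 1, 30), (101, 2, 70), (102, 1, 20)],
)

# Rows fetched from a hypothetical downstream member service for this request.
fetched_members = [(1, "Ana"), (2, "Bo")]

# In SQLite the temporary table is private to this connection and
# disappears when the connection closes.
conn.execute("CREATE TEMP TABLE tmp_members (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO tmp_members VALUES (?, ?)", fetched_members)

# The database now does the join, aggregation, sorting and pagination.
page = conn.execute(
    """
    SELECT m.name, SUM(b.amount) AS total
    FROM bookings b JOIN tmp_members m ON m.id = b.member_id
    GROUP BY m.name
    ORDER BY total DESC
    LIMIT 10 OFFSET 0
    """
).fetchall()
```

The `LIMIT`/`OFFSET` clause is what makes pagination possible without loading the full result set into the service.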

Advantages

  • Can paginate data rather than having to load it all into memory before returning any results
  • Fairly easy to understand and debug

Disadvantages

  • Increases memory requirements for your database
  • Increased latency due to the additional calls to the database
  • Direct calls to other services increase coupling and introduce points of failure
  • Introduces query load on downstream services

When to use this

  • The size of the result set is significant and/or the merge/sort/filter/aggregate are complex
  • Downstream services can safely handle increased query load
  • Your database can handle the increased write load and memory requirements. Generally this means you don’t have a super-high request load

Read from a distributed cache

Here we address the latency issue by maintaining a local copy of results in a distributed cache like Redis or memcached. You could potentially maintain a cache in each service process, but that can really chew up your memory depending on the data sizes you are working with.

The key used to store the data in the cache could be some unique id that was used to acquire the data from the downstream service. Or you can cache the end results of the query if this is doable.

A big aspect of this approach is how you populate the cache and how you keep it fresh. I’ll discuss approaches in the next section, as this is a very important problem to solve and there are a number of possible solutions. For now assume this problem is handled.

With this solution you want to be very selective and thoughtful about how much you store locally. If you store all the data owned by the downstream services in your distributed cache you essentially have ended up with a monolith again.

A very common pattern with queries is that you have a summary query that gets a wide range of matching results with summary data, and then there is a detail query which gets the details of a single result. When you receive this detail request, you can directly call the owning service to get the necessary detailed data, rather than caching that as well. Alternatively, your client can use the id you return in your search results to call the owning service to get the detailed information.

Advantages

  • Doesn’t place additional load on your database
  • Reduces load on the downstream services
  • Reduces coupling — if the downstream service is slow or down, in many cases you can still function because of the cache

Disadvantages

  • Need to keep the cache fresh
  • Your microservice is responsible for database functionality like merge/sort/filter
  • The cache may impose significant memory requirements

When to use this

  • You want to reduce coupling in your system
  • Your use case can handle the results not being completely 100% up-to-date
  • You can handle the additional memory requirements

Keep a local copy in your database

This approach is very similar to the query-and-store approach above, except that you don’t store the results just for the lifetime of the query in a temporary table. Instead, the data is stored permanently in real tables, and generally contains the data needed to service multiple requests, not just the current request.

This is really useful because you can treat the data just as if it were your own data; it is just sourced from another service. You can take full advantage of your local database’s query capabilities, and perform operations like sort, join, aggregate and filter within the database before any data even gets delivered to your service. It also significantly reduces coupling with the downstream services because even if those services are down, you can still operate, albeit with data that is getting stale over time.

Advantages

  • Significantly reduces load on the downstream services
  • Significantly reduces coupling and improves resilience — if the downstream service is slow or down, you can still function because you have a local copy of the data
  • Your microservice does not need to build database management functionality such as sort, filter, merge and aggregate

Disadvantages

  • Increase in storage and memory requirements on your local database
  • Increased load on your local database

When to use this

  • You want to reduce coupling in your system
  • Your use case can handle the results not being completely 100% up-to-date
  • You can handle the additional memory and storage requirements

Cache consistency

Maintaining a local copy of the distributed data needed to support your queries is a very powerful approach. It can significantly improve latency. It also significantly reduces the coupling of your overall system, making your system more resilient and reliable, and improves the independence of your development teams. Also, if you are keeping the copy in your database rather than a cache, it means your service doesn’t have to take on the work that your database can do for you.

Note that you need a very strong data contract to get the loose coupling advantages. If the producer makes incompatible changes to the data schema or data semantics, then your consumption of the data can break. The general recommendation is to use a strong schema contract mechanism like Avro or json-schema. Avro is generally preferred because it comes with compatibility-checks built-in, but json-schema has richer schema validation and can produce very useful validation errors.

Keeping two copies of data consistent is a real challenge. There are two major aspects around this problem: data freshness, and correctly handling error scenarios so that data is not lost or corrupted.

There are three major ways you can manage your cache: lazy loading, batch loading, and streaming.

Lazy loading

When to use this

  • You have a strong cache hit rate
  • You can depend on the downstream services to be fast and reliable
  • Data freshness is not a major concern

With this approach you start with an empty slate and then on each request you directly query the downstream services and store the results in your local cache or database. This eliminates the need to build a data pipeline to populate your local copy.
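
A lazy-loading cache can be sketched in a few lines. The `LazyCache` class and the `load_member` function are hypothetical; `load_member` stands in for a request to the owning service and records each call so the effect of the cache is visible.

```python
from typing import Callable, Dict, List

class LazyCache:
    """Read-through cache that starts empty and fills itself on a miss."""

    def __init__(self, load: Callable[[int], dict]) -> None:
        self._load = load  # call to the owning service (hypothetical)
        self._entries: Dict[int, dict] = {}
        self.misses = 0

    def get(self, key: int) -> dict:
        if key not in self._entries:
            # Miss: go to the owning service and remember the answer.
            self.misses += 1
            self._entries[key] = self._load(key)
        return self._entries[key]

calls: List[int] = []

def load_member(member_id: int) -> dict:
    calls.append(member_id)  # records each downstream request
    return {"id": member_id, "name": f"member-{member_id}"}

cache = LazyCache(load_member)
first = cache.get(7)   # miss: calls the owning service
second = cache.get(7)  # hit: served from the local copy
```

This sketch never refreshes an entry, so on its own it would serve stale data indefinitely.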

Note that this approach doesn’t work if your request patterns have a really poor cache hit rate. This can happen if, for example, you are supporting searches across the globe. A common resolution for this is to partition your cache based on geo region, so that a request for a certain area always goes to the same partition of the cache.
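
One possible way to route by region is a stable hash of the region name, sketched below. The region names, the partition count and the use of plain dicts as cache partitions are all assumptions for illustration; a real deployment would route to separate cache nodes.

```python
import zlib
from typing import Dict, List, Optional, Tuple

def partition_for(region: str, partitions: int) -> int:
    # crc32 is stable across processes, unlike Python's built-in hash() for strings.
    return zlib.crc32(region.encode("utf-8")) % partitions

# One dict per hypothetical cache partition.
caches: List[Dict[Tuple[str, str], str]] = [dict() for _ in range(4)]

def put(region: str, key: str, value: str) -> None:
    caches[partition_for(region, len(caches))][(region, key)] = value

def get(region: str, key: str) -> Optional[str]:
    return caches[partition_for(region, len(caches))].get((region, key))

put("eu-west", "search:coworking", "cached results")
hit = get("eu-west", "search:coworking")
```

Because every request for the same region lands on the same partition, entries for that region are not duplicated across partitions.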

Another concern with this approach is that you haven’t fully removed the request-time dependency on the downstream service. This increases the overall coupling of your system, with consequences for everything from testing to overall system resilience.

With lazy-loading you have to have a way to refresh the cached data without having a data pipeline that is regularly refreshing it.

One approach is Least-Recently-Used (LRU), where you set a maximum size to the cache, and when you run out of room, you evict the entry that was accessed least recently. This is generally useful when you have a tight memory constraint and you need to decide which entries need to go while still maintaining a good cache hit rate.
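
A minimal LRU sketch using `collections.OrderedDict` from the standard library. The `LRUCache` class is illustrative only; in practice you would more likely rely on the eviction policy of your cache product.

```python
from collections import OrderedDict

class LRUCache:
    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self._entries: OrderedDict = OrderedDict()

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key, value) -> None:
        self._entries[key] = value
        self._entries.move_to_end(key)
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # drop the least recently used entry

cache = LRUCache(max_size=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # "a" is now more recently used than "b"
cache.put("c", 3)  # the cache is full, so "b" is evicted
```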

A more common approach when you are worried more about freshness than space is Time to Live (TTL). In this approach, you assign a maximum time a cache entry can be considered valid. On a given request, if an entry is in the cache but too old, it is evicted, and the request goes to the owning service to get fresh data and refresh the cache entry. Note that the shorter the TTL, the more your service degrades to a standard direct request solution.
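
The TTL check can be sketched as follows. The `TTLCache` class, the `load` function and the injectable `clock` parameter are assumptions made so the example can simulate time passing without actually waiting.

```python
import time
from typing import Callable, Dict, List, Tuple

class TTLCache:
    def __init__(self, load: Callable[[str], str], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._load = load
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}

    def get(self, key: str) -> str:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]              # still fresh
        value = self._load(key)          # missing or expired: ask the owning service
        self._entries[key] = (now, value)
        return value

fake_now = [0.0]         # controllable clock for the example
loads: List[str] = []

def load(key: str) -> str:
    loads.append(key)
    return f"{key}-v{len(loads)}"

cache = TTLCache(load, ttl_seconds=60, clock=lambda: fake_now[0])
a = cache.get("x")       # loaded from the owner
b = cache.get("x")       # served from the cache
fake_now[0] = 61.0       # more than 60 seconds later
c = cache.get("x")       # expired, so loaded again
```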

Error-handling

A very important thing you must do if you’re directly calling another service is to protect yourself and the rest of the system from a cascading failure due to running out of request threads or other resources because they are all blocked waiting for results.

Having a timeout is good but generally not sufficient. The best practice here is to implement a circuit breaker. If you are running in a managed environment like Kubernetes that supports a sidecar like Envoy, your sidecar can implement the circuit breaker for you. Otherwise you need to get a good circuit breaker library for your language and implement this pattern yourself.
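
To make the pattern concrete, here is a deliberately simplified circuit breaker. It is a sketch under assumptions (a consecutive-failure threshold, a fixed reset interval, a single trial call when half-open), not a substitute for a tested library or a sidecar; the names `CircuitBreaker`, `CircuitOpenError` and `flaky` are hypothetical.

```python
import time
from typing import Callable, List, Optional

class CircuitOpenError(Exception):
    """Raised when the breaker rejects a call without trying the downstream service."""

class CircuitBreaker:
    def __init__(self, max_failures: int, reset_after: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_failures = max_failures
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, fn: Callable[[], object]):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_after:
                raise CircuitOpenError("failing fast")  # downstream is not contacted
            self._opened_at = None  # half-open: allow one trial call through
        try:
            result = fn()
        except Exception:
            self._failures += 1
            if self._failures >= self.max_failures:
                self._opened_at = self._clock()  # open the circuit
            raise
        self._failures = 0  # a success closes the circuit again
        return result

now = [0.0]
attempts: List[int] = []

def flaky():
    attempts.append(1)
    raise TimeoutError("downstream is slow")

breaker = CircuitBreaker(max_failures=2, reset_after=30, clock=lambda: now[0])
outcomes: List[str] = []
for _ in range(3):
    try:
        breaker.call(flaky)
    except CircuitOpenError:
        outcomes.append("rejected")
    except TimeoutError:
        outcomes.append("failed")
```

After two failures the third call is rejected immediately, so no thread is left waiting on the slow service.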

Batch Loading

With batch-loading, you have a job that runs on a regular basis, consumes all the data that has changed since the last load, and populates your database with the results. This sounds easy and a lot of people go with this approach, particularly to support data warehouses, but it actually introduces problems, and I usually try to move people away from this approach.

One approach I have seen is to have a job that directly queries the database of the other service, gets the rows that have changed, and then loads them into your database. The problem is, you now have a dependency on a private interface (the database schema) of another service. I can guarantee you that you will constantly be surprised by subtle and not-so-subtle breakages of the presumed but not agreed upon contract that this dependency represents.

Another approach is to call an API on the owning service that returns a batch of results based on what’s changed since the last call. This can be very problematic because it places heavy load on the service and its underlying database, and can be quite error-prone. In general HTTP requests are not intended or set up for returning large result sets.

I generally recommend avoiding batch-based loading because of these significant issues. One situation where you have to use batch loading is if you are receiving data from a third party.

Replication

Another approach that is quite popular is to use database replication. This is in particular a go-to solution for data teams who want to maintain a data warehouse, which is a specialized version of this distributed query problem.

But database replication has some significant problems. Now not only are you tied to their schema, but your schema has to match theirs. You are also tied to the requirement that both databases have to be compatible with each other (e.g. they both have to be MySQL). If you or the other team decides you need to move to another storage solution (like Elasticsearch or Cassandra), you are in for a world of hurt. I speak from painful experience.

Publish/Subscribe using an Event Log

If you need to maintain a local copy of another service’s data, the approach I generally recommend is publish/subscribe using an event log like Kafka. This has a number of advantages:

  • You have a well-defined contract in the form of a schema definition. Just as you create an API contract, you use Avro or json-schema to create an event contract. This contract provides a guarantee that consumers won’t break as the publisher’s data set evolves, similar to backward-compatibility guarantees for APIs.
  • You are not placing direct, unmanageable load on the owning services. They control how and when they make the data available.
  • Having a contract like this decouples the consumer and producer. If the producer wants to move to a different storage engine or maybe a completely different implementation of the service, it can do this without breaking its consumers. Also, more consumers can “jump on the train” without the producer having to do additional work.
  • Publishing to a persistent log rather than using an in-memory message broker allows for more resilience to failure

However, all these advantages come at a cost in terms of correctness and consistency. These costs are often not well considered, and can create real issues that are hard to resolve quickly.

Correctness and Error Handling

Scalability, freshness, availability and durability of the data stream

Kafka, if configured correctly, can be very robust and scalable. These days there are multiple organizations providing Kafka clusters as a service. Having spent way too many hours trying to correctly configure a cluster, I highly recommend this approach. It may look expensive, but don’t be penny wise and pound foolish.

Idempotence and at-least-once semantics

When a write fails or times out and you retry it, or when a message is redelivered after a failure, it’s possible the data was already written once. This is called at-least-once semantics. In this kind of world your consumer needs to be idempotent: it needs to guarantee that if you get the same write multiple times, it’s effectively the same as getting it just once.
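
One way to make a consumer idempotent is to key each write on the entity id and ignore anything that is not newer than what is already stored. The sketch below assumes every event carries a monotonically increasing `version` field, which is an assumption of this example rather than something every event stream provides; the table and event shape are hypothetical too.

```python
import sqlite3
from typing import Dict, List

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE members (id INTEGER PRIMARY KEY, name TEXT, version INTEGER)")

def apply_event(event: Dict) -> bool:
    """Applies the event unless an equal or newer version is already stored."""
    row = conn.execute(
        "SELECT version FROM members WHERE id = ?", (event["id"],)
    ).fetchone()
    if row is not None and row[0] >= event["version"]:
        return False  # duplicate or stale delivery: safe to ignore
    conn.execute(
        "INSERT OR REPLACE INTO members (id, name, version) VALUES (?, ?, ?)",
        (event["id"], event["name"], event["version"]),
    )
    conn.commit()
    return True

created = {"id": 1, "name": "Ana", "version": 1}
renamed = {"id": 1, "name": "Ana M.", "version": 2}

# The same events delivered more than once, with one arriving late.
results: List[bool] = [apply_event(e) for e in (created, renamed, renamed, created)]
stored = conn.execute("SELECT id, name, version FROM members").fetchall()
```

However many times the events are replayed, the stored row ends up the same.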

Ordering

System performance

If performance is an issue you can look at running the job more frequently so that the delta each time is smaller. As a result it runs faster and places less of a bursty strain on the system.

Error Handling

A very important thing to remember in these scenarios is you do not want to drop the message on the floor when an error occurs. You do not want to lose the data.

A common approach is to write the failed messages to an error topic or a “dead letter queue”, log an error and raise an alert. You need to make sure the error topic or dead letter queue has the capacity to handle a large number of incoming messages all at once or it may run out of space/memory and start dropping messages.
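
The shape of this error path might look like the sketch below. An in-memory list stands in for the error topic, and `consume` and `handle` are hypothetical names; a real consumer would publish the failed message to a durable topic, log the error and raise an alert.

```python
from typing import Callable, Dict, List

def consume(messages: List[Dict], handle: Callable[[Dict], None],
            dead_letters: List[Dict]) -> int:
    """Processes each message; failures are parked for later, never dropped."""
    processed = 0
    for message in messages:
        try:
            handle(message)
            processed += 1
        except Exception as exc:
            # Keep the original payload plus enough context to diagnose and replay it.
            dead_letters.append({"message": message, "error": repr(exc)})
    return processed

def handle(message: Dict) -> None:
    if "member_id" not in message:
        raise ValueError("missing member_id")

dead_letters: List[Dict] = []
processed = consume(
    [{"member_id": 1}, {"oops": True}, {"member_id": 2}], handle, dead_letters
)
```

One bad message does not stop the stream, and its payload is preserved so it can be patched or replayed once the problem is fixed.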

You should also consider how you handle all the messages that were sent to the error queue once the problem is resolved. Someone may need to manually patch them, or you may just want to replay them. Sometimes you need to selectively find specific messages and then fix them in a specific way. A technique I have seen is to push all the messages into a searchable index like Elasticsearch. Then a support person can work with these messages and get them resolved. Kafka Connect provides a connector that writes directly into Elasticsearch so you don’t need to write any code to make this happen.

Consistency checker

A consistency checker detects when your local copy has drifted out of sync with the source. This is usually done by iterating through all ids in the source and ensuring that those ids exist in the target. You do not have to run this in a big batch that slams the system; it can be running at a low rate in the background all the time.
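
A throttled id check could be sketched like this. The function name `find_missing`, the `batch_size` and `pause_seconds` parameters, and the in-memory source and target are all assumptions for illustration; a real checker would page through the source and query the target store.

```python
import time
from typing import Dict, Iterable, Iterator, List

def find_missing(source_ids: Iterable[int], target: Dict[int, dict],
                 batch_size: int = 100,
                 pause_seconds: float = 0.0) -> Iterator[List[int]]:
    """Yields batches of ids present in the source but absent from the target."""
    batch: List[int] = []
    for count, source_id in enumerate(source_ids, start=1):
        if source_id not in target:
            batch.append(source_id)
        if count % batch_size == 0:
            if batch:
                yield batch
                batch = []
            time.sleep(pause_seconds)  # throttle so the check runs at a low background rate
    if batch:
        yield batch

source_ids = range(1, 11)
target = {i: {} for i in source_ids if i not in (3, 8)}
missing = [i for batch in find_missing(source_ids, target, batch_size=5) for i in batch]
```

This only checks for missing ids; detecting rows that exist but hold different values would need a comparison of content as well, for example by version or checksum.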

Another approach is to just assume things get out of whack over time, and do a full refresh of all the data on some regular basis. It is preferable that you not have to do that as it is expensive and destructive. But in some cases that is the best approach.

Conclusion

There are many approaches to handling distributed queries, and it is worth thinking carefully about which one is the right fit for your situation.

Correctness and error handling are key considerations when we start dealing with distributed data. You want to take great care to think through all the scenarios, and lean on existing patterns and best practices to guide you to solutions that are robust, clear, and easy to maintain.
