Distributed Queries in a Microservice Architecture

David Van Couvering
Oct 1, 2019

I wrote this up as an internal document at WeWork, and have found myself referring to it multiple times as I help guide teams to make decisions around this.

A microservice architecture provides great advantages. It provides the foundation for a loosely-coupled architecture that allows teams to work independently and deploy independently. It also allows us to have much more flexibility operationally. Some services can scale significantly while for others you only need a few instances. Some services need significant amounts of memory and others can run very lean.

However, once your system is decomposed into independent services, a new set of problems arises that needs to be addressed.

One of these is the problem of the distributed query. When you have all your data in a single monolithic database, you can use the power of the relational query to pull together information across domains. But once this monolith is split up into microservices, how do you support the need for these queries that span data owned by more than one service?

There are a number of possible approaches that I’ll go through here.

Tl;dr

Here is a quick summary of the guidance to help you get clarity on which approach to use:

  • Use parallel queries to downstream services and merge in memory if the merge logic is not too complicated, you can meet your latency requirements, and you are not dealing with super-large data sets. Reactive libraries and GraphQL services such as Apollo can really help with this. Use non-blocking I/O if your framework and language support it.
  • If all the above conditions apply, but parallel queries are not possible, use serialized queries
  • If your latency requirements are significant or your data sets are too large, maintain a local copy. Note that switching to a local copy introduces a whole new set of issues and complexity (which I describe towards the end of this article), so avoid jumping to this step unless you have to. For example, see if you can improve the performance of the systems you are calling, or reduce the data sizes you are consuming in a request.
  • If you need to use a local store, rather than write your own query logic, use a storage engine that supports rich queries, such as a relational database or Elasticsearch.
  • If you are using a local copy of data owned by another service, think carefully about correctness, ordering, idempotence, freshness, error scenarios, and how to detect and identify the cause of errors. Make sure there is a way to detect and recover when things get out of sync, and actually test these scenarios.

Serialized multiple queries

With this approach, the primary service queries its local store and serially calls each downstream service to gather the data it needs, and then merges, sorts, filters and aggregates the results as needed. A GraphQL engine like Apollo is becoming a very common way to implement this kind of solution.
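As a rough illustration, here is a minimal sketch of the serialized approach in Python. The service names, endpoints, and the reservations table are made up for the example, and `db` is assumed to be a connection that returns dict-like rows (for instance sqlite3 with a Row factory):

```python
import requests

def search_reservations(db, location_id):
    # 1. Query the data this service owns in its local store.
    reservations = db.execute(
        "SELECT id, member_id, room_id, starts_at FROM reservations WHERE location_id = ?",
        (location_id,),
    ).fetchall()

    # 2. Serially call each downstream service for the data it owns.
    member_ids = ",".join(str(r["member_id"]) for r in reservations)
    members = requests.get(
        "https://members.internal/members", params={"ids": member_ids}, timeout=2
    ).json()
    rooms = requests.get(
        "https://rooms.internal/rooms", params={"location_id": location_id}, timeout=2
    ).json()

    # 3. Merge and sort in memory -- the service is doing the database's job here.
    members_by_id = {m["id"]: m for m in members}
    rooms_by_id = {r["id"]: r for r in rooms}
    merged = [
        {
            "reservation_id": r["id"],
            "member": members_by_id[r["member_id"]]["name"],
            "room": rooms_by_id[r["room_id"]]["name"],
            "starts_at": r["starts_at"],
        }
        for r in reservations
    ]
    return sorted(merged, key=lambda row: row["starts_at"])
```

Notice that the merge and sort happen in application memory; every operation you add here is work a database would normally do for you.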

Advantages

  • Fairly simple to write and debug
  • The data from the other services is up-to-date, whereas with a caching solution your data may be stale

Disadvantages

  • The combined latency of calling each service may make the overall request too slow
  • Increases query load on downstream services
  • You are essentially building significant components of a database management system (merge/sort/filter/aggregate/group/etc.) in your service
  • Often you will have to load the full result set into memory before you can apply operations such as sort, filter, aggregate. This impacts both memory use and latency and can cause unexpected issues if the data size grows quickly.
  • Direct calls to services increase coupling and introduce points of failure. When there are failures such as a slow or broken network, or if one of the downstream services is slow, not only does this increase the chance of the query failing, but it can also cause a cascading failure of all services involved in the request path as resources are blocked up the entire chain.

When to use this

  • You can meet the latency requirements with serialized calls
  • Downstream services are reliable and can safely handle increased load
  • The size of the result set is not significant
  • The merge/sort/filter/aggregate operations are not significantly complex

Parallel multiple queries

With this approach we try to address the latency issue by performing the queries in parallel. The ReactiveX programming paradigm makes it fairly easy to do this by modelling the results as an Observable over which you can apply a series of operators.
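ReactiveX is one way to get the fan-out; the same idea can be sketched with Python’s asyncio and aiohttp, which give you both the parallelism and non-blocking I/O. The endpoints below are hypothetical:

```python
import asyncio
import aiohttp

async def fetch_json(session, url, params=None):
    # Non-blocking HTTP call: the event loop is free to serve other work
    # while we wait for the downstream service to respond.
    async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=2)) as resp:
        resp.raise_for_status()
        return await resp.json()

async def search(location_id):
    async with aiohttp.ClientSession() as session:
        # Fan out to the downstream services concurrently instead of serially.
        members, rooms = await asyncio.gather(
            fetch_json(session, "https://members.internal/members", {"location_id": location_id}),
            fetch_json(session, "https://rooms.internal/rooms", {"location_id": location_id}),
        )
    # The merge/sort still happens in memory once both results have arrived.
    rooms_by_id = {r["id"]: r for r in rooms}
    return sorted(
        ({"member": m, "room": rooms_by_id.get(m["room_id"])} for m in members),
        key=lambda row: row["member"]["name"],
    )

# results = asyncio.run(search("sf-01"))
```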

Advantages

  • Improved performance as queries are not serialized
  • You are guaranteed the data from the other services is up-to-date
  • If the components involved support it, the ReactiveX model supports a non-blocking I/O model. This means resources such as threads are not locked up waiting for a response if a downstream service is slow. This prevents a cascading failure of the request chain.

Disadvantages

  • Debugging is harder because things are happening asynchronously, so tracing the flow of the request can be challenging
  • You are essentially building significant components of a database management system (merge/sort/filter/aggregate/group/etc.) in your service
  • Generally you cannot correctly process the result set until you have all the data, which means you will have to load the full result set into memory rather than paginate, increasing the risk of running out of memory
  • Increases query load on downstream services

When to use this

  • Your latency requirements cannot be met with serialized calls
  • The size of the result set is not significant
  • The merge/sort/filter/aggregate operations are not significantly complex
  • Downstream services are reliable and can safely handle increased query load

Query and store

In this model, we get significant gains by copying all the dependent data we need from other services into tables in our main database, and then performing a query in the database.

You can do this using either a synchronous or asynchronous query model.

If at all possible, take advantage of a relational database’s support for temporary tables (for example, see this tutorial for MySQL). These tables last only for the lifetime of the connection that created them, but you can still run full SQL operations against them. Without them you have all sorts of overhead around managing your own scratch tables and avoiding conflicts between requests trying to share the same table.
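Here is a minimal sketch of the pattern, using SQLite in Python just to keep the example self-contained (the same idea applies to MySQL temporary tables). The reservations table and the fetch_members helper are assumptions for the example:

```python
import sqlite3

def rooms_with_member_details(db: sqlite3.Connection, location_id: str, fetch_members):
    """fetch_members(location_id) is a hypothetical helper that calls the
    members service and returns rows of (member_id, name, company)."""
    # Temporary tables are private to this connection and disappear when it
    # closes, so there is no cleanup or cross-request conflict to manage.
    db.execute(
        "CREATE TEMP TABLE tmp_members (member_id TEXT PRIMARY KEY, name TEXT, company TEXT)"
    )
    db.executemany(
        "INSERT INTO tmp_members (member_id, name, company) VALUES (?, ?, ?)",
        fetch_members(location_id),
    )
    # Let the database do the join/sort/filter instead of re-implementing it in the service.
    return db.execute(
        """
        SELECT r.id, r.starts_at, m.name, m.company
        FROM reservations r
        JOIN tmp_members m ON m.member_id = r.member_id
        WHERE r.location_id = ?
        ORDER BY r.starts_at
        """,
        (location_id,),
    ).fetchall()
```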

Advantages

  • Can use the database system for what it’s good for, rather than building DBMS logic in the service
  • Can paginate data rather than having to load it all into memory before returning any results
  • Fairly easy to understand and debug

Disadvantages

  • Introduces a high write load on the database, although generally temporary tables are stored in memory
  • Increases memory requirements for your database
  • Increased latency due to the additional calls to the database
  • Direct calls to other services increase coupling and introduce points of failure
  • Introduces query load on downstream services

When to use this

  • You can handle the latency impact of querying the dependent service for every request
  • The size of the result set is significant and/or the merge/sort/filter/aggregate operations are complex
  • Downstream services can safely handle increased query load
  • Your database can handle the increased write load and memory requirements. Generally this means you don’t have a super-high request load

Read from a distributed cache

Here we address the latency issue by maintaining a local copy of results in a distributed cache like Redis or memcached. You could potentially maintain a cache in each service process, but that can really chew up your memory depending on the data sizes you are working with.

The key used to store the data in the cache could be some unique id that was used to acquire the data from the downstream service. Or you can cache the end results of the query if this is doable.

A big aspect of this approach is how you populate the cache and how you keep it fresh. I’ll discuss approaches in the next section, as this is a very important problem to solve and there are a number of possible solutions. For now assume this problem is handled.

With this solution you want to be very selective and thoughtful about how much you store locally. If you store all the data owned by the downstream services in your distributed cache you essentially have ended up with a monolith again.

A very common pattern with queries is that you have a summary query that gets a wide range of matching results with summary data, and then a detail query that gets the details of a single result. When you receive the detail request, you can call the owning service directly to get the detailed data, rather than caching that as well. Alternatively, your client can use the id you return in your search results to call the owning service for the detailed information.
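A minimal sketch of that split, assuming redis-py for the cache and a hypothetical reservations service: summary results are read from the cache under a query key, while detail requests go straight to the owning service.

```python
import json
import redis
import requests

cache = redis.Redis(host="localhost", port=6379)

def search_summaries(location_id: str):
    # Summary results are served from the distributed cache. How the cache is
    # populated and kept fresh is a separate concern (see the next section).
    cached = cache.get(f"search:{location_id}")
    return json.loads(cached) if cached else []

def reservation_detail(reservation_id: str):
    # Details are not cached at all: go straight to the owning service using
    # the id returned in the summary results.
    resp = requests.get(
        f"https://reservations.internal/reservations/{reservation_id}", timeout=2
    )
    resp.raise_for_status()
    return resp.json()
```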

Advantages

  • Improves latency
  • Doesn’t place additional load on your database
  • Reduces load on the downstream services
  • Reduces coupling — if the downstream service is slow or down, in many cases you can still function because of the cache

Disadvantages

  • Additional complexity to your system
  • Need to keep the cache fresh
  • Your microservice is responsible for database functionality like merge/sort/filter
  • The cache may impose significant memory requirements

When to use this

  • You have strong latency requirements that can’t be met if you were to call downstream services on each request
  • You want to reduce coupling in your system
  • Your use case can handle the results not being completely up-to-date
  • You can handle the additional memory requirements

Keep a local copy in your database

This approach is essentially the same as the previous one, but rather than placing the data in an unstructured memory-based cache, you are persisting it in your database.

This approach is also very similar to the query-and-store approach above, except that you don’t store the results just for the lifetime of the query in a temporary table. Instead, the data is stored permanently in real tables, and generally contains the data needed to service multiple requests, not just the current request.

This is really useful because you can treat the data just as if it were your own data; it is just sourced from another service. You can take full advantage of your local database’s query capabilities, and perform operations like sort, join, aggregate and filter within the database before any data even gets delivered to your service. It also significantly reduces coupling with the downstream services because even if those services are down, you can still operate, albeit with data that is getting stale over time.

Advantages

  • Improves latency
  • Significantly reduces load on the downstream services
  • Significantly reduces coupling and improves resilience — if the downstream service is slow or down, you can still function because you have a local copy of the data
  • Your microservice does not need to build database management functionality such as sort, filter, merge and aggregate

Disadvantages

  • You have to be very careful how you manage consistency between the owning service and your service (see below for more details)
  • Increase in storage and memory requirements on your local database
  • Increased load on your local database

When to use this

  • You have strong latency requirements that can’t be met with synchronous calls
  • You want to reduce coupling in your system
  • Your use case can handle the results not being completely up-to-date
  • You can handle the additional memory and storage requirements

Cache consistency

They say that there are four big problems in computer science: naming things, maintaining cache consistency, and off-by-one errors.

Maintaining a local copy of the distributed data needed to support your queries is a very powerful approach. It can significantly improve latency. It also significantly reduces the coupling of your overall system, making your system more resilient and reliable, and improves the independence of your development teams. Also, if you are keeping the copy in your database rather than a cache, it means your service doesn’t have to take on the work that your database can do for you.

Note that you need a very strong data contract to get the loose coupling advantages. If the producer makes incompatible changes to the data schema or data semantics, then your consumption of the data can break. The general recommendation is to use a strong schema contract mechanism like Avro or json-schema. Avro is generally preferred because it comes with compatibility-checks built-in, but json-schema has richer schema validation and can produce very useful validation errors.
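For example, here is a minimal sketch of enforcing a json-schema contract on the consumer side with the jsonschema library; the event shape below is an illustrative stand-in for a real, agreed-upon contract:

```python
from jsonschema import validate, ValidationError

# An illustrative event contract -- in practice this would live in a shared
# schema registry or repository agreed on by producer and consumer.
MEMBER_UPDATED_SCHEMA = {
    "type": "object",
    "required": ["member_id", "name", "updated_at"],
    "properties": {
        "member_id": {"type": "string"},
        "name": {"type": "string"},
        "updated_at": {"type": "string", "format": "date-time"},
    },
}

def handle_event(event: dict):
    try:
        validate(instance=event, schema=MEMBER_UPDATED_SCHEMA)
    except ValidationError as err:
        # json-schema gives a specific, useful validation error to log or send
        # to a dead letter queue rather than silently corrupting the local copy.
        raise RuntimeError(f"event violates contract: {err.message}") from err
    # ...apply the event to the local copy...
```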

Keeping two copies of data consistent is a real challenge. There are two major aspects around this problem: data freshness, and correctly handling error scenarios so that data is not lost or corrupted.

There are three major ways you can manage your cache: lazy loading, batch loading, and streaming.

Lazy loading

This approach can be good when

  • You have a strong cache hit rate
  • You can depend on the downstream services to be fast and reliable
  • Data freshness is not a major concern

With this approach you start with an empty slate and then on each request you directly query the downstream services and store the results in your local cache or database. This eliminates the need to build a data pipeline to populate your local copy.

Note that this approach doesn’t work if your request patterns have a really poor cache hit rate. This can happen, for example, if you are supporting searches across the globe. A common resolution for this is to partition your cache based on geo region, so that a request for a certain area always goes to the same partition of the cache.

Another concern with this approach is that you haven’t fully removed the request-time dependency on the downstream service, which increases the overall coupling of your system, with consequences that range from testing all the way to overall system resilience.

With lazy loading you still need a way to refresh the cached data, since there is no data pipeline regularly refreshing it for you.

One approach is Least Recently Used (LRU) eviction, where you set a maximum size for the cache, and when you run out of room, you evict the entry that was used least recently. This is generally useful when you have a tight memory constraint and need to decide which entries should go while still maintaining a good cache hit rate.

A more common approach when you are worried more about freshness than space is Time to Live (TTL). In this approach, you assign a maximum time a cache entry can be considered valid. On a given request, if an entry is in the cache but too old, it is evicted, and the request goes to the owning service to get fresh data and refresh the cache entry. Note that the shorter the TTL, the more your service degrades to a standard direct request solution.
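A minimal sketch of the TTL pattern with redis-py, where Redis expires the entry for us via SETEX; the member service endpoint is hypothetical:

```python
import json
import redis
import requests

cache = redis.Redis(host="localhost", port=6379)
TTL_SECONDS = 300  # the shorter this is, the closer you get to calling the service every time

def get_member(member_id: str) -> dict:
    key = f"member:{member_id}"
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)

    # Cache miss (or the entry expired): lazily load from the owning service...
    resp = requests.get(f"https://members.internal/members/{member_id}", timeout=2)
    resp.raise_for_status()
    member = resp.json()

    # ...and refresh the cache entry; Redis evicts it automatically after the TTL.
    cache.setex(key, TTL_SECONDS, json.dumps(member))
    return member
```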

Error-handling

With lazy loading, the primary error you have to handle is the downstream service being unresponsive or unavailable. In this model, there is not much you can do, because the reason you are going to the service is that you don’t have a local copy. If you have data that has passed the TTL, you can fall back to that if the staleness is acceptable.

A very important thing you must do if you’re directly calling another service is to protect yourself and the rest of the system from a cascading failure due to running out of request threads or other resources because they are all blocked waiting for results.

Having a timeout is good but generally not sufficient. The best practice here is to implement a circuit breaker. If you are running in a managed environment like Kubernetes that supports a sidecar like Envoy, your sidecar can implement the circuit breaker for you. Otherwise you need to get a good circuit breaker library for your language and implement this pattern yourself.
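If you do end up rolling it yourself, the core of the pattern is small. Here is a deliberately minimal sketch (a real library or sidecar adds per-endpoint state, metrics, and smarter half-open probing); the thresholds are illustrative:

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: after N consecutive failures, fail fast for a
    cool-down period instead of tying up threads on a struggling dependency."""

    def __init__(self, failure_threshold: int = 5, reset_after: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # half-open: allow one trial call through
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result

# breaker = CircuitBreaker()
# member = breaker.call(requests.get, "https://members.internal/members/42", timeout=2)
```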

Batch Loading

With batch-loading, you have a job that runs on a regular basis, consumes all the data that has changed since the last load, and populates your database with the results. This sounds easy and a lot of people go with this approach, particularly to support data warehouses, but it actually introduces problems, and I usually try to move people away from this approach.

One approach I have seen is to have a job that directly queries the database of the other service, gets the rows that have changed, and then loads them into your database. The problem is, you now have a dependency on a private interface (the database schema) of another service. I can guarantee you that you will constantly be surprised by subtle and not-so-subtle breakages of the presumed but not agreed upon contract that this dependency represents.

Another approach is to call an API on the owning service that returns a batch of results based on what’s changed since the last call. This can be very problematic because it places heavy load on the service and its underlying database, and can be quite error-prone. In general HTTP requests are not intended or set up for returning large result sets.

I generally recommend avoiding batch-based loading because of these significant issues. One situation where you have to use batch loading is when you are receiving data from a third party.

Replication

Another approach that is quite popular is to use database replication. This is in particular a go-to solution for data teams who want to maintain a data warehouse, which is a specialized version of this distributed query problem.

But database replication has some significant problems. Not only are you tied to the other service’s schema, your schema has to match theirs exactly. You are also tied to the requirement that both databases be compatible with each other (e.g. they both have to be MySQL). If you or the other team decides to move to another storage solution (like Elasticsearch or Cassandra), you are in for a world of hurt. I speak from painful experience.

Publish/Subscribe using an Event Log

If you need to maintain a local copy of another service’s data, the approach I generally recommend is publish/subscribe using an event log like Kafka. This has a number of advantages:

  • You have a well-defined contract in the form of a schema definition. Just as you create an API contract, you use Avro or json-schema to create an event contract. This contract provides a guarantee that consumers won’t break as the publisher’s data set evolves, similar to backward-compatibility guarantees for APIs.
  • You are not placing direct, unmanageable load on the owning services. They control how and when they make the data available.
  • Having a contract like this decouples the consumer and producer. If the producer wants to move to a different storage engine or even a completely different implementation of the service, it can do so without breaking its consumers. Also, more consumers can “jump on the train” without the producer having to do additional work
  • Publishing to a persistent log rather than using an in-memory message broker allows for more resilience to failure

However, all these advantages come at a cost in terms of correctness and consistency. These costs are often not well considered, and can create real issues that are hard to resolve quickly.

Correctness and Error Handling

If we use the publish/subscribe model, then there are a number of scenarios that need to be considered. It is usually a major problem if data is lost or inconsistent.

Scalability, freshness, availability and durability of the data stream

The data stream capturing all the updates needs to provide solid availability and durability guarantees. We can’t lose data even if the cluster crashes, the server goes down, the network goes down, and so on. This needs to be true even if there is a major burst of data updates or if the overall average throughput of updates grows over time — the pipeline needs to be resilient to this without losing data or taking hours to process. Note that in many ways data that takes forever to arrive is the same as missing data.

Kafka, if configured correctly, can be very robust and scalable. These days there are multiple organizations providing Kafka clusters as a service. Having spent way too many hours trying to correctly configure a cluster, I highly recommend this approach. It may look expensive, but don’t be penny wise and pound foolish.

Idempotence and at-least-once semantics

There are scenarios where the producer of the data does not know if the write was successful or not (for instance the network hangs or goes down after sending the write but before getting a confirmation). In this case you need to retry until you get a success.

But when you do this, it’s possible the data was already written once. This is called at-least-once semantics. When you live in this kind of world, your consumer needs to be idempotent: it needs to guarantee that processing the same write multiple times has the same effect as processing it just once.
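One common way to get idempotence is to make the write an upsert keyed on the entity id, guarded by the entity’s update timestamp. Here is a minimal sketch using SQLite for self-containment; the members_copy table and event shape are assumptions for the example:

```python
import sqlite3

def apply_member_event(db: sqlite3.Connection, event: dict):
    # members_copy has member_id as its primary key, so replaying the same
    # event (or an older duplicate) leaves the row unchanged.
    db.execute(
        """
        INSERT INTO members_copy (member_id, name, updated_at)
        VALUES (:member_id, :name, :updated_at)
        ON CONFLICT (member_id) DO UPDATE SET
            name = excluded.name,
            updated_at = excluded.updated_at
        WHERE excluded.updated_at >= members_copy.updated_at
        """,
        {
            "member_id": event["member_id"],
            "name": event["name"],
            "updated_at": event["updated_at"],
        },
    )
    db.commit()
```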

Ordering

You need to either not care about the order in which updates are processed, or guarantee that all entries for the same entity are processed in the right order. With Kafka you are guaranteed ordering for a particular partition, so if you use an entity key as the partition key, you can get this guarantee.
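On the producer side this just means using the entity id as the message key. A sketch assuming the kafka-python client and a hypothetical member-updated topic:

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_member_updated(event: dict):
    # Keying by member_id means every update for a given member hashes to the
    # same partition, and Kafka preserves ordering within a partition.
    producer.send("member-updated", key=event["member_id"], value=event)
```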

System performance

Since you are running the job as a batch, it can cause bursts of load on your system, which can affect system performance. You should keep a close eye on latency and error rates and be prepared to shut down the job if it’s causing problems.

If performance is an issue you can look at running the job more frequently so that the delta each time is smaller. As a result it runs faster and places less of a bursty strain on the system.

Error Handling

What do you do when you are trying to process data and you hit an error? Maybe the data is bad, or you have a SQL syntax error. Often these errors happen in bursts, and you have a large stream of messages you are not able to process.

A very important thing to remember in these scenarios is you do not want to drop the message on the floor when an error occurs. You do not want to lose the data.

A common approach is to write the failed messages to an error topic or a “dead letter queue”, log an error and raise an alert. You need to make sure the error topic or dead letter queue has the capacity to handle a large number of incoming messages all at once or it may run out of space/memory and start dropping messages.
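A sketch of what that looks like in a consumer loop, again assuming kafka-python; the topic names and handler are hypothetical:

```python
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "member-updated", bootstrap_servers="kafka:9092", group_id="reservations-svc"
)
producer = KafkaProducer(bootstrap_servers="kafka:9092")

def run(handle_message):
    for message in consumer:
        try:
            handle_message(message.value)
        except Exception:
            # Never drop the data: park the raw message on a dead letter topic,
            # preserving the original key so it can be replayed in order later.
            producer.send("member-updated.dlq", key=message.key, value=message.value)
            # log the error and raise an alert here so someone knows the DLQ is filling up
```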

You should also consider how you will handle all the messages that were sent to the error queue once the problem is resolved. Someone may need to manually patch them, or you may just want to replay them. Sometimes you need to selectively find specific messages and fix them in a specific way. A technique I have seen is to push all the messages into a searchable index like Elasticsearch, where a support person can work through them and get them resolved. There is a Kafka Connect connector that writes directly into Elasticsearch, so you don’t need to write any code to make this happen.

Consistency checker

We all have good intentions, but something can still go wrong without us noticing. It is highly recommended to cover your bases by having a regular check running in production that validates that all the data in the source system exists in the target system, and raises an alert when there is a discrepancy. This way, if a deploy has a bug in it, for example, we find out fairly quickly rather than waiting for an irate customer call.

This is usually done by iterating through all ids in the source and ensuring that those ids exist in the target. You do not have to run this in a big batch that slams the system; it can be running at a low rate in the background all the time.
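A sketch of such a background checker; the paging and lookup helpers are hypothetical and would be backed by the source service’s API and your local store:

```python
import time

def check_consistency(fetch_source_ids_page, exists_locally, alert,
                      page_size=100, pause_s=1.0):
    """fetch_source_ids_page(cursor, page_size) -> (ids, next_cursor) and
    exists_locally(entity_id) -> bool are hypothetical helpers; alert(msg)
    raises whatever alarm your team uses."""
    cursor = None
    while True:
        ids, cursor = fetch_source_ids_page(cursor, page_size)
        for entity_id in ids:
            if not exists_locally(entity_id):
                alert(f"entity {entity_id} exists in source but is missing locally")
        if cursor is None:
            break
        # Trickle through the id space instead of slamming either system.
        time.sleep(pause_s)
```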

Another approach is to just assume things get out of whack over time, and do a full refresh of all the data on some regular basis. It is preferable that you not have to do that as it is expensive and destructive. But in some cases that is the best approach.

Conclusion

Queries that need data owned by multiple microservices are a fact of life in a distributed microservice architecture.

There are many approaches to handle this, and it is worth thinking carefully about which one is the right fit for your situation.

Correctness and error handling are key considerations when we start dealing with distributed data. You want to take great care to think through all the scenarios, and lean on existing patterns and best practices to guide you to solutions that are robust, clear, and easy to maintain.
