Queue-Worker as a Distributed System — Fusion

Godavari Doke

CrelioHealth for Diagnostics (formerly LiveHealth)

Queue worker implementation that can run and scale as a system


We at LiveHealth recently reached a milestone with Fusion, one of our core services, which executes asynchronous tasks: it crossed 1 million events in a single day. We would like to take a brief moment to share what we learned while building Fusion.

Monitoring Tool for Quick Overview of the System

LiveHealth uses a job queue system for business logic that is too time-consuming to run in the context of a web request. This system is a critical component of our architecture and is used for every cross-server sync, push notification, email, and SMS-based communication. A job can often take up to a couple of seconds to execute. Most of this work can be offloaded from our primary servers to improve latency and throughput, and to make calls to asynchronous third-party services more reliable.

 

Old Job Queue System Architecture

As the majority of our products are built with Django, we had a traditional Celery setup (django-celery, with RabbitMQ as the broker to store and stream the jobs). Initially this served our needs: it was very easy to use and was maintained in the same codebase. Adding new logic for the workers to consume was easy, and we could access the Django models in the same scope of execution, which let us reuse most of our abstractions.

We hit a point of friction when a lot of new clients started coming onboard: the django-celery implementation no longer met our requirements, and the system showed some limitations.

Some of the constraints we identified were:

  • Django-Celery is very tightly coupled with the Django application, so both have to be deployed on the same system and share the same resources, such as CPU and memory. This was not always efficient when auto-scaling servers.
  • As the number of servers increased, the number of Celery workers increased as well. At one point it became really difficult to fine-tune the worker configuration on each server, especially when automating the upscaling and downscaling of the servers.
  • When instances were terminated, any ongoing task was interrupted. Handling this gracefully was an overhead.

Our servers were a fleet of c4.large instances, each with 2 cores and 4 GB RAM. Apache2 consumed roughly 800 MB to 1.4 GB, and RabbitMQ, the Celery worker, and Celery beat together consumed 2 GB.

The growing number of workers, with no easy way to manage them, was alarming enough to make us explore ways to decouple the two systems (Django and the queue worker) entirely.

 

Python RQ (Redis Queue)

https://python-rq.org/

Redis has always played an integral role in our overall system architecture, and we have always valued the flexibility that comes with it. With Python as our primary language, PythonRQ became the default choice because it is lightweight. As its documentation indicates, PythonRQ is designed to be simple and uses Redis lists to enqueue tasks.

Flow of Redis Task Queue.

Lifecycle

  • While enqueuing a job, the web application creates a unique identifier based on the job metadata and job type.
  • The enqueue handler ensures that the job is queued in the configured Redis data structure without duplication: the request is discarded when a job with the requested identifier already exists.
  • A collection of workers polls the Redis cluster at predefined intervals. When a job is found, it is removed from the pending queue before it is executed. This ensures that the same job does not surface multiple times.
  • In case of failure, the job is moved to a permanent failed queue, where it can be debugged manually to identify the root cause of the failure.
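The lifecycle above can be sketched in plain Python. This is a minimal in-memory illustration, not RQ's actual code: the `TaskQueue` class, its method names, and the SHA-1 based identifier are assumptions made for the example, and a dict and two deques stand in for the Redis structures.

```python
import hashlib
import json
from collections import deque


def job_id(job_type, meta):
    """Derive a deterministic identifier from the job type and its metadata."""
    raw = json.dumps({"type": job_type, "meta": meta}, sort_keys=True)
    return hashlib.sha1(raw.encode("utf-8")).hexdigest()


class TaskQueue:
    """In-memory stand-in for the pending queue, job store and failed queue."""

    def __init__(self):
        self.pending = deque()   # job identifiers waiting for a worker
        self.jobs = {}           # identifier -> job details
        self.failed = deque()    # identifiers of jobs that raised an error

    def enqueue(self, job_type, meta, func):
        jid = job_id(job_type, meta)
        if jid in self.jobs:     # duplicate request: discard it
            return None
        self.jobs[jid] = {"type": job_type, "meta": meta, "func": func}
        self.pending.append(jid)
        return jid

    def work_one(self):
        """Remove one job from the pending queue, then execute it."""
        if not self.pending:
            return None
        jid = self.pending.popleft()   # removed before execution
        job = self.jobs[jid]
        try:
            job["func"](**job["meta"])
        except Exception as exc:
            job["error"] = repr(exc)   # kept for manual debugging
            self.failed.append(jid)
            return "failed"
        del self.jobs[jid]             # cleanup after success
        return "finished"
```

Because the identifier is derived from the job type and metadata, enqueuing the same request twice while the first is still pending returns `None` instead of creating a second job.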

Python RQ has priority-based execution built in. The priority is based on the queue; job-level prioritization is not supported.

If you have 3 queues A, B, and C, designated High, Default, and Low respectively, any job that is queued in Queue A will be of High priority.

Queues cannot hold their own set of jobs with different priorities. A queue represents a priority, not a job type, so jobs cannot be categorized by type. For example, jobs of type 'Email' can have different priorities depending on the context: sending billing information to our clients by email is a high-priority job, but a promotional email can be categorized as low priority. If we queue the two jobs in different queues, it becomes difficult to get analytical information for each job type.
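To make the limitation concrete, here is a small in-memory sketch of queue-level priority. It is not RQ code: the queue names and the `enqueue`/`dequeue` helpers are illustrative, and the only behavior modeled is that a worker checks its queues in the order they are listed.

```python
from collections import deque

# Queue names listed from highest to lowest priority, as a worker would receive them.
QUEUE_ORDER = ["high", "default", "low"]
queues = {name: deque() for name in QUEUE_ORDER}


def enqueue(queue_name, job):
    """Append a job to the named queue; the queue name is its priority."""
    queues[queue_name].append(job)


def dequeue():
    """Return the next job: the first non-empty queue in priority order wins."""
    for name in QUEUE_ORDER:
        if queues[name]:
            return name, queues[name].popleft()
    return None


# Two email jobs with different urgency must live in different queues.
enqueue("low", {"type": "email", "subject": "promotion"})
enqueue("high", {"type": "email", "subject": "billing"})
enqueue("high", {"type": "sms", "subject": "otp"})
```

The email jobs end up spread across `high` and `low`, mixed with other job types, so answering "how many email jobs are pending?" means scanning every queue.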

 

Our Vision

A queue-worker system distinguishes itself in 5 key areas:

Scalability | Priority | Administrative Tool | Scheduling | Redundancy

We added our own set of improvements along with the key features:

  1. Job-level prioritization: categorizing jobs in a standard set of queues where each queue can hold jobs with different priorities, unlike the priority queues in PythonRQ.
  2. Decoupling the queue-worker system from all of our backend applications. We aimed for a solution that works as a standalone application and can be integrated anywhere with minimal configuration changes.
  3. Transactional logs for each step in the life cycle of a job: queue, dequeue, execution, cleanup.
  4. Configurable settings for retrying a job. A client can set the number of retries for a job.
  5. The ability to fetch the status of any job by its identifier.
  6. A real-time dashboard to keep track of pending jobs in the queues and to get information about the connected workers.
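Items 4 and 5 can be illustrated with a short sketch. The `Job` fields, the status names, and the `run_with_retries` and `get_status` helpers are assumptions for this example, not Fusion's actual data model.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Job:
    identifier: str
    func: Callable[[], None]
    max_retries: int = 0          # chosen by the client when enqueuing
    attempts: int = 0
    status: str = "queued"
    history: List[str] = field(default_factory=list)


JOBS: Dict[str, Job] = {}         # identifier -> job, for status lookups


def register(job):
    JOBS[job.identifier] = job
    return job


def get_status(identifier):
    """Return the status of a job, or None if the identifier is unknown."""
    job = JOBS.get(identifier)
    return job.status if job else None


def run_with_retries(job):
    """Execute the job, retrying up to job.max_retries times after a failure."""
    while True:
        job.attempts += 1
        job.status = "started"
        try:
            job.func()
        except Exception as exc:
            job.history.append(f"attempt {job.attempts} failed: {exc!r}")
            if job.attempts > job.max_retries:
                job.status = "failed"
                return job.status
            job.status = "retrying"
        else:
            job.status = "finished"
            return job.status
```

With `max_retries=2`, a job is attempted at most three times: the initial run plus two retries.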

Also, connecting to the system should be easy!

 

Implementation

To get started, we identified PythonRQ as a good base for the project: it would give us a head start instead of spending time and resources on building everything from scratch. Forking the PythonRQ library was the first step.

 

Job level prioritization

The next challenge was to achieve job-level priority. The goal was to have a single queue holding many jobs with different priorities. PythonRQ stores job identifiers in Redis lists: a push command inserts the identifier at one end of the list, and a pop command removes one job at a time from the other end. The detailed metadata of the job is stored in a hash keyed by the same identifier. So there are 2 moving components in this structure: a hash that represents the job information and a list that represents the queue itself. The queue operates as a simple FIFO, and there is no way to identify high-priority jobs in the list. We needed something that sorts the jobs as they are inserted, so that workers do not have to search the underlying queue for high-priority jobs. Redis sorted sets were the answer.

Redis SortedSets as explained on redis.io

Now, every queue was a sorted set. Each job had a score based on its priority (High, Medium, or Low, each mapped to a score). While dequeuing, instead of popping from a list we now use ZREVRANGE with the index range 0 0, which returns the first member in reverse score order. This means the job with the highest score is always returned first, irrespective of when it was added to the queue. Because ZREVRANGE only reads, the returned job also has to be removed from the set (for example with ZREM) before it is executed.
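A minimal sketch of this sorted-set queue follows. The score values, the queue name, and the `enqueue`/`dequeue` helpers are assumptions for illustration. So that the example runs without a Redis server, it uses a tiny in-memory stand-in that mirrors the redis-py call shapes `zadd(name, mapping)`, `zrevrange(name, start, end)`, and `zrem(name, *values)`; it is not a full emulation of Redis.

```python
class FakeSortedSets:
    """In-memory stand-in for the three sorted-set calls used below."""

    def __init__(self):
        self._sets = {}

    def zadd(self, name, mapping):
        zset = self._sets.setdefault(name, {})
        added = sum(1 for member in mapping if member not in zset)
        zset.update(mapping)
        return added

    def zrevrange(self, name, start, end):
        zset = self._sets.get(name, {})
        # Highest score first; ties fall back to reverse lexical order, as in Redis.
        ordered = sorted(zset, key=lambda m: (zset[m], m), reverse=True)
        stop = None if end == -1 else end + 1   # Redis end index is inclusive
        return ordered[start:stop]

    def zrem(self, name, *values):
        zset = self._sets.get(name, {})
        return sum(1 for v in values if zset.pop(v, None) is not None)


PRIORITY_SCORES = {"high": 3, "medium": 2, "low": 1}   # assumed values


def enqueue(client, queue, job_id, priority="medium"):
    """Add the job identifier to the sorted set with its priority as score."""
    client.zadd(queue, {job_id: PRIORITY_SCORES[priority]})


def dequeue(client, queue):
    """Return the highest-scored job identifier and remove it from the queue."""
    top = client.zrevrange(queue, 0, 0)   # 0..0 inclusive is exactly one member
    if not top:
        return None
    job_id = top[0]
    # ZREM returns 0 if another worker already removed this job.
    if client.zrem(queue, job_id) == 0:
        return None
    return job_id
```

Two caveats apply to this sketch. Members with equal scores are ordered lexically rather than by insertion time, so jobs of the same priority are not guaranteed to come out in FIFO order. Also, reading and removing are two separate commands here; on Redis 5.0 and later, ZPOPMAX does both in a single atomic command. With a real redis-py client, returned members are bytes unless the client is created with `decode_responses=True`.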

 

Scalability

Any of our web applications should use the service with minimal configuration changes. It should be as easy as Plug and Play.

We created a stateless endpoint by combining the code structure of PythonRQ, RQ Dashboard, and some of our own tweaks into one Flask-based web application, and named it Fusion. Fusion exposes a simple HTTP POST endpoint that accepts connections and enqueues jobs in Redis. We designed Fusion to be a very lightweight service so that it does not add to our application latency. To communicate with the service, we created a Python class with multiple built-in APIs for the web applications to use. This also standardized communication with the service: every incoming call comes from our own wrapper, so we do not end up with wrongly formatted data.
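A client wrapper of this kind might look like the sketch below. Everything specific in it is hypothetical: the `FusionClient` name, the `/enqueue` path, the payload fields, and the example host are placeholders, since the article does not show the real interface. It uses only the Python standard library.

```python
import json
import urllib.request


class FusionClient:
    """Hypothetical wrapper; endpoint path and field names are illustrative."""

    ALLOWED_PRIORITIES = ("high", "medium", "low")

    def __init__(self, base_url, timeout=2.0):
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def build_payload(self, job_type, meta, priority="medium", retries=0):
        """Validate the arguments and return the JSON request body as bytes."""
        if priority not in self.ALLOWED_PRIORITIES:
            raise ValueError(f"unknown priority: {priority!r}")
        if not isinstance(retries, int) or retries < 0:
            raise ValueError("retries must be a non-negative integer")
        body = {"type": job_type, "meta": meta,
                "priority": priority, "retries": retries}
        return json.dumps(body, sort_keys=True).encode("utf-8")

    def enqueue(self, job_type, meta, **options):
        """POST the job to the service and return the decoded JSON response."""
        request = urllib.request.Request(
            self.base_url + "/enqueue",          # assumed path
            data=self.build_payload(job_type, meta, **options),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=self.timeout) as response:
            return json.loads(response.read().decode("utf-8"))
```

Validating in `build_payload` before any network call is what keeps malformed jobs out of the service: a web application calling `client.enqueue("email", {...}, priority="urgent")` fails immediately with a `ValueError` instead of sending a bad request.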

2020-06-08

P.S. We also explored using Apache Kafka to relay messages to Fusion instead of an HTTP call. That seemed like overkill at the scale we were going to operate at, but we added provisions to attach a Kafka relay layer later.
  • As Fusion is a web application, we now had the option to attach it to an auto-scaling group in AWS to ensure scalability.
  • We can also deploy multiple instances of the service that connect to the same storage, one for each VPC.

Logging

Keeping track of each step in the life cycle of a job became mandatory, as most of our asynchronous traffic flows through this service and we need the information to trace any anomalies. These logs are indexed and hence easily searchable.

Example of the Logging Structure

A log entry is created in the following scenarios:
  • Job Queued.
  • Job Execution Started.
  • Job Successfully Completed.
  • Job Execution Failure / Retry Attempts.
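As a rough sketch of how such lifecycle events could be recorded, the function below emits one JSON line per event, which is a common shape for logs that will be indexed. The logger name, event names, and field names are assumptions; the article does not specify Fusion's actual log format.

```python
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger("fusion.jobs")   # assumed logger name

# One event name per scenario listed above (retry kept separate from failure).
EVENTS = ("queued", "started", "completed", "failed", "retry")


def log_event(job_id, event, **details):
    """Emit one JSON log line for a lifecycle event and return the record."""
    if event not in EVENTS:
        raise ValueError(f"unknown event: {event!r}")
    record = {
        "job_id": job_id,
        "event": event,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    record.update(details)                  # e.g. queue, priority, attempt
    logger.info(json.dumps(record, sort_keys=True))
    return record
```

Keeping the job identifier in every record makes it possible to reconstruct the full history of a single job with one search.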

Administrative Tools — RQ Dashboard

RQ Dashboard is a Flask-based web front-end for monitoring RQ queues. It was designed to work with the structure of Python RQ and would not work out of the box for us, as we had replaced the list-based queue implementation with sorted sets. All we had to do was fork RQ Dashboard and make some minor adjustments, and we were good to go. The dashboard has built-in support for:

  • Listing Active Workers.
  • Listing Queues and Pending Jobs in Each Queue.
  • Failed Jobs with the stack trace.
  • Option to re-queue a failed job.

Flow of Fusion

We do not plan to stop anytime soon!

1. Adding a Kafka Relay

Based on our current system throughput and the growth trajectory, we would reach a point where connecting directly to Redis will lead to a system bottleneck. Redis has little operational headroom, particularly with respect to memory. If we enqueued faster than we dequeued for a sustained period, we would run out of memory and be unable to dequeue jobs (because dequeuing also requires having enough memory to update job status and log).

Adding Kafka to the job queue will be a great improvement in terms of protecting our infrastructure from the exhaustion of Redis memory.
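The memory concern can be put into rough numbers. The function below is a back-of-the-envelope estimate only, and every figure in the example call is hypothetical rather than a measurement from our system.

```python
def seconds_until_full(free_bytes, bytes_per_job, enqueue_rate, dequeue_rate):
    """Estimate how long a sustained backlog takes to fill the free memory.

    Rates are in jobs per second. Returns None when the queue is not growing.
    """
    growth = enqueue_rate - dequeue_rate      # net jobs added per second
    if growth <= 0:
        return None
    return free_bytes / (bytes_per_job * growth)


# Hypothetical numbers: 1 GiB free, 2 KiB per job, 500 jobs/s in, 400 jobs/s out.
estimate = seconds_until_full(1024 ** 3, 2048, 500, 400)
print(f"{estimate / 60:.0f} minutes")         # about 87 minutes
```

Under these assumed numbers, a net backlog of only 100 jobs per second exhausts the free memory in under an hour and a half, which is why a buffer in front of Redis is attractive.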

 

2. Contributing it to Open Source

We built Fusion by forking and merging multiple open source libraries and packages and we do plan to open source it.

References

A good amount of inspiration for this article came from:

Scaling Slack’s Job Queue


Working on a project like this was a challenging ride! If any of this sounds interesting to you then come work with us — talent@livehealth.in