Kubernetes deployment of multi-stage pipeline tasks using Redis

Vasu Sharma
Apr 16, 2022

If you are deploying a long-running, multi-stage pipeline task in a Kubernetes setup that has to be sturdy enough to handle thousands of requests per second, with a reasonable response time for each one of them, you have landed at the right place. Now, let's fly …

Recently, I was deploying a long-running multi-stage task in a Kubernetes environment. I got to learn a lot during the deployment and scaling phases of the application, and felt like writing this article to help out anyone trying to achieve the same.

Application Architecture

My application consisted of a two-stage core pipeline. I cannot disclose the exact functionality, but in a nutshell: stage 1 was responsible for preprocessing the input given to the pipeline and dividing it into logical chunks of data, which were then used in stage 2 to produce the actual results. It is worth mentioning that although stage 1 looks trivial, being just a logical division of inputs, it is an expensive machine learning model and important for the next stage, which also uses an ML model along with other steps of result aggregation.

Processing a single request took around 30 seconds to 1 minute depending on the input. In order to cater to a large number of requests coming from the users, and that too in bursts, we introduced another process, called the worker process, acting as a job scheduler for the cluster application. The worker process takes in the requests coming to the application, assigns each a unique job ID, adds them to a queue, and immediately returns the job ID back to the user. The user can then poll the application for results, which are stored in memory for a certain time after computation. The worker process exposes APIs for taking in requests for processing and for giving status updates on the jobs queued previously.

Did I mention a queue just now? Here comes another pod for implementing that. For maintaining the queue, storing results, and passing messages across processes running in different pods, we used the Redis in-memory database. So, we have 4 pods as of now: one pod each for stage 1 and stage 2, one for Redis, and one for the worker pod that coordinates everything.
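To make that concrete, here is a minimal sketch of what the worker's two APIs could look like, using FastAPI and redis-py. The endpoint paths, key layout, and queue name are illustrative assumptions of mine, not the exact production code.

```python
# Sketch of the worker pod's two APIs: submit a job, poll for its status.
# Endpoint paths, Redis key names, and the queue name are assumptions.
import uuid

import redis
from fastapi import FastAPI

app = FastAPI()
r = redis.Redis(host="redis", port=6379, decode_responses=True)  # the Redis pod's service

@app.post("/jobs")
def submit_job(payload: dict) -> dict:
    job_id = str(uuid.uuid4())                      # unique job ID, returned immediately
    r.hset(f"job:{job_id}", mapping={"status": "queued", "input": str(payload)})
    r.rpush("job_queue", job_id)                    # enqueue for the pipeline
    return {"job_id": job_id}

@app.get("/jobs/{job_id}")
def job_status(job_id: str) -> dict:
    status = r.hget(f"job:{job_id}", "status")      # e.g. queued / running / done
    result = r.get(f"job:{job_id}:result")          # written by stage 2 on completion
    return {"job_id": job_id, "status": status, "result": result}
```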

System Data Flow

Here comes the interesting part: how do the pods communicate with each other? I tried various approaches and will mention two of them:

1. Using Web APIs

For the first approach, we used Celery worker processes, backed by the Redis database for queue/task management. The worker process would call the entry function in stage 1 of the pipeline using a web API exposed by a FastAPI-based server. The API call was kept asynchronous, i.e. the worker didn't wait for the request to complete. Stage 1 of the pipeline, on completing its job, would push the results to the Redis database and notify the worker process with a Redis notification. The main worker process, upon getting the notification, would fire the next stage of the pipeline for further processing in a similar manner, i.e. over a FastAPI-based web interface.
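A rough sketch of this first approach, as described above, might look like the following; the service URLs and channel name are assumptions for illustration.

```python
# Sketch of the web-API approach: the worker fires a stage over HTTP and moves on;
# completion is signalled back through a Redis pub/sub notification.
import asyncio

import httpx
import redis.asyncio as aioredis

r = aioredis.Redis(host="redis", port=6379, decode_responses=True)

async def dispatch(stage_url: str, job_id: str) -> None:
    # The stage's endpoint accepts the job and returns immediately;
    # the heavy ML work happens in the background of that pod.
    async with httpx.AsyncClient() as client:
        await client.post(stage_url, json={"job_id": job_id}, timeout=5.0)

async def worker_loop() -> None:
    pubsub = r.pubsub()
    await pubsub.subscribe("stage1_done")           # stage 1 publishes here on completion
    async for msg in pubsub.listen():
        if msg["type"] == "message":
            # Stage 1 finished this job; fire stage 2 the same way.
            await dispatch("http://stage2-svc/run", msg["data"])

# asyncio.run(worker_loop())
```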

Stage 1 of the pipeline could have directly called stage 2 on its completion, but the two stages were deliberately kept non-synchronous so that they could scale independently. [To be specific, the number of stage 2 pods would otherwise have been directly proportional to the number of stage 1 pods when scaling.]

We had several problems with this approach. As the number of requests increased (we sent thousands of requests within seconds), the stage 1 pods started to scale out horizontally as per the Horizontal Pod Autoscaler setup in the cluster. The scaling of pods was quick enough, but the post-scale-out work of spawning the applications, loading the models, etc. within the containers took some additional time, and hence the requests to the stage 1 pods initiated by the worker processes started to drop out with connection errors. The same happened for the stage 2 pods. There were also some other problems related to the working of Redis notifications with Celery workers.

2. Using Redis for all communications

I replaced every communication between the pods in the cluster with Redis notifications. The data flow now looked like this:

The worker process, on receiving a request, now assigns a job ID, stores the metadata in the Redis database, and then publishes a Redis notification to which the stage 1 pods have subscribed. One of the stage 1 pods takes up the task from the Redis database, produces the results, and then publishes another type of Redis notification to which the stage 2 pods are listening. One of the stage 2 pods takes up the request and, after computation, saves the final result in the Redis database.
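In code, the Redis-only flow could be sketched like this; the channel names, key names, and the run_stage1_model helper are placeholders of my own.

```python
# Sketch of the Redis-only flow: the worker publishes, stage 1 pods subscribe,
# and stage 1 in turn notifies stage 2. (Duplicate pickup is handled further below.)
import json

import redis

r = redis.Redis(host="redis", port=6379, decode_responses=True)

def run_stage1_model(raw_input: str) -> list:
    # Placeholder for the expensive ML preprocessing that chunks the input.
    return [raw_input]

def submit(job_id: str, payload: dict) -> None:
    # Worker side: persist the job metadata, then notify stage 1 subscribers.
    r.hset(f"job:{job_id}", mapping={"status": "queued", "input": json.dumps(payload)})
    r.publish("stage1_jobs", job_id)

def stage1_listener() -> None:
    # Stage 1 side: react to notifications, compute, then hand over to stage 2.
    pubsub = r.pubsub()
    pubsub.subscribe("stage1_jobs")
    for msg in pubsub.listen():
        if msg["type"] != "message":
            continue
        job_id = msg["data"]
        chunks = run_stage1_model(r.hget(f"job:{job_id}", "input"))
        r.set(f"job:{job_id}:chunks", json.dumps(chunks))
        r.publish("stage2_jobs", job_id)            # stage 2 pods are listening here
```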

How does it help? I found Redis notifications to be quite reliable. For the time during which new pods are coming up, the notifications are kept at bay and delivered once the pods are ready, possibly removing every chance of the request failures we saw with the previous web-API-based communication. I tested with an increasing number of requests on the cluster, and the system came out to be quite reliable, with none of the requests bouncing off due to connection issues.

But we had a problem with the new approach: Redis notifications work like a broadcast operation, delivered to all subscribers. This is a problem when two or more stage 1 pods (or stage 2 pods) are ready to take up a request: all of them would take up that particular job, leading to redundant processing.

We need locks! Yes, we need some kind of distributed locking mechanism between the different subscriber pods so that a particular notification is processed just once in the cluster. And we again have a very nice Redis feature to the rescue: we use the single-threaded nature of Redis and its SETNX operation.

I created a common lock variable (key) based on the unique job ID and set the lock using the SETNX operation, which sets the value only if the key is not already present in the database. Since Redis is single threaded, even if multiple pods go on to set the lock key, only one of them succeeds and the others fail. The pod which gets the lock takes up the job, and the other pods go back to listening for other notifications.
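A minimal sketch of that claim step, assuming the same illustrative key names as above. Note that redis-py expresses SETNX as SET with nx=True; the TTL is an extra safety assumption of mine, so that a crashed pod doesn't hold the lock forever.

```python
# One atomic claim per job: exactly one competing subscriber wins.
import redis

r = redis.Redis(host="redis", port=6379, decode_responses=True)

def try_claim(job_id: str, ttl_seconds: int = 300) -> bool:
    # SET ... NX is atomic, and Redis is single threaded: even if several pods
    # race to set the key, only one call succeeds; the rest return False.
    return bool(r.set(f"lock:{job_id}", "1", nx=True, ex=ttl_seconds))

# In a subscriber loop: process the job only if we won the lock,
# otherwise go back to listening for other notifications.
# if try_claim(job_id):
#     process(job_id)
```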

Outro

Admittedly, there can be other, better or simpler ways of implementing this, like having a reliable global queue across the cluster to manage the tasks, and we are in the process of exploring those options. But this implementation with Redis-notification-based communication also scales well, and none of the requests error out or get missed. In addition, the Redis database now becomes the bottleneck for all processing, so we perhaps need to replicate the Redis database pods and find some mechanism to keep them in sync; all these optimizations are still to be worked on and tested further.

PS: Excuse me for not adding any reference pictures; the code sketches above are illustrative, not the production code.
