Celery with Heavy Workloads: A Deep Dive into the Solution

Introduction

This post describes my experience running Celery, a distributed task framework for Python, in a heavy-workload environment.
I have divided the post into the following sections:
  • Product Brief
  • Current Architecture
  • Problems With Data Loads, Sleepless Nights, and Solutions Tried
  • The Final Destination

Product Brief

We are working on a product that fetches data from multiple sources and aggregates it to generate insights.
Some of the data sources we currently support are:
  • Stock News from multiple sources
  • RSS feeds
  • Twitter
  • Reddit
  • Yahoo News
  • Yahoo Finance
  • Earnings Calendars
  • Report Filings
  • Company Financials
  • Stock Prices

Current Architecture

Broad View

We knew that the problem we are solving has to deal with the cruel, decentralized Internet, and that we needed to divide the large task of fetching data from the web and analyzing it into small tasks.


Fig 1

After exploring different projects and technologies and weighing their community support, we decided to use Python as our language of choice and Celery as our commander.

Python is a versatile language backed by a large ecosystem of libraries. Since its inception, it has gained a lot of popularity among developer and data science communities. One of the major reasons we chose Python for the backend was the Celery project.

Its website defines Celery as "an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well." To learn more about Celery, visit its website.

By now we were clear about how we wanted to proceed: divide the process in Fig 1 into granular units (tasks, in Celery terminology). With this as the baseline, we identified all the individual units in the system that can work independently.
This gave Fig 1 a new look:


Fig 2

We had the following components in our Celery cluster:


Component | Parallelism | Machines | Autoscale
Pollers | 2 | 1 | Up to 10 processes
Analyzers | 2 | 2 | Up to 4 processes
Store | 2 | 2 | Up to 10 processes
Harvester | 1 | 1 | -
Sentiment Aggregators | 1 | 1 | -

We were using MongoDB as the database engine, deployed as a replica set with a replication factor of 3, and Redis for queuing and for communication between the independent Celery workers.

This cluster was deployed on 5 physical machines, each with an Intel i7 processor and 16 GB of memory.

The following figure shows how the Celery workers communicate through the broker (Redis).


Problems With Data Loads and Sleepless Nights

The product was still in development, so the number of tasks we were processing was low, and we had never stress-tested the system against unexpected loads.

The following table lists the approximate number of tasks we were processing per day:



Source | Run Interval (hours) | Poll Tasks per Run | Tasks per Hour | Tasks per Day
Twitter Search | 12 | 12,000 | 1,000 | 24,000
Google News | 12 | 13,000 | 1,083 | 26,000
Yahoo News | 12 | 5,000 | 417 | 10,000
Yahoo Finance SI | 24 | 5,000 | 208 | 5,000
Yahoo Finance FE | 24 | 5,000 | 208 | 5,000
Twitter Stream | Stream process | | |
Total Poll Tasks | | | | 70,000
Total Analyze Tasks | | | | 280,000
Total Store Tasks | | | | 2,800,000
Total Tasks | | | | 3,150,000

Once development reached a stable stage, we planned to scale the system by adding more data sources and more search terms for every source:

Source | Run Interval (hours) | Poll Tasks per Run | Tasks per Hour | Tasks per Day
Twitter Search | 4 | 18,500 | 4,625 | 111,000
Google News | 4 | 22,500 | 5,625 | 135,000
Yahoo News | 6 | 11,200 | 1,867 | 44,800
Yahoo Finance SI | 24 | 11,200 | 467 | 11,200
Yahoo Finance FE | 24 | 11,200 | 467 | 11,200
Twitter Stream | Stream process | | |
Generic RSS | 5 | 205 | 46 | 1,093
ZeroHedge | 5 | 1 | 1 | 24
Street Eye View | 5 | 1 | 1 | 24
EOD historical prices | 24 | 10,000 | 417 | 10,000
Reddit | 6 | 8,350 | 1,392 | 33,400
Total Poll Tasks | | | | 313,200
Total Analyze Tasks | | | | 939,600
Total Store Tasks | | | | 12,528,000
Total Tasks | | | | 13,780,800

With this new configuration, our system was generating more than 10 million tasks a day (about 13.8 million according to the table above), more than four times the previous load.
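The arithmetic behind the two tables is simple enough to sketch. Daily poll tasks for a source are the poll tasks per run multiplied by the number of runs per day. Each poll task then fans out into analyze and store tasks. The fan-out factors are not stated in the post. They are inferred from the table totals: 4 analyze tasks per poll task before the change and 3 after, and 40 store tasks per poll task in both cases.

```python
# Source -> (poll tasks per run, run interval in hours), from the first table.
OLD_SOURCES = {
    "Twitter Search": (12_000, 12),
    "Google News": (13_000, 12),
    "Yahoo News": (5_000, 12),
    "Yahoo Finance SI": (5_000, 24),
    "Yahoo Finance FE": (5_000, 24),
}


def tasks_per_day(poll_tasks_per_run, interval_hours):
    """Poll tasks per day for a source that runs every `interval_hours` hours.
    Assumes the interval divides 24 evenly."""
    return poll_tasks_per_run * (24 // interval_hours)


def daily_totals(poll_tasks, analyze_per_poll, store_per_poll):
    """Fan a day's poll tasks out into analyze and store tasks."""
    analyze = poll_tasks * analyze_per_poll
    store = poll_tasks * store_per_poll
    return {"poll": poll_tasks, "analyze": analyze, "store": store,
            "total": poll_tasks + analyze + store}


old_polls = sum(tasks_per_day(n, h) for n, h in OLD_SOURCES.values())
before = daily_totals(old_polls, analyze_per_poll=4, store_per_poll=40)
after = daily_totals(313_200, analyze_per_poll=3, store_per_poll=40)

print(before["total"])  # 3150000
print(after["total"])   # 13780800
print(round(after["total"] / before["total"], 2))  # 4.37
```

The store stage dominates. Roughly nine out of every ten tasks are store tasks, so any per-task overhead in the broker or result backend is multiplied there.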
To run the system under this workload, our approach to scaling was to add more nodes to the existing cluster for more processing power. We identified the processes that needed to scale with this load and added nodes as follows:


Component | Parallelism | Machines | Autoscale
Pollers | 5 | 1 | Up to 10 processes
Analyzers | 2 | 2 | Up to 4 processes
Store | 6 | 2 | Up to 10 processes
Harvester | 1 | 1 | -
Sentiment Aggregators | 3 | 1 | -

Initially, the system ran well and data flowed seamlessly, but after an hour under this load we noticed task processing slowing down. The processing rate on every node dropped to nearly half. Interestingly, the nodes were underperforming: they were not working to their full potential. For example, an analyzer that could normally complete 100 tasks in 5 seconds was completing only around 40-45 tasks in 5 seconds.
This was strange: instead of working harder under load, the nodes were not even running at their usual speed.
Our first response (the typical engineer's approach to scaling) was to add more nodes, but every one of our machines was already fully utilized, and we had no room to add more.

We had to find a way to scale without adding nodes, so we studied many resources in search of an answer. Some of the most helpful resources were:
We learned that we could increase the throughput of I/O-bound tasks by using gevent, a library that provides asynchronous processing through lightweight, thread-like greenlets. Before implementing this, however, we needed to sort our processes into two categories:
  1. I/O-bound tasks (suitable for gevent)
  2. CPU-intensive tasks (not suitable for gevent)
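The reason this split matters can be shown with a small experiment. This sketch does not use gevent. It uses standard-library threads as a stand-in, because threads and greenlets both let many tasks overlap their waiting time. The fetch function is hypothetical and simulates a network call with a 50 ms sleep. CPU-bound work would not speed up this way. In CPython, threads contend for the GIL, and greenlets only switch when they block on I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor

IO_WAIT_SECONDS = 0.05  # hypothetical latency of one network call


def fetch(item):
    """Stand-in for an I/O-bound poll: it mostly waits, then returns."""
    time.sleep(IO_WAIT_SECONDS)  # the GIL is released while sleeping
    return item * 2


def run_sequential(items):
    start = time.perf_counter()
    results = [fetch(i) for i in items]
    return results, time.perf_counter() - start


def run_concurrent(items, workers):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(fetch, items))
    return results, time.perf_counter() - start


items = list(range(20))
seq_results, seq_time = run_sequential(items)
conc_results, conc_time = run_concurrent(items, workers=20)
print(f"sequential: {seq_time:.2f}s, concurrent: {conc_time:.2f}s")
```

Sequentially, the 20 waits add up to about a second. With 20 concurrent workers they overlap, and the batch finishes in roughly the time of a single call.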
We used gevent to increase the throughput of the I/O-bound processes, which in our case made up about 70% of all processes.
We used a pool of 500 greenlets on each node and tested the system with these changes.
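In practice, a Celery worker's pool is chosen when the worker starts. A sketch of that decision follows, written as a helper that builds worker command lines. The -Q, -P gevent, -c and --autoscale options are real Celery worker options. The gevent package must be installed for -P gevent. The component names come from our tables, but which of them were I/O-bound is an assumption made for illustration. The helper function itself is hypothetical.

```python
# Hypothetical mapping of components to queues and workload type. Which of our
# components were I/O-bound is an assumption here, not taken from the post.
COMPONENTS = {
    "pollers": "io",     # mostly waiting on HTTP responses
    "store": "io",       # mostly waiting on database writes
    "analyzers": "cpu",  # mostly computation
}


def worker_command(app_module, queue, kind, greenlets=500, max_procs=4):
    """Build the `celery worker` command line for one queue."""
    cmd = ["celery", "-A", app_module, "worker", "-Q", queue]
    if kind == "io":
        # gevent pool: one process running many greenlets that yield on I/O.
        cmd += ["-P", "gevent", "-c", str(greenlets)]
    elif kind == "cpu":
        # prefork pool: separate OS processes, autoscaled between 1 and max_procs.
        cmd += ["-P", "prefork", f"--autoscale={max_procs},1"]
    else:
        raise ValueError(f"unknown workload kind: {kind!r}")
    return cmd


for queue, kind in COMPONENTS.items():
    print(" ".join(worker_command("pipeline", queue, kind)))
```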

We were now processing around 10x as many tasks in parallel, but the problem was not solved yet. After a few hours of running, it came back: the task processing rate slowed down again.

We quickly investigated all the nodes and their system utilization and discovered a mind-boggling fact: the machine running Redis and MongoDB was consuming 16 GB of RAM plus 8 GB of swap. This was the first time we had seen this kind of memory usage on any of our systems.
We somehow managed to run the top command on the machine and found that Redis alone was consuming around 60% of the memory and 50% of the swap. That is far too much memory!

We started exploring again, and our first step was to check whether other developers had faced similar issues.
Some of the helpful links were:
From these, we learned that Redis was storing the results of the tasks, and that was what was consuming the memory.
Tasks and task results were piling up in Redis. This slowed down the broker, because it now had to hold a very large number of tasks and results. That in turn made the whole system run out of memory and gradually grind to a halt.

Our beautifully designed ship was sinking and we had to do something to save it.

Essentially, our system was stuck in a vicious cycle:
  • Redis delivers tasks at high speed, but the consumers cannot keep up with that rate.
  • Redis then slows down and delivers tasks at a lower rate, so the consumers sit idle most of the time.
In both situations, the common suspect was Redis.
Researching Redis further, we found that it processes commands on a single thread, serving clients one after another. It also keeps all of its data in memory, including queued tasks and stored results, which explained the growing memory consumption. Under heavy load, that single thread was so busy with persistence and other work that it slowed down the polling requests issued by the consumers. So we had found the reasons for both of the problems we were facing.

One option for handling this heavy workload was to shard Redis into a cluster, so we set up Redis partitioning by following publicly available tutorials. We created a three-node cluster, with the keys spread evenly across the nodes by hashing.
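Redis Cluster does not use a consistent-hashing ring. Instead, it maps every key to one of 16,384 hash slots using CRC16, and it assigns a range of slots to each node. Only the part of the key inside {...} is hashed, if such a hash tag is present. This sketch reimplements that key-to-slot mapping in pure Python, following the Redis Cluster specification. The example keys are arbitrary. A node that receives a command for a slot it does not own answers with a MOVED redirection. The client is expected to follow that redirection, and a client that does not understand it fails instead.

```python
HASH_SLOTS = 16384


def crc16_xmodem(data):
    """CRC16-CCITT (XMODEM variant): polynomial 0x1021, initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_hash_slot(key):
    """Return the Redis Cluster hash slot for `key`, honouring {hash tags}."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end > start + 1:  # a non-empty tag: hash only the tag
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % HASH_SLOTS


print(key_hash_slot("foo"))    # 12182
print(key_hash_slot("hello"))  # 866
# Keys sharing a hash tag always land in the same slot, hence on the same node.
print(key_hash_slot("task:{42}:meta") == key_hash_slot("task:{42}:result"))  # True
```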

When we plugged this cluster into the Celery app, the workers started throwing the exception "MOVED to server 192.168.12.12".

Digging further, we found that Celery did not yet support Redis Cluster. We thought we had a solution, but the underlying framework did not support it :(

We started exploring again and considered putting a proxy server, Twemproxy, in front of the Redis cluster. This time we checked compatibility with our framework first, and boom... we could not have been wiser to take that step:
Celery did not support the proxy either.

Frustrated with these incompatibilities, we set out to find what the framework actually supports. Some of the compatible brokers were:
  • SQS
  • Redis
  • RabbitMQ
  • Zookeeper
Our immediate thought was to try a different broker, so we began exploring these options. The following table proved useful in narrowing down our research:

Name | Status | Monitoring | Remote Control
RabbitMQ | Stable | Yes | Yes
Redis | Stable | Yes | Yes
Amazon SQS | Stable | No | No
Zookeeper | Experimental | No | No

We had already tried Redis, so we went with the other stable option that also supports monitoring and remote control: RabbitMQ.

Spoiler:
By now we knew that RabbitMQ is one of the best choices of broker and is used by a wide variety of clients in production, while Redis is the best choice for the result backend (which stores task results, such as the intermediate results of tasks in Celery chains and chords).

We made the necessary changes and ran the system again.

This time the system nearly went into an unrecoverable state, consuming all the memory, and we had to restart the servers.
While all this was happening, we were monitoring RabbitMQ through its management tool and noticed something fishy: it was creating a huge number of queues, nearly as many as there were tasks.
Investigating, we found that these queues were created to store the results of intermediate tasks. Because all of this storage was in memory, it was consuming far too much memory. (Reference link)
Here is the list of things we had tried so far:

  • gevent
  • Redis Cluster
  • Redis Partitioning
  • Twemproxy
  • RabbitMQ
Looking at the state of the system and thinking about everything we had learned about these products so far, something clicked in my mind: "Why not separate the polling mechanism (the broker) from the result backend?" We had already tried so many things that this was worth a try.

The Final Destination

Putting together all the clues from our exploration, we decided to use both systems, Redis and RabbitMQ, for the work each is best at.

We deployed Redis as the result backend to store the intermediate results, and RabbitMQ as the broker for maintaining communication and passing tasks (remember the spoiler above).



With this architecture, we were able to run the system under a workload of over 10 million tasks a day, and it can be scaled easily.

This was a tremendous learning opportunity for us in understanding how different types of systems work. I hope this helps someone going through the same problem.

Acknowledgments

We took hints from the Celery documentation, the Redis documentation, Stack Overflow threads, the Celery GitHub issues page, and case studies by Instagram and Trivago.



