One of the tasks of our Optimize team is to build a real-time analytics engine for our A/B testing framework which involves analyzing the consumers experiencing each of the variants on each experiment. Along with this, we need to look at each deal sold in the system and properly attribute each sale to the experiments the consumer visited on their way to that sale that might have influenced their buying decision. Based on this visiting and buying data, the different product teams can then determine which of the experiment variants they want to keep.
In order to improve any consumer-facing Groupon product, experiments are done where a random sample of consumers will be placed into a testing group and shown one or more variants of the original, control, experience, and then their responses will be tallied. This A/B testing data will come to our cluster in the form of several separate messages. Some will indicate the consumer, browser, and device when an experiment variant is encountered, others will indicate when a consumer purchased a deal. It is then the job of this cluster to correlate the actions taken by that consumer to see if the variant is better than the control. Did the larger image lead to more purchases? Did the location of the button cause more people to click on it? Read on to learn more about our approach and results.
All these experiments need to be classified and the consumer actions attributed. The basic systems diagram looks like this:
The web and mobile clients send data in their own formats into a Kafka cluster. The Unified-Click Stream reads from that cluster and generates a curated stream of messages in a unified format, which is then the input to Finch Analytics, where the attribution is done.
Recently, several production systems started using Clojure and given that Storm is written primarily in Clojure, it seemed like a very good fit to the problem of real-time processing of messages. There are several topologies in our cluster – one that unifies the format of the incoming data, another enriches it with quasi-static data, and then a simple topology that counts these events based on the contents of the messages. Currently, we’re processing more than 50,000 messages a second, but with Storm we have the ability to easily scale that up as the load increases. What proved to be a challenge was maintaining the shared state as it could not be stored in any one of the bolts as there are 30 instances of it spread out across five machines in the cluster. So we had to have an external shared state.
All of our boxes are located in our datacenter, and because we’re processing real-time data streams, we’re running on bare metal boxes – not VMs. Our tests showed that if we used the traditional Redis persistence option of the time/update limits, a Redis box in our datacenter with 24 cores and 96 GB of RAM was more than capable of handling the load we had from these 30 bolts. In fact, the CPU usage was consistently below 20% of one of the 24 cores, and all other system parameters were equally low. That left plenty of headroom to do a little more work on the Redis server and still keep ahead of the flow.
Redis is primarily a key/value store, with the addition of primitive data types including HASH, LIST, and SET to allow a slightly nested structure and operations to the cache. And while its ability to recover after a crash with its data intact is a valuable step up over Memcached, it really makes you think about how to store data in a useful and efficient layout. The initial structure we chose for Redis was pretty simple. We needed to have a Redis SET of all the experiment names that were active. It turns out that there can be many experiments in the codebase, but only some are active. Others may have completed and just haven’t been removed from the code. To support this active list, we had a single key:
finch|all-experiments => SET (names)
and then for each active experiment name, we had a series of counts: How many consumer interactions have there been with this experiment? How many errors were there on the page when dealing with an experiment? and even a count for the basic errors encountered in the stream itself – each updated with Redis’ atomic INCR function:
finch|<expr-name>|counts|experiment => INT
finch|<expr-name>|counts|errors => INT
finch|<expr-name>|counts|null-b-cookies => INT
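As a minimal sketch of this flat layout, the key names and the commands that update them could be built as shown here. The helper names (`count_key`, `register_experiment`, `incr_command`) and the experiment name `big-image` are hypothetical, and the commands are returned as plain tuples so the sketch runs without a Redis server.

```python
PREFIX = "finch"
ALL_EXPERIMENTS_KEY = f"{PREFIX}|all-experiments"


def count_key(experiment: str, counter: str) -> str:
    """Build the flat key that holds one counter of one experiment."""
    return "|".join([PREFIX, experiment, "counts", counter])


def register_experiment(experiment: str) -> tuple:
    """Command that adds an experiment name to the SET of active experiments."""
    return ("SADD", ALL_EXPERIMENTS_KEY, experiment)


def incr_command(experiment: str, counter: str) -> tuple:
    """Command that atomically bumps one counter by one."""
    return ("INCR", count_key(experiment, counter))


# Hypothetical experiment name, used only for illustration.
print(register_experiment("big-image"))
print(incr_command("big-image", "errors"))
```

Each counter lives under its own top-level key in this layout, which is simple to reason about but means every counter pays the per-key overhead.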
The next step was to keep track of all the experiments seen by all the consumers. As mentioned previously, this includes the browser they were using (Chrome 29.0, IE 9.0, etc.), the channel (a.k.a. line of business) the deal is from (Goods, Getaways, etc.), and the name of the variant they experienced. The consumer is represented by their browser ID:
finch|<expr-name>|tuples => SET of [<browser>|<channel>|<name_of_variant>]
finch|<expr-name>|variant|<browser>|<channel>|<name_of_variant> => SET of browserId
The Redis SET of tuples containing the browser name and version, the channel, and the name of the variant they saw was important so that we didn’t have to scan the key set looking for the SETs of browser IDs. This is very important as Redis is efficient at selecting a value from the key/value set – but it is horribly inefficient if it has to scan all the keys. While the KEYS command exists in the Redis command set for this purpose, its documentation clearly warns against using it in a production system because of the performance implications.
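The role of the tuples SET as an index can be sketched as follows: one visit produces two SADD commands, and the members of the tuples SET are enough to reconstruct every variant key without scanning the keyspace. The function names and sample values are hypothetical, and the commands are plain tuples rather than calls to a real client.

```python
def tuple_member(browser: str, channel: str, variant: str) -> str:
    """Join the three parts into the member stored in the tuples SET."""
    return f"{browser}|{channel}|{variant}"


def visit_commands(experiment, browser, channel, variant, browser_id):
    """Commands for one visit: index the tuple, then record the browser ID."""
    member = tuple_member(browser, channel, variant)
    return [
        ("SADD", f"finch|{experiment}|tuples", member),
        ("SADD", f"finch|{experiment}|variant|{member}", browser_id),
    ]


def variant_keys(experiment, tuple_members):
    """Rebuild the variant SET keys from the tuples SET members (no KEYS scan)."""
    return [f"finch|{experiment}|variant|{m}" for m in tuple_members]


# Hypothetical sample data.
for command in visit_commands("big-image", "Chrome 29.0", "Goods", "variant-a", "b-123"):
    print(command)
print(variant_keys("big-image", ["Chrome 29.0|Goods|variant-a"]))
```

Reading an experiment then takes one SMEMBERS call on the tuples key followed by direct lookups on the derived keys.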
Finally, we needed to attribute the sales and who bought them, again based on these tuples:
finch|<expr-name>|orders|<browser>|<channel>|<name_of_variant>|orders => INT
finch|<expr-name>|orders|<browser>|<channel>|<name_of_variant>|qty => INT
finch|<expr-name>|orders|<browser>|<channel>|<name_of_variant>|revenue => FLOAT
finch|<expr-name>|orders|<browser>|<channel>|<name_of_variant>|consumers => SET of uuid
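A rough sketch of how one sale might be attributed under this layout follows. The text does not say which commands updated the quantity and revenue, so the use of INCRBY and INCRBYFLOAT here is an assumption, as are the function name and the sample values.

```python
def order_commands(experiment, browser, channel, variant, qty, revenue, consumer_uuid):
    """Commands that attribute one sale to one experiment tuple."""
    base = f"finch|{experiment}|orders|{browser}|{channel}|{variant}"
    return [
        ("INCR", f"{base}|orders"),                   # one more order
        ("INCRBY", f"{base}|qty", qty),               # units sold (assumed command)
        ("INCRBYFLOAT", f"{base}|revenue", revenue),  # revenue (assumed command)
        ("SADD", f"{base}|consumers", consumer_uuid), # who bought
    ]


# Hypothetical sale used only for illustration.
for command in order_commands("big-image", "Chrome 29.0", "Goods", "variant-a",
                              2, 39.98, "c-uuid-1"):
    print(command)
```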
As you can see, the lack of general, multi-level, nested structures in Redis means a lot needs to be accomplished by how you name your keys, which makes this all appear to be far more complicated than it really is. And at the same time, we have purposefully chosen to use the atomic Redis operations for incrementing values to keep the performance up. Consequently, this may seem like a lot of data to hold in Redis, but it led to very fast access to the shared state and Redis’ atomic operations meant that we could have all 30 instances of the bolt hitting the same Redis instance and updating the data concurrently. Performance was high and the analytics derived from this data could be generated in roughly 5 seconds, so the solution seemed to be working perfectly.
Until we had been collecting data for a few days.
The memory usage on our Redis machine seemed to be constantly climbing.
First it passed 20 GB, then 40 GB, and then it crashed the 96 GB machine. The problem stemmed from the fact that while an experiment was active we were accumulating data for it. While the integers weren’t the problem, one particular SET was:
finch|<expr-name>|variant|<browser>|<channel>|<name_of_variant> => SET of browserId
Over time there would be millions of unique visitors, more than a hundred active experiments at any one time, and even multiple browserIDs per consumer. Add it all up, and these Redis SETs would hold hundreds of millions of entries. This would continue to grow as more visitors came to the site and experienced the experiments. What we needed was a much more efficient way to store this data.
Wondering what Redis users do when wanting to optimize storage we did some research and found this blog post by the Engineering group at Instagram. We also found this post on the Redis site that reinforces this point and gives tuning parameters for storing efficiently in a HASH. Armed with this knowledge, we set about refactoring our data structures to see what gains we could get.
Our first change was to pull the ‘counts’ into a HASH. Rather than using the following:
INCR finch|<expr-name>|counts|experiment
INCR finch|<expr-name>|counts|errors
INCR finch|<expr-name>|counts|null-b-cookies
we switched to this:
HINCRBY finch|<expr-name>|counts experiment 1
HINCRBY finch|<expr-name>|counts errors 1
HINCRBY finch|<expr-name>|counts null-b-cookies 1
Clearly, we were not the first to go this route, as Redis already had the equivalent atomic increment command for HASH entries, HINCRBY. It was a very simple task of breaking up the original key into a HASH key and a field, and replacing INCR with HINCRBY and an explicit increment of 1.
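The key split described above can be sketched in a few lines: the last `|`-separated segment of the old flat key becomes the HASH field, and everything before it becomes the HASH key. The function name is hypothetical and the result is a plain command tuple.

```python
def to_hash_increment(flat_key: str, amount: int = 1) -> tuple:
    """Turn an old flat counter key into the equivalent HINCRBY command."""
    # Split on the last separator only: the prefix is the HASH key,
    # the final segment is the field inside that HASH.
    hash_key, field = flat_key.rsplit("|", 1)
    return ("HINCRBY", hash_key, field, amount)


# Hypothetical experiment name, used only for illustration.
print(to_hash_increment("finch|big-image|counts|errors"))
```

All three counters of an experiment now share one top-level key, which is what lets Redis use its compact encoding for small HASHes.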
Placing the sales in a HASH (except the SET of consumerIDs as they can’t fit within a HASH), was also just a simple breaking up of the key and using HINCRBY and its floating-point counterpart HINCRBYFLOAT. Continuing along these lines we saw we could do a similar refactor and we switched from a SET of browserIDs to a HASH where the keys are the browserIDs – just as unique, and we can use the Redis command HKEYS to get the complete list. Going further, we realized the values of the new HASH could contain some of the data that was in other structures:
finch|<browserID> => app-chan => <browser>|<channel>
finch|<browserID> => trips|<expr-name>|<name_of_variant> => 0
where that zero was just a dummy value for the HASH key.
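As a small sketch, the two fields of this per-browserID HASH could be written with HSET commands like the ones below. The function name and sample values are hypothetical, and the commands are plain tuples rather than calls to a real client.

```python
def browser_hash_commands(browser_id, browser, channel, experiment, variant):
    """Commands that record one experiment visit in the per-browserID HASH."""
    key = f"finch|{browser_id}"
    return [
        # Which browser and channel this browser ID belongs to.
        ("HSET", key, "app-chan", f"{browser}|{channel}"),
        # The field name carries the information; 0 is only a dummy value.
        ("HSET", key, f"trips|{experiment}|{variant}", 0),
    ]


# Hypothetical sample data.
for command in browser_hash_commands("b-123", "Chrome 29.0", "Goods",
                                     "big-image", "variant-a"):
    print(command)
```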
With this new structure, we can count the unique browserIDs in an experiment by using the Redis EXISTS command to see if we have seen this browserID in the form of the above HASH, and if not, then we can increment the number of unique entries as:
finch|<expr-name>|tuples => <browser>|<channel>|<name_of_variant> => INT
At the same time we get control over the ever-growing set of browserIDs that was filling up Redis in the first place by not keeping the full history of browserIDs, just the count. We realized we could have the browserID expire after a time period and let it get added back in as consumers return to use Groupon. Therefore, we can use the Redis EXPIRE command on the finch|<browserID> HASH, and then after some pre-defined period of inactivity, the browserID data would just disappear from Redis. This last set of changes – moving away from a SET to a HASH, counting the visits as opposed to counting the members of a SET, and then EXPIRE-ing the data after a time – really made the most significant changes to the storage requirements.
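The count-then-expire flow can be illustrated with a small in-memory stand-in for Redis. Everything here is a sketch under stated assumptions: `TinyStore` is a hypothetical dict-backed class that imitates only HSETNX, HINCRBY, and EXPIRE, the one-hour TTL is an invented value, and time is passed in explicitly so the behavior is deterministic. The text describes an EXISTS check on the whole HASH; this sketch instead checks the individual trips field with HSETNX, so that a browserID already known from one experiment is still counted once in another.

```python
class TinyStore:
    """Hypothetical dict-backed stand-in for a few Redis commands."""

    def __init__(self):
        self.hashes = {}   # key -> {field: value}
        self.expires = {}  # key -> absolute expiry time in seconds

    def _purge(self, key, now):
        # Drop the key if its time to live has run out.
        if key in self.expires and now >= self.expires[key]:
            self.hashes.pop(key, None)
            self.expires.pop(key, None)

    def hsetnx(self, key, field, value, now):
        """Set the field only if absent; return 1 if created, else 0."""
        self._purge(key, now)
        fields = self.hashes.setdefault(key, {})
        if field in fields:
            return 0
        fields[field] = value
        return 1

    def hincrby(self, key, field, amount, now):
        self._purge(key, now)
        fields = self.hashes.setdefault(key, {})
        fields[field] = fields.get(field, 0) + amount
        return fields[field]

    def expire(self, key, ttl, now):
        if key in self.hashes:
            self.expires[key] = now + ttl


def record_visit(store, experiment, browser, channel, variant, browser_id,
                 now, ttl=3600):
    """Count the browser ID once per tuple, then restart its inactivity timer."""
    browser_key = f"finch|{browser_id}"
    store.hsetnx(browser_key, "app-chan", f"{browser}|{channel}", now)
    is_new = store.hsetnx(browser_key, f"trips|{experiment}|{variant}", 0, now)
    if is_new:
        store.hincrby(f"finch|{experiment}|tuples",
                      f"{browser}|{channel}|{variant}", 1, now)
    store.expire(browser_key, ttl, now)  # ttl is an assumed value
    return is_new == 1


store = TinyStore()
visit = ("big-image", "Chrome 29.0", "Goods", "variant-a", "b-123")
print(record_visit(store, *visit, now=0))     # first visit
print(record_visit(store, *visit, now=10))    # repeat visit within the TTL
print(record_visit(store, *visit, now=5000))  # returns after the HASH expired
print(store.hashes["finch|big-image|tuples"])
```

A returning browserID whose HASH has expired is counted again, which matches the trade-off described above: the stored data stays bounded, at the cost of counting visits rather than keeping the full membership history.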
So what have we really done? We had a workable solution to our shared state problem using Redis, but the space required was very large and the cost of keeping it working was going to be a lot more hardware. So we researched a bit, read a bit, and learned about the internals of Redis storage. We then did a significant data refactoring of the information in Redis – careful to keep every feature we needed, and whenever possible, reduce the data retained.
The end effect? The Redis CPU usage doubled, which was still very reasonable – about 33% of one core. The Redis storage dropped to 9 GB – less than 1/10th of the original storage. The latency in loading a complete experiment data set rose slightly – about 10% on average, depending on the size and duration of the experiment. We were able to keep everything we liked about Redis: it is fast, simple, robust, and persistent. Our new-found understanding of the cost of different Redis data structures, both in CPU time and memory space, has enabled us to make our use of Redis far more efficient. As with any tool, the more you know about it, including its internal workings, the more you will be able to do with it.