This post is based on more than one project I’ve been involved in but it was triggered by a recent project where I helped build an IoT system that is capable of processing and storing tens of thousands of messages a second. The system was built on top of Azure Event Hubs, Azure Web Jobs, Azure Web Sites, HDInsight implementation of HBase, Blob Storage and Azure Queues.
As the title suggests it is a rather random set of observations but hopefully they all together form some kind whole :).
Keep development environment isolated, even in the Cloud
A fully isolated development environment is a must have. Without it developers step on each others toes which leads to unnecessary interruptions and we all know that nothing kills productivity faster than context switching. When you build an application that uses SQL Server as a backend you would not think about creating one database that is shared by all developers because it’s relatively simple to have an instance of SQL Server running locally. When it comes to services that live only in the Cloud this is not that simple and often the first obvious solution seems to be to share them within the team. This is the preferred option especially in places that are not big on automation.
Another solution to this problem is to mock Cloud services. I strongly recommend against it as it will hide a lot of very important details and debugging them in CI (or UAT or Production) is not an efficient way of working.
If sharing is bad then then each developer could get a separate instance of each Cloud service. This is great in theory but it can get expensive very quickly. E.g. the smallest HBase cluster on Azure consists of 6 VMs.
The sweet spot seems to be a single instance of a given Cloud service with isolation applied at the next logical level.
- HBase – table(s) per developer
- Service Bus - event hub per developer
- Azure Search – index(es) per developer
- Blob Storage – container per developer
- Azure Queues – queue(s) per developer
Each configuration setting that is developer specific can be encapsulated in a class so the right value is supplied to the application automatically based on some characteristic of the environment in which the application is running, e.g. machine name.
Everything can fail
The number of moving parts increases with scale and so they are chances of one or more of them failing. This means that every logical component of the system needs to be able to recover automatically from an unexpected crash of one or more of its physical components. Most of PaaS come with this feature out of the box. In our case the only part we had to take care of was the processing component. This was relatively easy as Azure lets you continuously run N instances of a given console app (Web Job) and restarts them when they crash or redeploys them when the underlying hardware is not healthy.
When the system recovers from a failure it should start where it stopped which means that it possibly needs to retry the last operation. In a distributed system transactions are not available. Because of that if an operation modifies N resources then all of them should be idempotent so retries don’t corrupt the state of the system. If there is a resource that doesn’t provide such a guarantee then it needs to be modified as the last one. This is not a bullet proof solution but it works most of the time.
- HBase – writes are done as upserts
- Event Hub - checkpointing is idempotent
- Blob Storage – writes can be done as upserts
- Azure Queues – sending messages is NOT idempotent so the de-duplication needs to be done on the client side
Sometimes failure is not immediate and manifests itself in a form of an operation running much longer than it should. This is why it is important to have relatively short timeouts for all IO related operations and have a strategy that deals with them when the occur. E.g. if latency is important then a valid strategy might be to drop/log data currently being processed and continue. Another option is to retry the failed operation. We found that the default timeouts in the client libraries where longer than what were comfortable with. E.g. default timeout in HBase client is 100 seconds.
Don't lose sight of the big picture
It is crucial to have access to an aggregated view of logs from the whole system. I can’t imagine having tail running for multiple components and doing the merge in my head. Tools like Seq are worth every cent you pay for them. Seq is so easy to use that we used it also for local development. Once the logging is setup I recommend spending 15 minutes a day watching logs. Well instrumented system tells a story and when the story does not make sense you know you have a problem. We have found several important but hard to track bugs just by watching logs.
And log errors, always log errors. A new component should not leave your dev machine unless its error logging is configured.
Ability to scale is a feature
Every application has performance requirements but they are rarely explicitly stated and then the performance problems are discovered and fixed when the application is already in Production. As bad as it sounds in most cases this is not a big deal and the application needs only a few tweaks or we can simply throw more hardware at it.
This is not the case when the application needs to deal with tens of thousands of messages a second. Here performance is a feature and needs to be constantly checked. Ideally a proper load test would be run after each commit, the same way we run CI builds, but from experience once a day is enough to quickly find regressions and limit the scope of changes that caused them.
Predictable load testing is hard
Most CI builds run a set of tests that check correctness of the system and it doesn’t matter whether the tests run for 0.5 or 5 or 50 seconds as long as all assertions pass. This means that the performance of the underlying hardware doesn’t affect the outcome of the build. This is not true when it comes to load testing where slower than usually hardware can lead to false negatives which translate to wasted time spent on investigating problems that don’t exist.
In ideal world the test would run on isolated hardware but this is not really possible in the Cloud which is a shared environment. Azure is not an exception here. What we have noticed is that using high spec VMs and running load test at the same time of the day in the morning helped keep the performance of the environment consistent. This is a guess but it looks like the bigger the VM the higher the chance for that VM to be the only VM on its host. Even with all those tweaks in place test runs with no code changes would differ by around 10%. Having load testing setup from the beginning of the project helps spot outliers and reduce the amount of wasted time.
Generating enough of correct load is hard
We started initially with Visual Studio Load Test but we didn’t find a way to fully control the data it uses to generate load. All we could do was to generate all WebRequests up front which is a bit of problem at this scale. We couldn’t re-use requests as each of them had to contain different data.
JMeter doesn’t suffer from this problem and was able to generate enough load from just one large VM. The templating language is a bit unusual but it is pretty powerful and at the end of the day this is what matters the most.
JMeter listeners can slow down significantly the load generator. After some experimentation we settled on Summary Report and Save Responses to a file (only failures) listeners. They gave us enough information and had very small impact on the overall performance of JMeter.
The default batch file limits JMeter heap size to 512MB which is not a lot. We simply removed the limit and let the JVM to pick one for us. We used 64 bit JVM which was more than happy to consume 4GB of memory.
Don’t trust average values
Surprising number of tools built for load testing shows average as the main metric. Unfortunately average hides outliers and it is a bit like looking at 3D world through 2D glasses. Percentiles is a much better approach as they show the whole spectrum of results and help make an informed decision whether it’s worth investing more in performance optimization. This is important because performance work never finishes and it can consume infinite amount of resources.
Feedback cycle on performance work is looooooooooooong
It is good to start load testing early as in most cases it is impossible to make a change and re-run the whole load test locally. This means each change will take quite a bit of time to be verified (in our case it was 45 minutes) . In such a case it is tempting to test multiple changes at once but it is a very slippery slope. In a complex system it is hard to predict the impact a given change will have on each component so making assumptions is very dangerous. As an example, I sped up a component responsible for processing data which in turn put more load on the component responsible for storing data which in turn made the whole system slower. And by change I don’t always mean code change, in our case quite a few of them were changes to the configuration of Event Hub and HBase.
Testing performance is not cheap
Load testing might require a lot resources for relatively short period of time so generally it is a good idea to automatically create them before the test run and then automatically dispose them once the test run is finished. In our case we needed the resources for around 2h a day. This can be done with Azure though having to use three different APIs to create all required resources was not fun. I hope more and more services will be exposed via Resource Manager API.
On top of that it takes a lot of time to setup everything in a fully automated fashion but if the system needs to handle significant load to be considered successful then performance related work needs to be part of the regular day-to-day development cycle.
Automate ALL THE THINGS
I’m adding this paragraph for completeness but I hope that by now it is obvious that automation is a must when building a complex system. We have used Team City, Octopus Deploy and a thousand or so lines of custom PowerShell code. This setup worked great.
Event Hubs can scale
Event Hubs are partitioned and the throughput of each partition is limited to 1k messages a second. This means that Event Hub shouldn’t be perceived as a single fat pipe that can accept any load that gets thrown at it. For example, an Event Hub that can in theory handle 50k messages a second might start throwing exceptions when the load reaches 1k messages a second because all messages happen to be sent to the same partition. That’s why it’s very important to have as even distribution of data that is used to compute PartitionKey as possible. To achieve that try a few hashing algorithms. We used SHA1 and it worked great.
Each Event Hub can have up to 20 independent consumer groups. Each consumer group represents a set of pointers to the message stream in the Event Hub. There is one pointer per partition. This means that there can be only one Event Hub reader per partition per consumer group.
Let’s consider an Event Hub with 50 partitions. If the readers need to do some data processing then a single VM might not be enough to handle the load and the processing needs to distributed. When this happens the cluster of VMs needs to figure out which VM will run readers for which partitions and what happens when one of the readers or/and VMs disappears. Reaching consensus in a distributed system is a hard problem to solve. Fortunately, Microsoft provided us with Microsoft.Azure.ServiceBus.EventProcessorHost package which takes care of this problem. It can take couple of minutes for the system to reach consensus but other than that it just works.
HBase can scale
HBase is a distributed data base where data is stored in lexicographical order. Each node in the cluster stores an ordered subset of the overall data. This means that data is partitioned and HBase can suffer from the same problem as the one described earlier in the context of Event Hubs. HBase partitions data using row key that is supplied by the application. As long as row keys stored at more or less the same time don’t share the same prefix then the data should not be sent to the same node and there should be no hots spots in the system.
To achieve that the prefix can be based on a hash of the whole key or part of it. The successful strategy needs to take into account the way the data will be read from the cluster. For example, if the key has a form of PREFIX_TIMESTAMP_SOMEOTHERDATA and in most cases the data needs to be read based on a range of dates then the prefix values needs to belong to a predicable and ideally small set of values (e.g. 00 to 15). The query API takes start and end key as input so to read all values between DATE1 and DATE2 we need to send a number of queries to HBase which equals the number of values the prefix can have. In the sample above that would be 16 queries.
HBase exposes multiple APIs. The one that is the easiest to use is REST API called Stargate. This API works well but its implementation on Azure is far from being perfect. Descriptions of most of the problems can be found on the GitHub page of the SDK project. Fortunately, Java API works very well and it is actually the API that is recommended by the documentation when the highest possible level of performance is required. From the tests we have done Java API is fast, reliable and offers predictable latency. The only drawback is that it is not exposed externally so HBase and its clients need to be part of the same VNet.
BTW Java 8 with lambdas and streams is not that bad :) even though the lack of var keyword and checked exceptions are still painful.
Batch size matters a lot
Sending, reading and processing data in batches can significantly improve overall throughput of the system. The important thing is to find the right size of the batch. If the size is too small then the gains are not big and if the size too big then then a change like that can actually slow down the whole system. It’s taken us some time to find the sweat spot in our case.
- Single Event Hub message has a size limit of 256 KB which might be enough to squeeze multiple application messages in it. On top of that Event Hub accepts multiple messages in a single call so another level of batching is possible. Batching can also be specified on the read side of the things using EventHostProcessor configuration.
- HBase accepts multiple rows at a time and its read API can return multiple rows at a time. Be careful if you use its REST API for reading as it will return all available data at once unless the batch parameter is specified. Another behaviour that took as by surprise is that batch represents the number of cells (not rows) to return. This means a single batch can contain incomplete data. More details about the problem and possible solution can be found here.
Web Jobs can scale
Azure makes it very easy to scale up or down the number of running instances. The application code doesn’t have to be aware of this process but it might if it is beneficial to it. Azure notifies jobs before it shuts them down by placing a file in their working folder. Azure Web Jobs SDK comes with useful code that does the heavy lifting and all our custom code needed to do is to respond to a notification. We found this very useful because Event Hub readers lease partitions for a specific amount of time so if they are not shutdown gracefully the process of reaching distributed consensus takes significantly more time.
Blob Storage can scale
We used Blob Storage as a very simple document database. It was fast, reliable and easy to use.