Building a simple APM server | handling massive data writes and complex queries
Pretty excited to share my experience building an APM server! it has been a great journey so far with multiple logos as you can see xd.
So what’s an APM anyway? APM is short for application performance monitor, which means it’s a designated server that all what it does in life is monitor all your services and collects detailed stats about their performance, which could include:
- Service level or endpoint throughput (how many requests it did handle successfully(2XX response) per minute, probably it’s a chart with time on its x-axis and number of 2XX requests on the y-axis.
- Endpoint average response time, max, min, and all the stats family here, response time is the time it takes an endpoint to respond to the client starting from the moment he sent the request.
- Endpoint runtime breakdown, like how many requests has been made to 3rd parties? how long did it take? how about DB transactions? etc
You can checkout new relic apm for a real example for such service if you never used it before.
Today we are going to build our own apm server for our own services, an in-house solution for collecting stats about our services’ performance, we will go through the challenges making such service including the amount of data writes that need to be handled and the near real-time complex analytics that we should be serving to our clients in their dashboard or whatever.
System Specs, what to expect
As always we start with specifying the set of features we are going to implement here, let’s say we only want to collect stats like:
- Throughput on a service/endpoint level, we expect to be able to request and get a train of data that could be used in a chart to show the throughput of a service or endpoint over time.
- Average response time an endpoint takes, max/min and 90–80-… percentiles. For example we could get a row of data like “GET https://myservice.com/some_route | average_response_time 100ms | max 400ms | min 50ms ”
- We expect the integration of such service to be seamless in our services and not require much code/change if any.
So what’s challenging here
The feature list seems a bit small and may convey a wrong message of “duh, that seems like an easy task!”, but let’s look at this thing at a glance.
First how will this work? how will we collect those stats? well we could say we will have an sdk installed in every service that sets a hook/event to trigger whenever a request is responded to from that service, and calculates the time from request to response somehow, when done it sends that request record to our APM server as a POST request, the request may look like this for example:
curl --location --request POST 'http://localhost:5000/requests' \
--header 'Authorization: Bearer jwt_token_placeholder' \
--header 'Content-Type: application/json' \
--data-raw '{
"url": "https://myservice.com/some_route",
"http_method": "GET",
"response_time": 3000,
"service_name": "service_name",
"status_code": 200,
"created_at": "2020-04-02 02:10:01"
}'
And that’s it, every time a request is responded to in one of our services, we will receive a request like this one and that makes sure we have all the data necessary to do all kinds of analytics.
So what’s the problem really? well if we look closely here, we can notice that the load on our APM server is the summation of all loads on all services at anytime! that’s a lot to handle for sure and it’s all DB/Disk writes also, we need to persist every request data, how can we handle all those requests/DB transactions?
Also on the other side, assuming we omitted the first problem, we still have a problem with querying, imagine handling a request like throughput; we need to group and count the number of requests done in every minute in our database per service per endpoint, that’s pretty exhaustive considering the small groups being built up here.
Breaking down the system step by step
Let’s start with our very first problem here, that is handling massive amount of traffic, we need to build our system in a way that makes it easily scale-able with just some numbers changing and pushing buttons.
One thing we know for sure: we cannot handle those requests synchronously, we need to control how many requests are being executed at any moment so we don’t choke the machine running our APM server to death, that definitely leads us to job scheduling. Instead of handling a DB write Directly we will:
- Take the data, extract it from request.
- push the data in all job Queues interested in such data.
- return 200 ok to the requester.
At the consumer/worker side, we could do some batch handling for the jobs as they are kind of small and frequent, so instead of inserting one record at a time, we will wait till X records are available and insert them all in one batch at a time in our Database.
So now we know we are not sync but we could be given enough resources, that is if we want to be nearly sync, we could add more workers so the queue will almost be always empty, taking about scaling here.
But it’s important that we know how much is being executed now, because let’s say we have 10 workers and a batch size of 20, we know that at anytime there is a maximum of 20 * 10 DB writes being made, and surely we should optimize those parameters to better use our resources to the max.
Eventual Consistency to the rescue
Even after our first optimization, we still have problems storing our data in a SQL engine like mysql for example, which offers consistency over anything else, that means our DB writes will be stacked up in that one enormous MYSQL instance which we will keep scaling vertically -we will act like vitess does not exist-.
Since we are so much interested into fast writes and given that most our queries are stats over weeks/months/years, we could sacrifice consistency for an eventual one given that the effect of not including a new request record in a monthly report for example isn’t that big of a deal anyway.
One of the most known DB engines that supports high speed DB writes is Cassandra, cassandra offers out of the box horizontal scaling, which means one worker can connect to multiple DB nodes and cassandra would handle doing our DB writes concurrently.
DB Writes are considered done after a number of confirmations(syncing) between a number of cassandra nodes, that number is configurable and you can specify it to be very low if you don’t care at all about consistency.
Sounds cool and perfect for our use-case, that means we could scale our system from 3 points now:
- Number of workers operating on the queue.
- Batch size.
- Number of Cassandra DB Nodes available in the cluster and Number of Nodes Acks needed per write.
Designing DB Schema
There is not much to do here since we have one table holding every request info including URL, service name and timestamp, but cassandra offers us a different experience here in indexes and keys.
In Cassandra we have two types of keys to specify for our primary key:
- partition key: used to store our data in a partition like style, the field used for this key should have limited set of values and each value should hold enough records to form a good healthy partition.
- cluster key: used to index/sort our data inside the partition
For our case, the service name could work pretty well as a partition key, we are sure that under each service name there will be enough records from its endpoints plus it’s a logical access point in our queries, we will always say get me that endpoint inside this service.
For the clustering key, we will use a composite key of (timestamp, http_method, url), notice the ordering here is important because it specifies the order we can optimally access our database, we will surely grab some stats about our endpoints/service in some limited date range so it’s logical to put timestamp first. Secondly the url and the method are interchangeable but we will surely need to be specific both for each endpoint we want (GET campaigns or POST campaigns for example).
create table request_info (service_name text, url text, method text, status smallint, response_time int, created_at timestamp, primary key (service_name, created_at, method, url));
Leveling up in Queries/Aggregations with Elastic Search
We have done a fair job till now maintaining a good and scale-able system for handling write requests and not a bad one for reads but remember Cassandra does not really have that edge in reads anyway, and our queries are mostly summations, groups and many more stats may unveil in the future and need to be handled, we need to have:
- Cassandra as our backbone Data store where we keep all of our data stored and written efficiently/safely.
- Need a new player in the game that could handle different queries and Aggregations so swiftly, but we may sacrifice the data safety here a bit since we have cassandra to back us up in cases of data loss, but in return we want it to be optimized for reads and flexible for different queries.
Here we introduce ourselves to the magic of Elastic Search, elastic search is a distributed search engine with many features empowering you with a high speed, near real time search index for your data.
What we could do here is have two separate kinds of workers:
- One kind is the cassandra worker that handles any new requests, batches them and writes to cassandra
- The other worker will be the ES sync worker, that also batches any new requests and syncs them to the elastic index, then we will have the elastic index in the front-line serving as primary search tool for our data ready to serve any analytics for our clients.
Let’s start by defining our properties/fields to elastic search, ES is intelligent enough to do this for us and auto discover the types but we may know better already about our data than letting him waste the day on it
{
"properties": {
"created_at": {
"type": "date",
"format": "yyyy-MM-dd hh:mm:ss"
},
"service_name": {
"type": "keyword"
},
"url": {
"type": "text"
},
"status_code": {
"type": "short"
},
"response_time": {
"type": "long"
},
"http_method": {
"type": "keyword"
}
}
}
Here we just emphasized on a few points that may have not been obvious to elastic at first glance:
- we know that created_at is a date field and should be treated/optimized as such, also the format of this field is known to us so we give it for free to elastic ahead of time, this is really important as it allows us to do some complex queries involving date searches.
- we define status_code as short, we could have chosen keyword type for such a field to better fit, but that will still work.
Let’s get a glimpse of the power of elastic search, let’s say we want to calculate the throughput of one of our endpoints over time, think of that as a DB query and try to imagine the nitty details … had enough? well turns out we have that out of the box using histograms in elastic Aggregations
"aggs": {
"throughput_over_time": {
"date_histogram": {
"field": "created_at",
"fixed_interval": "1m"
}
}
}
just a few lines and we are done! it’s super fast also because of its distributed nature! we could tune the fixed interval to be 1 minute/hour/day/week/etc, just a word change.
How about data ranges? we want to get this histogram in a specified date range
"filter": {
"range": {
"created_at": {
"gte": "START_DATE IN FORMAT (2020-04-01 02:01:01)",
"lte": "END_DATE IN FORMAT (2020-04-10 02:01:01)"
}
}
}
it’s builtin feature also, and we can use it now with ease as we already declared this field as date and elastic understands how to compare this field as a formatted date not just a string.
Finally how about the stats for response time ? average? max? min? and all this stuff
"request_percentiles": {
"percentiles": {
"field": "response_time"
}
},
"request_stats":{
"stats":{
"field": "response_time"
}
}
}
also those are just builtin functions in elastic, we just had to specify the field it’s going to operate on, we already declared response time as long so it can do all math operations on it at ease.
So what happens if Elastic went down?
- we will fallback to serving all requests with some legacy queries in cassandra.
- we start spinning a new index and populating it with data from newest to oldest and try to restore the current faulty index.
- Finally when we are back online we can alleviate the load off cassandra again and redirect such exhaustive read work to Elastic Search instance.
Durability, Care for more problems?
One more thing I will leave you to think about is how the data will grow in our database over time? will it scale? will its operating performance degrade over time? I will give you a little hint, we see in cassandra we have our partition key set to be the service name, which is static! what that means our partition will keep growing endlessly over time making it harder to query and write to those big partitions over time.
We need to add a dynamic part to our partition key so that we have more partitions the more our database grows, we could define a hybrid field to serve as the paritition key, maybe service name + year? or service name + year|month? service name + year|month|day? well they are all possible, it all fallback to some analysis done on your writes frequency to know the write fit for you. After that we won’t need to put a cluster index on creation date anymore, which is good; our cluster index is getting smaller, saving us both write/update time and space.
Finally, I do recommend you delve deeper into Elastic search
Also try those queries to get a feel of how they really work, you can checkout the code at
The service is written completely in Golang. That’s it for this article and be sure to check out the code and reach out if you are in doubt about anything in the code itself.