Going Event Driven: Building your own real-time Messaging system using Pub/Sub Model

Kareem Emad
8 min read · Sep 26, 2020

In this article we will go through the motivation behind event-driven programming, why you should consider it, and how it helps you write better, decoupled code.

Let's begin with an example from Righting Software by Juval Löwy (a great book I highly recommend). It is a classic case where you have 3 services in your system: a request starts at service A, goes to B, then to C, and then the feedback travels backwards until it reaches A again, which responds to the client that made the request.

example taken from chapter 2 Righting Software by Juval Löwy

Not only do we have a coupling problem here, as mentioned in the article (every service needs to know the sequence of calls required to fulfill the request, which surely increases the coupling between those services), but there are other things to consider too.

What if service B fails to fulfill the request made by A? Maybe B broke down under high demand or started dropping new requests. In that case you go back shamefully and tell the client to try again later, which means you have delegated the responsibility of retrying to the client.

Another thing to consider is that the connection between the client and service A stays open the whole time, through all of the following:

  • Logic happening in service A
  • Logic happening in service B
  • Logic happening in service C
  • Feedback To A

The point here is: does this have to be sequential? Can these things happen concurrently? Does the client need to wait for you to finish all of this, or can you just return 200 OK once you have started working on the task?

These questions lay the groundwork for using pub/sub. Let's do the same request again using events.

Now instead of waiting for everyone to finish, we will do as follows:

  • The client sends a request to A.
  • A fires an event X signaling that it finished its work and that anyone else who has to do something for X to be done should kick off now.
  • A sends feedback to the client that everything is on its way to being done and there is nothing more to worry about.
  • B & C are both subscribed to the event/topic X, so they get called now.
  • B & C could in turn fire events when they are done too; anyone interested in those events will get notified.
  • A may get updates from C and B events to update the status of the request to finished.
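The flow above can be sketched with a tiny in-process event bus. This is only an illustration under stated assumptions: `EventBus`, `service_a` and the `order_id` field are hypothetical names, and delivery here is synchronous, whereas the service built later in this article delivers in the background.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class EventBus:
    """Tiny in-process stand-in for the event manager (hypothetical name)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: Dict[str, Any]) -> int:
        """Call every subscriber of the topic; return how many were called."""
        handlers = list(self._subscribers.get(topic, []))
        for handler in handlers:
            handler(message)
        return len(handlers)


bus = EventBus()
log: List[str] = []

# B and C subscribe to topic X; A never references them directly.
bus.subscribe("X", lambda msg: log.append(f"B handled order {msg['order_id']}"))
bus.subscribe("X", lambda msg: log.append(f"C handled order {msg['order_id']}"))


def service_a(order_id: int) -> str:
    # A does its own work, fires X, and answers the client right away.
    bus.publish("X", {"order_id": order_id})
    return "200 OK"


status = service_a(7)
```

Changing who reacts to X only means changing the subscriber list inside the bus; `service_a` stays untouched.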

A few things to notice here:

  • A has no idea what's going on beyond firing event X. There could be dinosaurs from the Cretaceous period doing the work, and A couldn't care less about these details.
  • We don't have to worry about retrying requests: the new service in the middle handles all retries for us until the job gets done, so we have just implemented a service-wide retry strategy without even noticing. How cool is that!

The only service that knows everything about the wiring and communication between services is the event manager service, and that works out for the best: we can now change the sequence of calls at any time just by changing the subscriber lists in the event manager service.

Our service acts like the jumper wires on a breadboard, and all the other services (A, B, C, ...) are the pins that get connected by those wires.

So let's dive deeper into this event manager service and see how we can actually build one ourselves. First of all, this is nothing new: Google Cloud and AWS already offer such real-time messaging systems. But let's say we don't want to pay for those services and would rather maintain our own custom in-house version.

We start off with the system requirements: what do we expect from such a service at the interface level?

  • We expect to have a way to send messages with certain topics to this service, preferably through an SDK.
  • We expect to be able to specify the body, headers, and query parameters used in the subscribers' request from the message we sent.
  • We expect to be able to set new topics and add subscribers to them in this service (database).
  • We expect to be able to send any kind of data in the message, with no restrictions on the body/headers/.. to be sent.
  • We expect to be able to add filters on each subscriber so that it can skip some messages even though it is subscribed to the topic, for example a subscriber that only listens to messages from topic X that have body.green_light == True.
  • We expect this service to be super-duper fast; no one should feel its existence.
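To make these requirements a bit more concrete, here is one possible shape for the data involved. It is a sketch only: the class names, field names and the example URL are assumptions for illustration, not the schema of the actual service.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Message:
    """What a publisher sends (hypothetical field names)."""
    topic: str
    body: Dict[str, Any] = field(default_factory=dict)       # free-form data
    headers: Dict[str, str] = field(default_factory=dict)
    query_params: Dict[str, str] = field(default_factory=dict)


@dataclass
class Subscriber:
    """One row per subscriber in the database (hypothetical field names)."""
    topic: str
    url: str                             # endpoint called for each message
    filter_expr: Optional[str] = None    # optional filter, kept as a string


message = Message(topic="X", body={"green_light": True})
subscriber = Subscriber(
    topic="X",
    url="http://localhost:9000/hook",    # placeholder URL
    filter_expr="data['body']['green_light'] == True",
)
```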

Now that we have a fair list of requirements, let's see how we can go about designing such a service. For the tech stack I will go with Go, hopefully satisfying the last requirement.

So let’s start with a simple diagram for how this service is going to work:

  • Let’s say we receive a request from some client containing the topic name and some other data for the subscribers’ request.
  • We then extract the topic from the request, load all subs linked to this topic, and start sending them requests.
  • If a subscriber returns a 4XX or 5XX response, we need to retry the request.
  • We send back 200 OK to the client after we are done with the subscribers.

That design is actually catastrophic in every way. Not only are we keeping an open inbound connection to the client while all this happens, we also have an uncontrollably growing number of outgoing connections to the subscribers, which may take too long to respond, or fail and have to be retried. With enough pressure on such a service, memory usage would keep climbing much like a memory leak, and our lovely service would sadly be torn apart.

man drowning in tape shouting “alhaany ya moatz”

We can do better than that. We need to control the maximum number of requests being executed at any moment in our system; any extra requests that come in get scheduled for later, not now! So we need a good scheduling system to do this for us, and for this case let's try faktory.

Now that our system has changed, let's go through the sequence of execution again:

  • We receive a request from the client.
  • We fetch the subscribers for the specified topic.
  • Now we schedule a task for each subscriber and let faktory decide when to execute it (the job is simply to call the subscriber with the data provided in the message).
  • We send back 200 OK to the client after we have sent all tasks to faktory. The client does not need to know when the subscribers' tasks are done, so it is enough for it to know that we are going to do what it asked for at some point.

With faktory, our work concludes at the point of throwing the task in the queue. faktory handles retrying failed requests for us and works at a concurrency level we specify.
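The idea of "queue the job, answer the client, let a fixed number of workers deliver and retry" can be sketched with the Python standard library alone. To be clear, this is not faktory's API; it is a simplified stand-in, and `call_subscriber`, `handle_publish`, the subscriber names and the two limits are all assumptions made for the demo.

```python
import queue
import threading
from typing import Dict, List, Tuple

MAX_WORKERS = 2   # concurrency cap (assumed value)
MAX_ATTEMPTS = 3  # delivery attempts per job (assumed value)

Job = Tuple[str, dict, int]  # (subscriber, payload, attempts so far)
jobs: "queue.Queue[Job]" = queue.Queue()
delivered: List[str] = []
calls: Dict[str, int] = {}
lock = threading.Lock()


def call_subscriber(name: str, payload: dict) -> bool:
    """Hypothetical delivery; a real worker would send an HTTP request.
    The subscriber named 'flaky' fails on its first attempt only."""
    with lock:
        calls[name] = calls.get(name, 0) + 1
        return not (name == "flaky" and calls[name] == 1)


def worker() -> None:
    while True:
        name, payload, attempts = jobs.get()
        try:
            if call_subscriber(name, payload):
                with lock:
                    delivered.append(name)
            elif attempts + 1 < MAX_ATTEMPTS:
                jobs.put((name, payload, attempts + 1))  # schedule a retry
        finally:
            jobs.task_done()


def handle_publish(subscribers: List[str], payload: dict) -> str:
    for name in subscribers:
        jobs.put((name, payload, 0))
    return "200 OK"  # respond as soon as the jobs are queued


for _ in range(MAX_WORKERS):
    threading.Thread(target=worker, daemon=True).start()

status = handle_publish(["b", "flaky", "c"], {"order_id": 7})
jobs.join()  # demo only: wait for the queue to drain before inspecting results
```

No matter how many messages arrive, at most `MAX_WORKERS` deliveries run at once; the rest simply wait in the queue.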

But we are not done yet! faktory needs the code that executes the job given the data, so we need to implement a function it can call whenever it wants to execute a task.

This function should:

  • Read and extract the request data and the URL to be used in the HTTP request from the message.
  • Verify that the filter for this subscriber evaluates to true on the message.

The boolean expression for the filter could be something like:

data['body']['category'] == 'food' and data['headers']['mode'] == 'quiet'

This expression is stored as a string in the database for each subscriber, and we are supposed to somehow read this string, apply it to each message we get, and see whether it evaluates to true or false.

At this point we could burn our brains out fighting with a statically typed language like Go to achieve this, but luckily faktory allows us to write the worker/consumer in a different language, so...

Welcome aboard, Python. We needed a dynamically typed language to do this task in a few lines, and in Python we can actually do the filter evaluation in just one line using the eval function: call eval, pass it the string, and the variables are looked up and the expression is evaluated automagically.
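As a rough sketch of that evaluation, assuming the message is exposed to the expression under the name `data` (as in the filter string above): `passes_filter` is a hypothetical helper, and treating a missing field as "no match" is a choice made here, not something the original service is known to do. Emptying the builtins narrows what an expression can reach but does not make `eval` safe for untrusted input, so this assumes filter strings come from a trusted source.

```python
from typing import Any, Dict


def passes_filter(filter_expr: str, data: Dict[str, Any]) -> bool:
    """Evaluate a subscriber's stored filter string against one message."""
    if not filter_expr:
        return True  # no filter: the subscriber receives every message
    try:
        # Only `data` is visible to the expression; builtins are emptied out.
        return bool(eval(filter_expr, {"__builtins__": {}}, {"data": data}))
    except (KeyError, TypeError):
        return False  # a missing or mistyped field counts as "no match"


expr = "data['body']['category'] == 'food' and data['headers']['mode'] == 'quiet'"
message = {"body": {"category": "food"}, "headers": {"mode": "quiet"}}
other = {"body": {"category": "toys"}, "headers": {"mode": "quiet"}}

matches = passes_filter(expr, message)
skipped = passes_filter(expr, other)
```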

One last thing we didn't consider is how Go is going to handle the unknown structure of the user message coming in the request. Go is statically typed, so normally we would pre-define everything in a struct. One dirty workaround is to define an abstract map of (interface, interface) type and …

A picture that says a thousand words.

This is going to be ugly, so instead we can do a smart trick on the SDK side: before we send the data, we take the whole JSON structure and encode it as a base64 string. Now the Go service does not have to deal with the payload at all; it just passes the string along until the message reaches the Python worker, which can easily decode the JSON into a dict.
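The round trip can be sketched as follows. The function names and the envelope layout (`topic` plus an opaque `payload` string) are assumptions for illustration; the real SDKs may structure the request differently.

```python
import base64
import json
from typing import Any, Dict


def encode_payload(payload: Dict[str, Any]) -> str:
    """SDK side: turn arbitrary JSON-serializable data into an opaque string."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def decode_payload(encoded: str) -> Dict[str, Any]:
    """Worker side: recover the original dict from the opaque string."""
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


payload = {"body": {"category": "food"}, "headers": {"mode": "quiet"}}

# The service in the middle only ever sees a topic and a plain string.
envelope = {"topic": "X", "payload": encode_payload(payload)}

restored = decode_payload(envelope["payload"])
```

Because the middle service handles a fixed envelope with a single string field, it never needs a struct describing the user's data.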

With that we have a well-functioning service that satisfies most of the requirements above. The code for this service is available on GitHub; you can check it out at

I also implemented two simple SDKs, in Python and JavaScript.

Feel free to reach out to me if you are in doubt about any part of the code. This was a really fun experience for me to go through, and I hope it was for you too.
