Converting monolithic to asynchronous microservices

When designing any large-scale distributed system, we start out keeping both functional and non-functional requirements in mind, but eventually we tend to focus only on the functional ones. We fail to emphasize non-functional requirements such as latency, throughput, and scalability. As the saying goes, no architecture is ever final or best; we should always strive to make it better.

For a better user experience, applications should be intuitive and quick to process requests, which means the backend business logic on the server side should be fast. There are a couple of ways to achieve that. One is to use suitable algorithms and data structures to reduce time complexity at the implementation level, but in many cases, even with the right algorithms and data structures, we still take longer than expected to respond. So we turn to higher-level optimization techniques: identifying independent sub-modules within the application and checking whether they can be parallelized. If they can, we split them out as separate services. We already covered Why Algorithms? and the importance of algorithms in application development in previous posts; in this post, we will discuss how to convert a monolithic architecture into microservices and, where the order of computation doesn't matter, make them asynchronous. Before diving into the concept, you can download the experimental code used here from my GitHub repo, Monolithic2AsyncMS.

What are monolithic and microservices architectures?

We have already discussed at length what monolithic architecture is, how we transitioned to microservices, and its history in the post Serverless Architecture. In this post, we will see how to achieve the transition, using the use case of analyzing CCTV camera feeds for traffic-rule violations. Our requirement: given an image, recognize the person who violated the rule and their vehicle registration number. In a monolithic architecture, we feed images to a single server that runs engines for tasks such as vehicle detection, vehicle number recognition, and face recognition. These tasks are mutually independent, so all of them can run in parallel. The first level of optimization would be to run them in subprocesses or threads and wait for them to complete, as sketched below. But this approach has several problems: if implemented in Python with threads, performance will suffer because of the GIL; if the tasks depend on GPU resources, limited hardware can lead to CUDA out-of-memory errors; and availability becomes a concern, because if one sub-module fails, the entire application goes down.
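
As a minimal sketch of that first-level optimization (the engine functions here are hypothetical stand-ins for the real modules), the monolith could fan the work out to threads like this:

from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the real engines; each would do heavy work.
def detect_vehicle(image): ...
def read_plate(image): ...
def recognize_face(image): ...

def process_image(image):
    # Run the three independent tasks in threads and wait for all of them.
    # For CPU-bound Python code the GIL prevents true parallelism, which is
    # one of the drawbacks discussed above.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, image)
                   for task in (detect_vehicle, read_plate, recognize_face)]
        return [f.result() for f in futures]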

Monolithic vs microservices architecture

So we need to split these sub-tasks into separate services and let them run on their own servers. As you can see from the diagram above, a request to process an image arrives at a server we call the orchestrator, because its job is to fan out all the subtasks and respond to the requester with the intermediate outputs. We will use Python and a Flask server to implement everything.

import json
import time

from flask import Flask, Response, request

app = Flask(__name__)
requestProcessor = ProcessRequest()  # ProcessRequest is implemented below

@app.route('/health')
def health():
    return '200'

@app.route('/experiment/async/', methods=['POST'])
def classify_image():
    start_time = time.time()
    img_file = requestProcessor.save_image()
    requestProcessor.process_request_serially(img_file)
    end_time = time.time()
    print('Total processing time: ', (end_time - start_time))
    data = dict()
    data['Time'] = end_time - start_time
    response = Response(response=json.dumps(data),
                        status=200,
                        mimetype='application/json')
    return response

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=10080)

Here we use a Flask server to expose a POST endpoint that other modules or clients can call, sending an image in the request body. In the architecture diagram we use an S3 bucket to upload/download images, but to keep the focus on what we are trying to achieve, let's stick to sending the image in the POST request body. This server runs on the orchestrator (coordinator), which does all the processing and sends the response back to the client. Now let's implement the ProcessRequest class to save the image and process it:

import datetime
import os

import requests
from flask import request

class ProcessRequest:
    def __init__(self):
        self.is_debug = True
        self.UPLOAD_FOLDER = './queryinput/'
        task_one_url = 'http://127.0.0.1:10081/experiment/task_one'
        task_two_url = 'http://127.0.0.1:10082/experiment/task_two'
        task_three_url = 'http://127.0.0.1:10083/experiment/task_three'
        self.tasks_url_list = [task_one_url, task_two_url, task_three_url]
        os.makedirs(self.UPLOAD_FOLDER, exist_ok=True)

    def save_image(self):
        # Save the uploaded image with a timestamped name and return its path.
        if self.is_debug:
            print('Files in request: ', request.files)
        img_file = request.files['file']
        curr_date_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        curr_file_name = os.path.join(self.UPLOAD_FOLDER, curr_date_time + '_' + img_file.filename)
        if self.is_debug:
            print('Saving image at ', curr_file_name)
        img_file.save(curr_file_name)
        return curr_file_name

    def process_request_serially(self, img_file):
        # Call each task endpoint one after another, re-opening the file for
        # every request so each POST sends the full image.
        if self.is_debug:
            print('Processing request', img_file)
        for url in self.tasks_url_list:
            with open(img_file, 'rb') as f:
                response = requests.post(url, files={'media': f})
            print('URL: ', url, '; Response: ', response.json())

We will call the car detection module task_one, the number-plate recognition module task_two, and the face recognition module task_three. I am treating them as black boxes to keep things simple and showcase the value of a microservice architecture. Since I am running all the modules on my local machine, the URLs point to localhost (127.0.0.1).

from flask import Flask, Response
import time
import json

class TaskOne:

    def __init__(self):
        self.is_debug = True

    def perform_task_one(self):
        if self.is_debug:
            print('Performing task one: ')
        # Simulate a heavy engine (e.g. vehicle detection) with a 3-second sleep.
        time.sleep(3)

app = Flask(__name__)
taskOne = TaskOne()

@app.route('/health')
def health():
    return '200'

@app.route('/experiment/task_one', methods=['POST'])
def perform_task_one():
    start_time = time.time()
    taskOne.perform_task_one()
    end_time = time.time()
    data = dict()
    data['Time'] = end_time - start_time
    response = Response(response=json.dumps(data),
                        status=200,
                        mimetype='application/json')
    if taskOne.is_debug:
        print(response)
    return response

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=10081)

All three sub-tasks run on their own servers, each exposing an endpoint for the orchestrator to call. Here the task_one Flask server runs on port 10081 and exposes the endpoint "/experiment/task_one"; similarly, task_two and task_three run on ports 10082 and 10083, exposing "/experiment/task_two" and "/experiment/task_three" respectively, as sketched below.
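
task_two and task_three follow the same pattern. As a sketch (in the repo each task presumably lives in its own file), task_two differs only in its endpoint, port, and the engine it simulates:

# Sketch of task_two's server; task_three is identical except for
# the endpoint '/experiment/task_three' and port 10083.
from flask import Flask, Response
import time
import json

app = Flask(__name__)

@app.route('/experiment/task_two', methods=['POST'])
def perform_task_two():
    start_time = time.time()
    time.sleep(3)  # stand-in for the number-plate recognition engine
    data = {'Time': time.time() - start_time}
    return Response(response=json.dumps(data), status=200,
                    mimetype='application/json')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=10082)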

I usually test this with either Postman or a curl command; we just need to hit the orchestrator endpoint with an image in the body.
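
Equivalently, a small Python client can exercise the endpoint; 'sample.jpg' here is just a placeholder for any test image, and '/experiment/async/' is the route we defined above:

import requests

# POST a test image to the orchestrator in the request body.
with open('sample.jpg', 'rb') as f:
    resp = requests.post('http://127.0.0.1:10080/experiment/async/',
                         files={'file': f})
print(resp.json())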

POST curl command for serial execution of tasks and server response
Total time taken for processing all 3 tasks serially is 9.12 seconds

Three seconds each for three sub-tasks, so it takes 9.12 seconds in total to process a request. Generally, this is very high latency, so the next level of optimization is to execute these sub-tasks concurrently. To achieve that, we will use Python's asyncio library, which helps us write asynchronous code. The aim is to trigger all three endpoints at once, let them do their jobs concurrently, and, once all the intermediate results are available, have the orchestrator process them and return the result to the client.

import asyncio

class ProcessRequest:
    # ... continuation of the ProcessRequest class from above ...

    def process_request_parallely(self, img_file):
        if self.is_debug:
            print('Processing request parallely: ', img_file)
        # Spin up a fresh event loop for this request and run the async
        # fan-out until every sub-task response has been gathered.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.process_parallely())

asyncio is built around an event loop, which is conceptually similar to a producer-consumer setup: executables are wrapped as tasks and scheduled on the loop, and the loop runs them cooperatively, switching between them instead of waiting for each one to finish. Control keeps flowing until we explicitly wait on the results, for example with asyncio.gather. In the code above, we create a new event loop and run our coroutine on it until it completes.
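
As a minimal standalone illustration of this behaviour (separate from our orchestrator code): two coroutines that each sleep for one second complete together in about one second, because the loop switches between them while they wait:

import asyncio
import time

async def work(name):
    await asyncio.sleep(1)  # yields control to the event loop while "working"
    return name

async def main():
    # Both coroutines run concurrently on the event loop.
    results = await asyncio.gather(work('a'), work('b'))
    print(results)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start = time.time()
loop.run_until_complete(main())
print('Elapsed: ', time.time() - start)  # ~1 second, not ~2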

from concurrent.futures import ThreadPoolExecutor

class ProcessRequest:
    # ... continuation of the ProcessRequest class from above ...

    async def process_parallely(self):
        # One worker thread per sub-task: the blocking HTTP calls run in the
        # pool, so the event loop can trigger all three endpoints at once.
        with ThreadPoolExecutor(max_workers=3) as executor:
            with requests.Session() as session:
                loop = asyncio.get_event_loop()
                tasks = [
                    loop.run_in_executor(
                        executor,
                        self.fetch,
                        *(session, url)
                    )
                    for url in self.tasks_url_list
                ]
                # Block here until all sub-task responses are available.
                for response in await asyncio.gather(*tasks):
                    response = json.loads(response)
                    print(response)

    def fetch(self, session, url):
        # Runs in a worker thread: POST to one sub-task and return its body.
        with session.post(url) as response:
            data = response.text
            if response.status_code != 200:
                print('Failed: {0}'.format(url))
            return data

In that method, we create a pool of threads, one per task. The number of threads should really depend on the CPU cores available; for simplicity we keep it at 3. We create a requests session and use it to POST to all three URLs. The fetch method, which issues the request and reads the response, runs in a separate thread for each sub-task, so all three execute concurrently. We then wait on asyncio.gather because we want to process the response from every sub-module. Once the orchestrator has collected and processed all responses, it replies to the client. Let's run the same curl command, this time executing the sub-tasks concurrently.

POST curl command for concurrent execution of tasks and server response
Total time taken for processing all 3 tasks concurrently is 3.07 seconds

We completed all three sub-tasks in about 3 seconds by executing them concurrently, using Python's asynchronous features.

In conclusion, we saw why it is worth splitting a monolithic architecture into asynchronous microservices and how to achieve it with Python's asyncio library. We could have executed the tasks asynchronously within the same server using subprocesses and threads, but as discussed, that approach has many drawbacks, so it is usually better to segregate the work into microservices and deploy them on their own smaller servers. This way we also achieve separation of concerns. Do comment with your opinions and any suggestions for improvement.
