Production-ready Websockets using Flask (Python) and Celery Worker and Javascript

Overview

This blog is a continuation of my last blog, which covered the basics of WebSocket, how to build a WebSocket server in Python, and common use cases of WebSocket. Please see my previous blog (https://progressstory.com/tech/python/deep-dive-into-websockets-with-python/) for a quick overview of what WebSockets are all about.

As in the previous blog, we are going to use Flask-SocketIO for the backend and Socket.IO for the frontend. Let’s dive into it.

Architecture

Architecture plays a big role in any application: a good one makes development easier, keeps the code maintainable, and makes the application less error-prone. The flask-socketio GitHub repo ships a sample implementation of WebSocket.

We should not adopt that sample as-is for a production application, because it is written for the development server. Instead we will use the extensions approach. If you haven’t used the extensions approach before, please read up on it first. The directory structure would look like this:

├── resources/
│ ├── socket
│ │ ├── foo.py
├── app.py
├── extensions.py
├── socket_views.py
└── routes.py

extensions.py

WebSocket provides a full-duplex communication channel over a single TCP connection. Take a group chat app as an example. When you enter a group, one WebSocket connection is created, because you can now post to that group and keep receiving messages without reloading the page. That connection stays open and keeps occupying server resources for as long as you are in the group, and many users can be using the app at the same time.

Gevent as async_mode

This is where async_mode comes into action. If you don’t provide the async_mode parameter, Flask-SocketIO picks the first option whose dependencies are installed (eventlet is tried first, then gevent) and falls back to threading if neither is available. In production you don’t want to end up on threading: the number of concurrent users can be in the hundreds or thousands, and one thread per connection does not scale well to those numbers.

We will be using gevent in our example, as it supports long polling and, with the gevent-websocket package installed, WebSocket as well. In practice, if your application’s work is mainly I/O bound, gevent lets it scale to potentially thousands of concurrent connections on a single process.

CORS configuration

By default, Flask-SocketIO applies a same-origin policy, which means that only pages served from the same host can connect to the backend. If your frontend is hosted on a different host, you can add that host through the cors_allowed_origins setting. Just try to avoid cors_allowed_origins="*", as that configuration allows every host to connect.

Class-based Namespace

As a fan of OOP, I find it easier to write maintainable code with class-based resources. If you have worked with flask-restful, flask-classful, or flask-restx, this approach is similar. Let’s create one socket namespace.

Flask-SocketIO also supports Socket.IO namespaces, which allow the client to multiplex several independent connections on the same physical socket.

In BaseSocket we can put all the common functionality, which specific socket namespaces can then override. The most basic events in the socket lifecycle are on_connect and on_disconnect. One important thing to remember is to remove the DB session when a socket disconnects. Otherwise DB connections pile up until the pool is exhausted, and you get an error like this:

QueuePool limit of size 5 overflow 10 reached

It’s just a cleanup activity, so that the DB session is removed after every disconnection.

app.py

resources/socket/foo.py

Here you can see that we reuse the base class we created before, BaseSocket.

Every method of the socket namespace whose name starts with on_ acts as an event handler, like on_get_foo_details in this example, which handles the get_foo_details event.

self carries the socket connection context. For example, you can read the namespace from it through the self.namespace attribute.

Now we want to initialize these socket namespace classes. For that, we will use our socket_views.py file.

and our app.py

app.py

This is how the socket will be attached to the flask context.

Celery Worker with Socket

Let’s say you want to send a message to the frontend from a Celery task that is running asynchronously. With WebSocket that is also possible. But how can a worker, which is detached from the web process by nature, send a message to the frontend?

This is where the room feature of Socket.IO comes into the picture. A room is a subset of users grouped together so that they can be addressed together.

For example, if you are chatting with someone, then you and the other person are in a chat room. If you send a message, the other person receives it and can read it. The same applies to a chat room with multiple users: if one user posts a message, all of them can read it.

In our example, consider the frontend, the API, and the worker as different users, and the queue as a room.

The frontend sends a WebSocket request to the backend, where it joins a room and sends a task to the queue. The worker then picks the task from the queue and sends messages to the room. The backend and the frontend both receive these messages, as both of them are in the room.

Let’s implement this. But first we have to pass the queue to the SocketIO initialization.

app.py

foo_socketio.py

task.py

The important thing to note here is that we send the event to a room using the room keyword, so that the API and the frontend receive it. The include_self keyword is set to False so that the Celery worker itself does not receive the event. One more minor thing: I’m passing testing as skip_sid. When include_self is False and no skip_sid is given, flask_socketio tries to read the sender’s identity from the Flask request context, but this code runs in the Celery worker, where no such context exists. So it’s just a hack for now.

Deployment

As we have already discussed, we are using gevent as the async mode to support many connections. So we will run gunicorn with the gevent-websocket worker class, using the commands below. Install gevent-websocket first:

pip install gevent-websocket
gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker -w 1 module:app

Conclusion

I hope your production server will be stable with these configurations. You can also reach out to me about any issue you are facing in production, as we also faced a lot of issues in production initially.
