Ok, so you have a Python web app that needs to allow users to initiate long-running processes in the background (the kinds of things that can’t run synchronously, because the web app will time out before they finish). What’s the best way to do that? Well, very often the standard answer in the Python community is, “use Celery.” But what if your application has to run on a Windows system? Celery officially dropped Windows support back in 2016 with the release of v4.0. So, what then?
The Why
I’m currently working with a client, supporting a Python Flask application that finds itself in this exact situation. The application is built on top of Siemens’ Power Systems Simulation for Engineering (PSS/E), which will only run in a Windows environment. In order to handle those long-running, asynchronous processes, the application currently makes use of a worker pool, built using traditional Windows services. But there are a host of issues with the current solution. The worker services are terribly brittle; they often fail completely when an error is thrown. And perhaps even worse, sometimes these failures don’t kill the service outright, but instead simply cause the worker to enter a “zombie-like” state where it doesn’t exactly “die” but can no longer do any useful work. This makes troubleshooting worker failures a laborious investigative process that often results in the same unsatisfying conclusion: “restart it and hope to catch the root cause of the problem next time.” Additionally, the fact that Windows services run in their own unique operating context can lead to issues with file references and user permissions, and often makes using Python virtual environments a complete non-starter.
So, after some research and experimentation, I settled upon using another “Celery-like” Python project called Dramatiq. Dramatiq’s developer actually created it to be a more light-weight, intuitive and simpler alternative to Celery’s complex and sprawling “spaghetti” code. His words, not mine. And, most importantly, Dramatiq is completely happy running on Windows. But, just like Celery, Dramatiq is not built to operate in a vacuum. These distributed task queue libraries are designed to work in concert with some form of message broker system. In the case of Celery, it is very often paired with Redis as its message queue of choice. However, Redis has its own issues in a Windows environment. Unless you want to use an older binary, or care to compile the source code yourself (as explained here), you may want to go another route. And that’s where RabbitMQ comes in. RabbitMQ is another mature, full-featured and widely used message broker that…you guessed it…supports Windows.
Another key piece of our new architecture is the Non-Sucking Service Manager (NSSM), which we use to install our Dramatiq and RabbitMQ services. NSSM has a built-in feature that will automatically restart services when they fail (bye bye, custom Python service-restarting code). And NSSM seems to work well with Python virtual environments. This was helpful since we use the same Python version that PSS/E uses. The standard Windows Python installer doesn’t allow you to install the same version twice, and I don’t really like the idea of installing our app’s Python libraries into PSS/E’s site-packages, which could potentially result in conflicts.
The How
So, now that you know why I settled on this new architecture, I’ll try to give you the broad strokes on how you could go about setting it up for yourself. First of all, before installing RabbitMQ, you’ll need to install its prerequisite, Erlang. The RabbitMQ release I used didn’t support any Erlang releases after v26 (check the supported Erlang versions for the RabbitMQ release you install), so I installed v26.2.5.4. Keep in mind that Erlang will need to be installed by an account with Administrator privileges. With Erlang installed, you can then install RabbitMQ. Rabbit’s installer wants to automatically create a standard Windows service during the installation, but since we’ll be using NSSM, you can go ahead and uncheck that box in the RabbitMQ installation dialogue. After the installer completes, you can install your own service using NSSM. Open a command prompt (with Admin privileges) and run:
C:\path\to\nssm.exe install <rabbitmq-service-name>
In the NSSM dialogue, specify the following:
- Path: Point to the rabbitmq-server.bat file. By default, it will be located here:
C:\Program Files\RabbitMQ Server\rabbitmq_server-<version-number>\sbin\rabbitmq-server.bat
- Startup directory: Set this to the parent sbin directory in the RabbitMQ installation:
C:\Program Files\RabbitMQ Server\rabbitmq_server-<version-number>\sbin
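If you would rather script these two settings than click through the dialogue, NSSM also accepts them on the command line. The following is only a sketch: the helper names are hypothetical, the paths are placeholders, and it assumes NSSM's `install <service> <program>` and `set <service> AppDirectory <dir>` commands, run from an elevated prompt.

```python
import subprocess


def nssm_install_commands(nssm_exe, service_name, sbin_dir):
    """Build the NSSM command lines that mirror the Path and Startup directory fields."""
    server_bat = sbin_dir + r"\rabbitmq-server.bat"
    return [
        # Equivalent of the "Path" field in the dialogue.
        [nssm_exe, "install", service_name, server_bat],
        # Equivalent of the "Startup directory" field.
        [nssm_exe, "set", service_name, "AppDirectory", sbin_dir],
    ]


def install_service(nssm_exe, service_name, sbin_dir):
    """Run the commands in order; stops at the first one that fails."""
    for command in nssm_install_commands(nssm_exe, service_name, sbin_dir):
        subprocess.run(command, check=True)
```

Building the commands separately from running them makes it easy to print and review them before touching the service configuration.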
You can tweak other settings, like throttling and restart delay timings if you want, and then click Install Service. Once the service is installed, you can start it up. In my experience, RabbitMQ can take several seconds to start, and NSSM doesn’t wait long enough before warning you that the service did not start up correctly. In fact, RabbitMQ is likely still starting and just needs a little extra time. So I wrote a batch file to start the service that will check the service’s status after a short delay before determining if there was a problem. It looks something like this:
@echo off
set NSSM_PATH="C:\path\to\nssm.exe"
set SERVICE_NAME="<rabbitmq-service-name>"
set WAIT_TIME=5
%NSSM_PATH% start %SERVICE_NAME% 2>&1
if %errorlevel% neq 0 (
    echo NSSM reported an error. Waiting %WAIT_TIME% seconds before checking again...
    timeout /t %WAIT_TIME% /nobreak >nul
    echo Current status:
    sc query %SERVICE_NAME%
    echo If the service STATE is still not 'RUNNING', check the Windows Event Viewer for error messages.
    exit /b 0
) else (
    echo NSSM started the %SERVICE_NAME% service successfully.
    exit /b 0
)
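The same "give it a moment, then check again" idea can be written as a polling loop in Python. This is a rough sketch rather than part of the original setup: the function names are made up, and it assumes the English-language output format of `sc query`, where the state appears on a line such as `STATE : 4 RUNNING`.

```python
import re
import subprocess
import time


def parse_sc_state(sc_output):
    """Extract the state name (for example 'RUNNING') from `sc query` output."""
    match = re.search(r"STATE\s*:\s*\d+\s+(\w+)", sc_output)
    return match.group(1) if match else None


def wait_until_running(service_name, timeout=60, interval=5):
    """Poll `sc query` until the service reports RUNNING or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = subprocess.run(
            ["sc", "query", service_name], capture_output=True, text=True
        )
        if parse_sc_state(result.stdout) == "RUNNING":
            return True
        time.sleep(interval)
    return False
```

Polling until a deadline avoids guessing a single fixed delay, which is handy when RabbitMQ's startup time varies from one machine to the next.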
With the service running, I did a little more tweaking of the out-of-the-box RabbitMQ setup. First, I wanted to set up and run the RabbitMQ web dashboard where I can view message metrics and administer the queues. The RabbitMQ command-line tools used for this need the system and user Erlang cookies to match, so sync them first. Run these commands in another (elevated permissions) CMD prompt:
set VERSION=3.13.7
set PATH=%PATH%;C:\Program Files\RabbitMQ Server\rabbitmq_server-%VERSION%\sbin
copy /y C:\Windows\System32\config\systemprofile\.erlang.cookie "%USERPROFILE%\.erlang.cookie"
:: Verify that the cookies are set up to allow checking RabbitMQ status
rabbitmqctl.bat status
I also chose to remove the default “guest” RabbitMQ user and set up one of my own, just for an added bit of security:
set RABBITMQ_USER="<rabbitmq-user-name>"
set RABBITMQ_PASS="<rabbitmq-password>"
rabbitmqctl.bat add_user %RABBITMQ_USER% %RABBITMQ_PASS%
rabbitmqctl.bat set_permissions -p / %RABBITMQ_USER% ".*" ".*" ".*"
rabbitmqctl.bat set_user_tags %RABBITMQ_USER% administrator
rabbitmqctl.bat delete_user guest
Check that the new user is set up correctly:
rabbitmqctl.bat list_users
The output should look something like this:
Listing users ...
user tags
<rabbitmq-user-name> [administrator]
Finally, enable RabbitMQ’s management web console:
rabbitmq-plugins enable rabbitmq_management
You may need to restart the RabbitMQ service for these changes to take effect. Then browse to:
http://localhost:15672
(the login and password will be whatever you chose above)
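If you want to confirm the console is reachable without opening a browser, the management plugin also serves an HTTP API on the same port. Here is a minimal sketch using only the standard library; the function names are hypothetical, and it assumes the default port 15672 and the `/api/overview` endpoint with HTTP basic authentication.

```python
import base64
import json
import urllib.request


def overview_request(user, password, host="localhost", port=15672):
    """Build an authenticated request for the management API overview."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(f"http://{host}:{port}/api/overview")
    request.add_header("Authorization", f"Basic {token}")
    return request


def fetch_overview(user, password):
    """Call the API and return the decoded JSON; raises if the login is rejected."""
    with urllib.request.urlopen(overview_request(user, password), timeout=5) as response:
        return json.load(response)
```

A call like `fetch_overview("<rabbitmq-user-name>", "<rabbitmq-password>")` should return a dictionary if the plugin is enabled and the credentials are valid.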
If you can see the Rabbit console in your browser, we’re ready to write some Python code. First you’ll need to install the Dramatiq package, and possibly some other libraries depending on how fancy you want to get with your message broker (like pika for RabbitMQ).
pip install dramatiq pika
Then you can write a simple Dramatiq worker to handle a task:
# simple_worker.py
import logging
import time

import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker

logger = logging.getLogger(__name__)

# Replace the placeholders with the RabbitMQ credentials you created above.
rabbitmq_url = "amqp://<rabbitmq-user-name>:<rabbitmq-password>@localhost:5672"
dramatiq.set_broker(RabbitmqBroker(url=rabbitmq_url))


@dramatiq.actor
def simple_task(message):
    logger.info(f"Received message: {message}")
    time.sleep(2)  # Simulate work
    logger.info("Task completed.")


if __name__ == "__main__":
    # Running this file directly enqueues tasks instead of processing them.
    tasks_to_send = 10
    for i in range(tasks_to_send):
        simple_task.send(f"Simple Task - {i}")
This simple worker can be started up in a terminal (after activating your Python environment). Navigate to the directory containing your worker code and run:
dramatiq simple_worker
That will start up the worker, running in the foreground. Then in another terminal send tasks by invoking the code in the main block:
python simple_worker.py
This will send task messages to the RabbitMQ broker, and the Dramatiq “actors” will start to process them. You can watch the logging output in the first terminal to see your worker processes in action. If you ratchet up that tasks_to_send variable to something like 100 or 1000, and then open up the RabbitMQ dashboard, you’ll be able to gleefully kick back and watch the messages enqueue and dequeue themselves as your workers churn away. (It’s really a good time. I highly recommend it.)
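One detail worth handling before moving past the toy example is the broker URL in simple_worker.py. Hard-coding credentials is fine for a demo, but a password containing characters like `@`, `:` or `/` will break the URL unless it is percent-encoded. A small sketch of one way to build it, assuming (hypothetically) that the credentials live in environment variables named RABBITMQ_USER and RABBITMQ_PASS:

```python
import os
from urllib.parse import quote


def build_amqp_url(user, password, host="localhost", port=5672):
    """Percent-encode the credentials so special characters survive URL parsing."""
    return f"amqp://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}"


def amqp_url_from_env(env=os.environ):
    """Read the (hypothetical) RABBITMQ_USER / RABBITMQ_PASS variables."""
    return build_amqp_url(env["RABBITMQ_USER"], env["RABBITMQ_PASS"])
```

The result can be passed to `RabbitmqBroker(url=...)` in place of the literal string, which also keeps the password out of source control.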
I also created some workers to test the resilience of the platform. As mentioned, my current application is very touchy when it comes to runtime errors, and I had no interest in setting up a brand new, equally fragile system. So, I threw some errors at it…quite a few errors, actually. I built several variants of error-producing actors, such as:
@dramatiq.actor(max_retries=3)
def fail_task():
    raise Exception("Intentional Failure")


@dramatiq.actor
def division_by_zero_task():
    result = 1 / 0  # This will raise a ZeroDivisionError


@dramatiq.actor(max_retries=0)
def timeout_task():
    time.sleep(5)
    raise TimeoutError("Simulated Timeout")
I then submitted them in loops of hundreds and thousands, and watched them all make their way through Rabbit’s retry and dead letter queues. I built a script to monitor the worker processes and threads in real-time. And Dramatiq never flinched. It kept chugging along and didn’t ever seem to break a sweat. It was truly a sight to behold.
When you’re ready to take this to the next level, you can set up another NSSM service to keep your Dramatiq workers up and running at all times in the background. For this service I created a batch file to handle the activation of the virtual environment and give me the ability to easily adjust some command line settings on the Dramatiq worker itself. Thus, the Path setting in the NSSM dialogue for this service just pointed to the .bat file, which looks like this:
set CONDA_BASE=C:\ProgramData\anaconda3
set PATH=%CONDA_BASE%;%CONDA_BASE%\Scripts;%PATH%
CALL %CONDA_BASE%\Scripts\activate.bat ${virtual_env_name}
${virtual_env_name}\python.exe -m dramatiq simple_worker --processes 4 --threads 8
And just like that, my worker pool was ready for prime time.
The Extras
While Dramatiq comes with a number of well-thought-out default settings, you may decide you want to fine tune some of these configurations to really get the most out of your worker pool setup. Below are some of the settings I’ve modified in my application, but rest assured, there are many, many more for you to tinker with, should you choose.
Worker resources
In that last line of the batch file above, I added some configuration in the dramatiq command line. The --processes argument specifies how many OS processes the worker will start, and the --threads argument controls the number of threads for each process. You can play with these settings to tune Dramatiq for your system.
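As a rough rule of thumb, each worker thread handles one task at a time, so the two flags multiply. The helpers below are hypothetical and only illustrate the arithmetic and the shape of the command line used in the batch file:

```python
def worker_command(module, processes, threads, python="python"):
    """Assemble the argument list for starting a Dramatiq worker."""
    return [
        python, "-m", "dramatiq", module,
        "--processes", str(processes),
        "--threads", str(threads),
    ]


def max_concurrent_tasks(processes, threads):
    """Upper bound on tasks in flight: every thread in every process is busy."""
    return processes * threads
```

With the values from the batch file, `max_concurrent_tasks(4, 8)` is 32, which is a useful number to compare against how many CPU-heavy or license-limited jobs the machine can realistically run at once.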
Queue naming
By default, when a Dramatiq worker starts it will create (or reuse) three RabbitMQ queues: the main message queue (default), a delay queue that holds delayed messages, including tasks waiting to be retried (default.DQ), and a dead letter queue for tasks that have failed too many times (default.XQ). However, if you want to specify a different queue name (or wish to manage multiple different queues in your application), you can specify the queue name in each actor’s decorator:
@dramatiq.actor(queue_name="my_queue")
def simple_task(message): ...
This will result in Dramatiq creating three new queues (my_queue, my_queue.DQ, and my_queue.XQ) when this worker is started.
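Because the three names follow a fixed pattern, it is easy to derive them in a monitoring script. The sketch below uses hypothetical helper names and assumes the management plugin is enabled on its default port and that the queues live in the default `/` virtual host, which the management API expects to be encoded as `%2F`:

```python
from urllib.parse import quote


def queue_family(queue_name="default"):
    """Return the main queue plus its .DQ and .XQ companions."""
    return [queue_name, f"{queue_name}.DQ", f"{queue_name}.XQ"]


def queue_api_urls(queue_name="default", vhost="/", base="http://localhost:15672"):
    """Map each queue name to its management API URL."""
    encoded_vhost = quote(vhost, safe="")
    return {
        name: f"{base}/api/queues/{encoded_vhost}/{quote(name, safe='')}"
        for name in queue_family(queue_name)
    }
```

Fetching each of those URLs with your RabbitMQ credentials returns JSON describing the queue, which is one way to alert on a dead letter queue that has started to fill up.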
Task retries
Dramatiq actors come pre-configured to retry failed tasks with exponential backoff; by default, the delay between attempts is capped at seven days. If you want to change this behavior or turn off retries entirely, you can specify that in the actor decorator as well, with arguments like max_retries or max_backoff. This can be a nice feature, if you just need your tasks to finish “eventually.” But keep in mind that if you’re using the retry feature, your tasks should be idempotent, or you might get some unpredictable results when some tasks die mid-stream:
@dramatiq.actor(queue_name="my_queue", max_retries=2)
def simple_task(message): ...
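To get a feel for what exponential backoff means in practice, here is a simplified model of the wait before each retry. It is not Dramatiq's exact algorithm (which also adds random jitter), and the 15 second starting delay and seven day cap are assumptions used for illustration; substitute whatever min_backoff and max_backoff you configure, in milliseconds.

```python
SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000


def backoff_schedule(max_retries, min_backoff_ms=15_000, max_backoff_ms=SEVEN_DAYS_MS):
    """Delay before each retry: doubles every attempt until it hits the cap."""
    return [
        min(min_backoff_ms * 2 ** attempt, max_backoff_ms)
        for attempt in range(max_retries)
    ]
```

For example, `backoff_schedule(3)` gives `[15000, 30000, 60000]`, so a task limited to a few retries is given up on within minutes, while a long retry budget quickly reaches delays measured in days.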
Prefetch
Actors can also be configured to fetch more than one message at a time from the message queue. This can be very useful in high-throughput systems, where the average task handling times are subsecond and there are thousands or even millions of messages coming through the queue. The idea is that there’s no need to incur the network communication overhead for each and every message fetch. However, in other situations (like in the app I’m currently supporting), you could have the opposite situation, where there isn’t a high volume of messages on the queue at any time, and the tasks themselves take a while to complete. In this case, it may be worth dialing back the prefetch value, so that you don’t have a situation where one worker grabs the last few tasks off the queue, and then slowly processes them in series. This leaves the other workers effectively starved of work and idle. In the end, I wound up configuring my workers to only process one task at a time by setting an environment variable in the code:
# Dramatiq reads this value when it is first imported, so set it before that import.
os.environ["dramatiq_queue_prefetch"] = "1"
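The starvation effect is easy to see with a toy model. The simulation below is not how Dramatiq schedules work internally; it simply assumes each worker reserves up to `prefetch` messages (including the one it is running) and processes them one after another, then reports how long the whole batch takes.

```python
from collections import deque


def makespan(durations, workers, prefetch):
    """Total time to finish all tasks under a simple prefetch model."""
    queue = deque(durations)
    held = [deque() for _ in range(workers)]  # messages reserved by each worker
    free_at = [0.0] * workers                 # time at which each worker is next idle
    active = set(range(workers))
    while active:
        # The worker that becomes idle first gets to act next.
        w = min(active, key=lambda i: (free_at[i], i))
        # Top up this worker's reservation from the shared queue.
        while queue and len(held[w]) < prefetch:
            held[w].append(queue.popleft())
        if not held[w]:
            active.discard(w)  # nothing left for this worker to do
            continue
        free_at[w] += held[w].popleft()
    return max(free_at)
```

With four 10-second tasks and two workers, `makespan([10] * 4, workers=2, prefetch=4)` is 40 because the first worker reserves everything, while `prefetch=1` brings it down to 20 because the work is shared evenly.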
Time to relax…
So to sum things up, hopefully you now have a good idea about why someone might want to try out Dramatiq, as well as a solid head start on setting it up in your own environment. So far, I’ve found Dramatiq to be very easy to work with, and there’s also a sizable community of users out there when questions arise. If you choose to go this direction, I wish you many hours of happily watching the seamless processing of messages, while you count all the lines of code you didn’t have to write to make it all happen. Here’s to working smarter, not harder!
Have you found other exciting ways to use or configure Dramatiq in your applications? Let us know in the comments. Cheers!