Queues and Jobs
Introduction
Almost all applications can make use of queues. Queues are a great way to make time-intensive tasks feel immediate by sending the work into the background or into a message queue. They are a good fit for anything that doesn't require an immediate return value, such as sending an email or firing an API call. The queue system is loaded into Masonite via the QueueProvider Service Provider.
Masonite uses pickle to serialize and deserialize Python objects when appropriate. Ensure that the objects you are serializing are free of any end-user supplied code that could deserialize into an arbitrary Python object during the deserialization step.
It would be wise to read about pickle exploitations and ensure your specific application is protected against any avenues of attack.
Getting Started
All configuration settings are in the config/queue.py file by default. Out of the box, Masonite supports 2 drivers:
async
amqp
The async driver simply sends jobs into the background using multithreading. The amqp driver is used for any AMQP-compatible message queue such as RabbitMQ. If you do create your own driver, consider making it available on PyPI so others can install it as well.
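For reference, switching drivers is just a matter of updating config/queue.py. A minimal sketch (the exact layout of this file can vary between Masonite versions):

```python
# config/queue.py (sketch)

# The queue driver to use: 'async' or 'amqp'
DRIVER = 'async'
```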
Jobs
Jobs are simply Python classes that inherit the Queueable class provided by Masonite. We can create jobs using the craft job command.
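For example, to scaffold a job for sending welcome emails:

```
$ craft job SendWelcomeEmail
```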
This will create a new job inside app/jobs/SendWelcomeEmail.py. Our job will look like:
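A sketch of the generated stub, assuming Queueable is importable from masonite.queues (the exact stub may vary slightly between versions):

```python
from masonite.queues import Queueable


class SendWelcomeEmail(Queueable):

    def __init__(self):
        pass

    def handle(self):
        pass
```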
Running Jobs
We can run jobs by using the Queue class. Let's run this job from a controller method:
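A minimal sketch of a controller pushing the job, assuming the Queue class can be imported from the masonite package and resolved through the method's type hint:

```python
from app.jobs.SendWelcomeEmail import SendWelcomeEmail
from masonite import Queue


class WelcomeController:

    def show(self, queue: Queue):
        # Pass the class itself; we do not instantiate it here
        queue.push(SendWelcomeEmail)
```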
That's it. This job will now be sent to the queue and its handle method will be run.
Resolving
Notice in the show method above that we passed in just the class object. We did not instantiate the class. In this instance, Masonite will resolve the job's constructor. All job constructors are able to be resolved by the container, so we can simply pass in anything we need as normal:
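For example, a job constructor can type-hint anything bound in the container. A sketch, with the Request and Mail classes used purely as illustrations (import paths may differ by version):

```python
from masonite import Mail
from masonite.queues import Queueable
from masonite.request import Request


class SendWelcomeEmail(Queueable):

    def __init__(self, request: Request, mail: Mail):
        # Both parameters are resolved out of the container when the job runs
        self.request = request
        self.mail = mail

    def handle(self):
        pass
```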
Remember that anything that is resolved by the container is able to retrieve anything from the container by simply passing in parameters of objects that are located in the container. Read more about the container in the Service Container documentation.
Instantiating
We can also instantiate the job if we need to pass in data from a controller method. This will not resolve the job's constructor at all:
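Continuing the controller sketch above, and assuming we want to hand the currently authenticated user to the job:

```python
def show(self, queue: Queue, request: Request):
    # Instantiating the job ourselves skips container resolution of its constructor
    queue.push(SendWelcomeEmail(request.user()))
```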
The constructor of our job class will now look like:
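For example (a sketch):

```python
class SendWelcomeEmail(Queueable):

    def __init__(self, user):
        self.user = user
```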
Executing Jobs
Whenever a job is executed, its handle method is simply called. Because of this we can send our welcome email:
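A sketch of a handle method, reusing the self.mail and self.request resolved in the constructor example above (the mail/welcome template name is hypothetical, and your user model is assumed to have an email attribute):

```python
def handle(self):
    self.mail.to(self.request.user().email) \
        .template('mail/welcome') \
        .send()
```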
That's it! This job will be loaded into the queue. By default, Masonite uses the async driver, which just sends tasks into the background.
We can also send multiple jobs to the queue by passing more of them into the .push() method:
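For example (SendTutorialEmail is just another hypothetical job class):

```python
def show(self, queue: Queue):
    queue.push(SendWelcomeEmail, SendTutorialEmail)
```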
Passing Variables Into Jobs
Most of the time you will want to resolve the constructor but pass variables into the handle() method. This can be done by passing an iterable to the args= keyword argument:
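A sketch, using a hypothetical email address as the argument:

```python
def show(self, queue: Queue):
    queue.push(SendWelcomeEmail, args=['user@example.com'])
```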
These values will then be passed to your handle method:
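For example:

```python
def handle(self, email):
    # email == 'user@example.com'
    ...
```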
Passing Functions or Methods
You can also call any arbitrary function or method using the queue driver. All you need to do is pass a reference to it into the push method, along with any arguments you need in the args parameter, like so:
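A sketch, where some_function and its module are hypothetical stand-ins for your own code:

```python
from app.some_module import some_function


def show(self, queue: Queue):
    queue.push(some_function, args=('some value',))
```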
This will then queue this function to be called later.
Note that you will not be able to get a return value back. Once the call is sent to the queue it will run at an arbitrary time later.
AMQP Driver
The amqp driver can be used to communicate with RabbitMQ services.
Installing
In order to get started with this driver you will need to install RabbitMQ on your development machine (or your production machine, depending on where you are running Masonite).
You can find the installation guide for RabbitMQ here.
Running RabbitMQ
Once you have RabbitMQ installed you can go ahead and run it. If it starts successfully you will see RabbitMQ's startup output in the terminal.
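On a typical local install, starting the server looks roughly like this (the exact command depends on how you installed RabbitMQ):

```
$ rabbitmq-server
```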
Great! Now that RabbitMQ is up and running we can look at the Masonite part.
Now we will need to make sure our driver and driver configurations are specified correctly. Below are the default values, which should connect to your current RabbitMQ configuration. This will be in your config/queue.py file:
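A sketch of those settings, assuming RabbitMQ's default guest credentials and port (the exact keys come from your config/queue.py and may differ slightly between versions):

```python
# config/queue.py (sketch)

DRIVER = 'amqp'

DRIVERS = {
    'amqp': {
        'username': 'guest',
        'password': 'guest',
        'host': 'localhost',
        'port': '5672',
        'channel': 'default',
    }
}
```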
If your RabbitMQ instance requires a vhost but doesn't have a port, we can add a vhost and set the port to None. vhost and port both have the option of being None. If you are developing locally then vhost should likely be left out altogether. The settings below will most likely be used for your production settings:
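A sketch of a production-style configuration with a vhost and the port set to None (all values below are placeholders):

```python
DRIVERS = {
    'amqp': {
        'username': 'your-username',
        'password': 'your-password',
        'vhost': 'your-vhost',
        'host': 'your-amqp-host',
        'port': None,
        'channel': 'default',
    }
}
```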
Starting The Worker
We can now start the worker using the queue:work command. It might be a good idea to run this command in a new terminal window since it will stay running until we close it.
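Start it with craft:

```
$ craft queue:work
```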
This will start up the worker and begin listening for jobs to come in via your Masonite project.
You can also specify the driver you want to create the worker for by using the -d or --driver option:
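For example, to run the worker against the amqp driver:

```
$ craft queue:work -d amqp
```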
Sending Jobs
That's it! Send jobs like you normally would and they will be processed via RabbitMQ:
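For example, the same push from earlier now flows through RabbitMQ:

```python
def show(self, queue: Queue):
    queue.push(SendWelcomeEmail)
```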
You can also specify the channel to push to by running:
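A sketch, assuming the channel is given as a keyword argument to push (the channel name here is just an example):

```python
def show(self, queue: Queue):
    queue.push(SendWelcomeEmail, channel='default')
```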
Failed Jobs
Sometimes your jobs will fail. This could happen for many reasons, such as an exception, but Masonite will try to run the job 3 times in a row, waiting 1 second between attempts, before finally marking the job as failed.
If the object being passed into the queue is not a job (or a class that inherits Queueable) then the job will not requeue. It will only ever attempt to run once.
Handling Failed Jobs
Each job can have a failed method which will be called when the job fails. You can do things like fix a parameter and requeue the job, call other queues, send an email to your development team, etc.
This will look something like:
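A sketch of a job with a failed callback (the mail notification is just one example of what you might do here, and the developer address is hypothetical):

```python
from masonite import Mail
from masonite.queues import Queueable


class SendWelcomeEmail(Queueable):

    def __init__(self, mail: Mail):
        self.mail = mail

    def handle(self):
        ...

    def failed(self, payload, error):
        # Called once all retry attempts have been exhausted
        self.mail.to('developers@example.com').send(
            'The welcome email failed: {}'.format(error)
        )
```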
It's important to note that only classes that extend the Queueable class will handle being failed. All other queued objects will simply die with no failed callback.
Notice that the failed method MUST take 2 parameters.
The first parameter is the payload that was attempted, which is a dictionary of information that looks something like this:
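A rough illustration of that dictionary (the exact keys and values depend on the job and the Masonite version):

```python
{
    'obj': SendWelcomeEmail,        # the job class (or instance) that was queued
    'args': ('user@example.com',),  # the arguments passed via args=
    'callback': 'handle',           # the method the worker tried to call
    'ran': 3,                       # how many times the job was attempted
}
```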
The second parameter is the error, which may be something like division by zero.
Storing Failed Jobs
By default, when a job fails it disappears and cannot be run again, since Masonite does not store this information.
If you wish to store failed jobs in order to run them again at a later date then you will need to create a queue table. Masonite makes this very easy.
First you will need to run:
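This is done with a craft command (queue:table, assuming your Masonite version ships it):

```
$ craft queue:table
```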
This will create a new migration inside databases/migrations. Then you can migrate it:
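Run the migration with craft:

```
$ craft migrate
```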
Now whenever a failed job occurs it will store the information inside this new table.
Running Failed Jobs
You can run all the failed jobs again by running the queue's failed-jobs craft command. This will get all the jobs from the database table and send them back into the queue. If they fail again they will be added back into this database table.
Specifying Failed Jobs
You can modify the settings above by specifying them directly on the job. For example, you may want to specify that the job reruns 5 times instead of 3 times when it fails, or that it should not rerun at all.
Specifying this on a job may look something like:
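A sketch, assuming a run_again_on_fail class attribute controls whether the job is retried (check the Queueable class in your Masonite version for the exact attribute name):

```python
from masonite.queues import Queueable


class SendWelcomeEmail(Queueable):

    # Do not retry this job when it fails (assumed attribute name)
    run_again_on_fail = False

    def handle(self):
        ...
```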
This will not try to rerun when the job fails.
You can specify how many times the job will rerun when it fails by specifying the run_times attribute:
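For example:

```python
class SendWelcomeEmail(Queueable):

    # Retry this job up to 5 times before marking it failed
    run_times = 5

    def handle(self):
        ...
```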