Queues and Jobs

Masonite ships with a powerful queue system. This feature is useful for running tasks that take a while, like sending emails, processing videos, creating invoices, updating records, and anything else you don't need your users to wait for.

First, jobs are created with the logic required to make the job run. The jobs are then "pushed" onto the queue, where they will be run later by "queue workers". You can run as many queue workers as your server can handle.

In addition to running jobs, some drivers allow you to monitor any jobs that fail. Failed job data is saved to the database, where it can be monitored and rerun if needed.

Configuration

You can easily modify the behavior of your queue system by changing the queue configuration:

The available queue drivers are: async, database and amqp.

A full queue configuration will look like this:

DRIVERS = {
    "default": "async",
    "database": {
        "connection": "mysql",
        "table": "jobs",
        "failed_table": "failed_jobs",
        "attempts": 3,
        "poll": 5,
        "tz": "UTC"
    },
    "amqp": {
        "username": "guest",
        "password": "guest",
        "port": "5672",
        "vhost": "",
        "host": "localhost",
        "channel": "default",
        "queue": "masonite4",
    },
    "async": {
        "blocking": False,
        "callback": "handle",
        "mode": "threading",
        "workers": 1,
    },
}

Default Queue

The default key specifies which queue driver in the configuration to use by default. Its value must match one of the other keys in the configuration dictionary.
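For example, assuming the configuration shown above, switching the default driver from async to database is a one-key change:

```python
# Queue configuration sketch: only the "default" key changes, and it
# must match one of the driver keys defined in the same dictionary.
DRIVERS = {
    "default": "database",  # was "async"
    "database": {
        "connection": "mysql",
        "table": "jobs",
        "failed_table": "failed_jobs",
        "attempts": 3,
        "poll": 5,
        "tz": "UTC",
    },
}
```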

Database Driver

To use the database driver you should first create a jobs table:

$ python craft queue:table

This will create a migration file in your migrations directory.

If you want to save failed jobs you should also create a migration for the failed jobs table:

$ python craft queue:failed

You should then migrate your project:

$ python craft migrate

The database driver processes jobs and failed jobs via a database connection.

AMQP Driver

The AMQP driver is used for connections that use the AMQP protocol, such as RabbitMQ.

The available options include host, port, username, password, vhost, channel, and queue (see the configuration above).

Async Driver

The async driver will simply run the jobs in memory using processes or threading. This is the simplest driver as it does not need any special software or setup.
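To illustrate the idea (this is a conceptual sketch, not Masonite's internal implementation), running a job's handle method on a background thread looks roughly like this:

```python
import threading


class CreateInvoice:
    """Minimal stand-in for a Queueable job (illustration only)."""

    def __init__(self, order_id):
        self.order_id = order_id
        self.processed = False

    def handle(self):
        # A real job would generate the invoice documents here
        self.processed = True


job = CreateInvoice(order_id=1)

# With "blocking": False, the job is handed to a worker thread
# and the request can continue without waiting on it.
worker = threading.Thread(target=job.handle)
worker.start()
worker.join()  # a real worker would keep looping for new jobs
```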

The available options include blocking, callback, mode, and workers (see the configuration above).

Creating Jobs

In order to process things on the queue, you will need to create a job. This job will be treated as an entity that can be serialized and run later.

As a shortcut, you can run the job command to create the job:

$ python craft job CreateInvoice

You will now have a job class you can build out the logic for:

from masonite.queues import Queueable

class CreateInvoice(Queueable):
    def handle(self):
        pass

Any logic should be inside the handle method:

class CreateInvoice(Queueable):

    def __init__(self, order_id):
        self.order_id = order_id

    def handle(self):
        # Generate invoice documents
        pass
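Because a job is an ordinary Python class, you can also exercise its logic synchronously, for instance in a unit test, by calling handle directly. A small sketch, where the invoice attribute is hypothetical and only there for illustration:

```python
class CreateInvoice:
    def __init__(self, order_id):
        self.order_id = order_id
        self.invoice = None  # hypothetical attribute for illustration

    def handle(self):
        # Generate invoice documents for the given order
        self.invoice = f"invoice-{self.order_id}.pdf"


job = CreateInvoice(order_id=42)
job.handle()
print(job.invoice)  # invoice-42.pdf
```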

Queueing Jobs

You can put jobs on the queue to process by simply passing them to the queue's push method:

from masonite.queues import Queue
from app.jobs.CreateInvoice import CreateInvoice

class InvoiceController:

  def generate(self, queue: Queue):
    # Jobs with no parameters
    queue.push(CreateInvoice())

    # Jobs with parameters
    # Create an invoice from a payment
    queue.push(CreateInvoice(Order.find(1).id))

You can also specify any number of options using keyword arguments on the push method:

queue.push(
  CreateInvoice(Order.find(1).id),
  driver="async", # The queue driver to use
  queue="invoices", # The queue name to put the job on
)

Queue Workers

To run a queue worker, which is a terminal process that runs the jobs, you can use the queue:work command:

$ python craft queue:work

This will start up a worker using the default queue configuration. You can also modify the worker's behavior with options such as --driver, --connection, --poll, and --attempts:

$ python craft queue:work --driver database --connection mysql --poll 5 --attempts 2

Failed Jobs

If your configuration is set up properly, failed jobs will go into a failed jobs table. This is where you can monitor why your jobs are failing and choose to rerun them or remove them.

If you choose to rerun your jobs, they will be placed back onto the end of the queue and rerun through the normal job queuing process.

To rerun jobs that failed you can use the command:

$ python craft queue:retry

The queue:retry command accepts a few options as well.
