Defining a Queue

To define a queue for your app, add it to the queues object on the options object passed to joystick.app():

/index.server.js

import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      jobs: {
        example_job: {
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});

In Joystick, a queue is a continuously ticking process (the queue ticks every 300ms) that pulls database entries representing jobs to run. Depending on the rules you define for a queue, Joystick will attempt to run jobs using a FIFO (first in, first out) approach. So, if we add one job at 12:01 PM and another at 12:03 PM, we'd expect the 12:01 PM job to run first, then the 12:03 PM job, and so on.
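To make the FIFO ordering concrete, here is a minimal in-memory sketch of what a single tick might select. It is not Joystick's implementation: the pick_next_jobs() function and the status and next_run_at field names are assumptions made for illustration.

```javascript
// Hypothetical stand-in for the job entries a queue reads from the database.
// The field names (status, next_run_at) are assumptions, not Joystick's schema.
const pick_next_jobs = (jobs = [], now = new Date(), limit = 1) => {
  return jobs
    // Only jobs that are waiting and whose run time has arrived are eligible.
    .filter((job) => job.status === 'pending' && job.next_run_at <= now)
    // Oldest run time first (FIFO).
    .sort((a, b) => a.next_run_at - b.next_run_at)
    // Never take more than the concurrency limit allows.
    .slice(0, limit);
};

const jobs = [
  { _id: 'b', status: 'pending', next_run_at: new Date('2024-01-01T12:03:00Z') },
  { _id: 'a', status: 'pending', next_run_at: new Date('2024-01-01T12:01:00Z') },
];

const next_jobs = pick_next_jobs(jobs, new Date('2024-01-01T12:05:00Z'), 2);
console.log(next_jobs.map((job) => job._id)); // [ 'a', 'b' ]
```

The 12:01 job is selected ahead of the 12:03 job even though it was listed second, which is the behavior the FIFO description above implies.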

Above, we've defined a simple queue called example. To define it, on the queues object we pass to our joystick.app() options, we add a key/value pair where the key is the name of our queue and the value is an object defining the queue's behavior and supported jobs.

For our options, we've specified that we want Joystick to automatically start this queue on startup via run_on_startup: true, run a maximum of 10 jobs concurrently (at the same time), and we've defined a single job example_job.

Running jobs

Ultimately, a "job" is just an arbitrary function that we call at a specified time, passing in the specified payload for the job (learn more here) and the job instance. Via the job object passed as the second argument to our job's run method, we receive a few different methods that manage the "state" of the job:

  • job.completed() is a function returning a JavaScript Promise that allows us to signal back to the queue that whatever work we intended to complete is completed.
  • job.failed() is a function returning a JavaScript Promise that allows us to signal back to the queue that whatever work we intended to complete has failed.
  • job.requeue() is a function returning a JavaScript Promise that allows us to signal back to the queue that we'd like to try the job again (presumably because its current run has failed or some prerequisite was not met).
  • job.delete() is a function returning a JavaScript Promise that allows us to signal back to the queue that we'd like to permanently delete the job (i.e., remove it from the queue).

One of these methods must be called

In order for a job to not get "stuck" in the queue, one of these methods must be called to manage the state of the job. Even if the code you execute is successful, your queue has no way of knowing that unless you tell it.
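As a rough sketch of that rule, a run function can wrap its work in a try/catch so that every code path ends in one of the state methods. The send_welcome_email() helper and the email_address payload field are hypothetical; only job.completed() and job.failed() come from the description above.

```javascript
// Hypothetical helper standing in for whatever work the job performs.
const send_welcome_email = async (email_address = '') => {
  if (!email_address) {
    throw new Error('An email address is required.');
  }

  return true;
};

// Same shape as the run function in the queue definition above.
const run = async (payload = {}, job = {}) => {
  try {
    await send_welcome_email(payload.email_address);
    // Work succeeded, so tell the queue the job is done.
    await job.completed();
  } catch (error) {
    console.warn(error.message);
    // Work failed, so tell the queue instead of leaving the job stuck.
    await job.failed();
  }
};
```

Assigned to example_job.run in the queue definition, this guarantees the job never finishes without reporting its state back to the queue.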

Cleaning up jobs

By default, after a job is marked as either .completed() or .failed(), that job's entry in the database is not removed. This can be helpful for auditing purposes and is assumed to be a sensible default. If you do not need to keep old jobs in the database after they've completed or failed, it's recommended to use the cleanup option on your queue definition to automate their removal:

/index.server.js

import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      cleanup: {
        completed_after_seconds: 60,
        failed_after_seconds: 60,
      },
      jobs: {
        example_job: {
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});

Above, we've added an object cleanup to our queue definition with two child properties: completed_after_seconds and failed_after_seconds. As these names imply, this tells Joystick to automatically clean up or delete jobs in the database 60 seconds after they've either completed or failed. Setting these values ensures that your database doesn't become overwhelmed with old data, so using this option is highly recommended unless auditing data is mandatory (e.g., for regulatory compliance).
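The effect of these two values can be sketched as a simple eligibility check. This is an illustration only, not Joystick's cleanup code: the is_ready_for_cleanup() function and the finished_at field are assumptions.

```javascript
// Decide whether a finished job is old enough to be removed.
// finished_at is a hypothetical field recording when the job completed or failed.
const is_ready_for_cleanup = (job = {}, cleanup = {}, now = new Date()) => {
  const delay_in_seconds = {
    completed: cleanup.completed_after_seconds,
    failed: cleanup.failed_after_seconds,
  }[job.status];

  // Jobs in any other state, or without a configured delay, are left alone.
  if (!Number.isInteger(delay_in_seconds) || !(job.finished_at instanceof Date)) {
    return false;
  }

  return now.getTime() - job.finished_at.getTime() >= delay_in_seconds * 1000;
};

const cleanup = { completed_after_seconds: 60, failed_after_seconds: 60 };
const completed_job = { status: 'completed', finished_at: new Date('2024-01-01T12:00:00Z') };

const after_30_seconds = is_ready_for_cleanup(completed_job, cleanup, new Date('2024-01-01T12:00:30Z')); // false
const after_90_seconds = is_ready_for_cleanup(completed_job, cleanup, new Date('2024-01-01T12:01:30Z')); // true
```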

Retrying jobs after server restart

Because your queue is running on a constant tick/interval, for the sake of consistency, it's important to avoid jobs accidentally being dropped due to a server restart. A job can be dropped if your server restarts while its status is running (e.g., as part of a redeployment in a non-development environment). To ensure that a job run survives a restart, set the retry_jobs_running_before_restart flag on your queue definition:

/index.server.js

import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      retry_jobs_running_before_restart: true,
      jobs: {
        example_job: {
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});

When your server starts back up, Joystick will automatically look for jobs with a status of running and set them back to pending. Because of a queue's FIFO (first in, first out) nature, those jobs will be run ahead of any other jobs to ensure they're completed relative to their specified time.
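The recovery step described above amounts to a status reset. A minimal in-memory sketch, assuming a hypothetical recover_jobs_after_restart() function and a plain array in place of the database:

```javascript
// Flip jobs that were interrupted mid-run back to pending so the queue picks them up again.
// This mirrors the described behavior only; it is not Joystick's code.
const recover_jobs_after_restart = (jobs = [], queue_options = {}) => {
  if (!queue_options.retry_jobs_running_before_restart) {
    return jobs;
  }

  return jobs.map((job) => {
    return job.status === 'running' ? { ...job, status: 'pending' } : job;
  });
};

const recovered_jobs = recover_jobs_after_restart(
  [
    { _id: 'a', status: 'running' },
    { _id: 'b', status: 'pending' },
    { _id: 'c', status: 'completed' },
  ],
  { retry_jobs_running_before_restart: true },
);

console.log(recovered_jobs.map((job) => job.status)); // [ 'pending', 'pending', 'completed' ]
```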

Setting max job attempts

In some cases, a job's completion may be indeterminate based on various factors. By default, unless you instruct otherwise, Joystick will continue to attempt a job indefinitely. To put an upper bound on this, utilizing the max_attempts option on your job definition is recommended:

/index.server.js

import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      jobs: {
        example_job: {
          max_attempts: 5,
          on_max_attempts_exhausted: (job = {}) => {
            // Optional: handle a max attempts exhausted event...
          },
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});

On a job whose attempts you'd like to limit, add a max_attempts option set to an integer specifying the maximum number of attempts (e.g., above we've set this to 5). Additionally, if you'd like to be notified when the max attempts are exhausted (e.g., to alert administrators or perform some work related to the job), the on_max_attempts_exhausted() hook function can be added. This function is called immediately after the max limit is reached and receives the current job instance as an argument.
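The bookkeeping behind a limit like this can be sketched as follows. Everything here is an assumption made for illustration: the handle_failed_attempt() function, the attempts counter, and the 'deleted' status are hypothetical, and Joystick's internal order of operations may differ.

```javascript
// Count an attempt and decide whether the job gets another try.
const handle_failed_attempt = (job = {}, job_definition = {}) => {
  const attempts = (job.attempts || 0) + 1;
  const has_limit = Number.isInteger(job_definition.max_attempts);

  if (has_limit && attempts >= job_definition.max_attempts) {
    // Limit reached: fire the optional hook, then drop the job.
    if (typeof job_definition.on_max_attempts_exhausted === 'function') {
      job_definition.on_max_attempts_exhausted({ ...job, attempts });
    }

    return { ...job, attempts, status: 'deleted' };
  }

  // Attempts remain, so the job goes back to waiting.
  return { ...job, attempts, status: 'pending' };
};

const exhausted_jobs = [];
const job_definition = {
  max_attempts: 2,
  on_max_attempts_exhausted: (job = {}) => exhausted_jobs.push(job._id),
};

const first_attempt = handle_failed_attempt({ _id: 'a' }, job_definition);
const second_attempt = handle_failed_attempt(first_attempt, job_definition);
```

With max_attempts set to 2, the first failure leaves the job pending and the second triggers the hook once before the job is dropped.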

Requeuing jobs on failure

Depending on the nature of your app, in some cases, you may want to retry a job when it fails. To avoid messy code, Joystick offers a convenience setting for your job, requeue_on_failure, that can be set to automatically add the job back into the queue for another attempt.

/index.server.js

import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      jobs: {
        example_job: {
          requeue_on_failure: true,
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});

As soon as the job.failed() method is called on any example_job attempt, Joystick will automatically shift the job's status back to pending and schedule it to run 10 seconds in the future.
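In other words, a failure becomes a reschedule. A minimal sketch of that transition, assuming a hypothetical requeue_failed_job() function and next_run_at field (the 10 second delay is the one described above):

```javascript
const REQUEUE_DELAY_IN_SECONDS = 10;

// Decide what a failed job looks like after job.failed() has been called.
const requeue_failed_job = (job = {}, job_definition = {}, now = new Date()) => {
  if (!job_definition.requeue_on_failure) {
    // Without the setting, the job simply stays failed.
    return { ...job, status: 'failed' };
  }

  return {
    ...job,
    status: 'pending',
    next_run_at: new Date(now.getTime() + REQUEUE_DELAY_IN_SECONDS * 1000),
  };
};

const failed_at = new Date('2024-01-01T12:00:00Z');
const requeued_job = requeue_failed_job({ _id: 'a' }, { requeue_on_failure: true }, failed_at);

console.log(requeued_job.status); // pending
console.log(requeued_job.next_run_at.toISOString()); // 2024-01-01T12:00:10.000Z
```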

Pre-flight options

To control the flow of jobs, an additional preflight option can be added to job definitions to decide whether or not they can be added to the queue or run. This is a helpful safety mechanism for jobs that are run often or are tied to conditional data like a user's subscription status.

/index.server.js

import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      jobs: {
        example_job: {
          preflight: {
            on_before_add: (job_to_add = {}, db = {}, queue_name = '') => {
              // Optional: decide if the job can be added to the database/queue.
              return true;
            },
            okay_to_run: (payload = {}, job = {}) => {
              // Optional: decide if the job can be run right now.
              return true;
            },
            requeue_delay_in_seconds: 60,
          },
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});

There are two core preflight options that can be set, both functions: on_before_add() and okay_to_run(). on_before_add() is intended to be a stop-gap for adding jobs to the queue/database (e.g., preventing a job related to a premium subscription feature being added for a user without an active subscription) and okay_to_run() is intended as a check to ensure the job can run now or should be saved for later (e.g., if the job is to hand off some work to an encoder, ensure that the encoder isn't already overwhelmed before running the job).

Both functions are expected to return a boolean value. For the second function, okay_to_run(), if the job can't run right now but may be able to run later, returning false will automatically requeue it, with the preflight.requeue_delay_in_seconds option setting how many seconds to wait before the next attempt.
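To illustrate the two examples mentioned above, here is a sketch of a preflight object that checks a subscription before a job is added and checks encoder load before it runs. The active_subscribers set, the encoder object, and the assumption that the payload is available at job_to_add.payload are all hypothetical; a real app would look this data up in its database or an external service.

```javascript
// Hypothetical in-memory data standing in for real lookups.
const active_subscribers = new Set(['user_123']);
const encoder = { active_jobs: 4, max_jobs: 4 };

const preflight = {
  on_before_add: (job_to_add = {}, db = {}, queue_name = '') => {
    // Assumption: the job's payload is available as job_to_add.payload.
    // Only allow the job into the queue for users with an active subscription.
    return active_subscribers.has(job_to_add?.payload?.user_id);
  },
  okay_to_run: (payload = {}, job = {}) => {
    // Hold the job back while the encoder is at capacity.
    return encoder.active_jobs < encoder.max_jobs;
  },
  // Wait a minute before trying again when okay_to_run() returns false.
  requeue_delay_in_seconds: 60,
};

const can_add = preflight.on_before_add({ payload: { user_id: 'user_123' } }); // true
const can_run = preflight.okay_to_run({ user_id: 'user_123' }); // false, the encoder is full
```

Keeping both checks as small, side-effect-free functions that return a boolean makes them easy to test outside of the queue.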

API Reference

Queue Definition API

{
  queues: {
    [queue_name: string]: {
       run_on_startup: boolean,
       concurrent_jobs: integer,
       cleanup: {
         completed_after_seconds: integer,
         failed_after_seconds: integer,
       },
       retry_jobs_running_before_restart: boolean,
       jobs: {
         [job_name: string]: {
           preflight: {
             on_before_add: (job_to_add: object, db: object, queue_name: string) => boolean,
             okay_to_run: (payload: object, job: object) => boolean,
             requeue_delay_in_seconds: integer,
           },
           max_attempts: integer,
           on_max_attempts_exhausted: (job: object) => void,
           requeue_on_failure: boolean,
           run: (payload: object, job: object) => void,
         }
       }
    }
  }
}

Properties

  • queue_name object Required

    The name of the queue being defined as a string property, assigned an object as a value.

    • run_on_startup boolean

      Decides whether or not jobs for the queue being defined should run on server startup.

    • concurrent_jobs integer

      How many jobs to run concurrently (default: 1).

    • cleanup object

      Specifies cleanup options for jobs in the queue.

      • completed_after_seconds integer

        How many seconds to wait before removing completed jobs from the queue's collection/table in the database.

      • failed_after_seconds integer

        How many seconds to wait before removing failed jobs from the queue's collection/table in the database.

    • retry_jobs_running_before_restart boolean

      Decides whether or not jobs that had a status of running before the server restarted should be re-attempted.

    • jobs object Required

      Individual job definitions for the queue.

      • job_name object Required

        The name of a job to define for the queue as a string.

        • preflight object

          Preflight options for the job.

          • on_before_add function

            A function receiving the job_to_add as an object, the queue's database connection, and the queue_name. Returns a boolean to decide if the current job is safe to add to the database/queue.

          • okay_to_run function

            A function receiving the job's payload as an object and the job instance as an object. Returns a boolean to decide if the current job is safe to run.

          • requeue_delay_in_seconds integer

            If the okay_to_run() function returns false, the number of seconds to wait before re-attempting the job.

        • max_attempts integer

          If defined, the max number of attempts for the job before permanently deleting it from the queue.

        • on_max_attempts_exhausted function

          If the max_attempts limit is reached, a function to call to perform work related to the job's failure.

        • requeue_on_failure boolean

          Decide whether or not Joystick should automatically requeue the job when its job.failed() callback is fired.

        • run function Required

          The main function to call for the job at run time. Receives the payload passed when the job was queued as an object and the job instance as an object.