Defining a Queue
To define a queue for your app, add it to the queues object on the options object passed to joystick.app():
/index.server.js
import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      jobs: {
        example_job: {
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});
In Joystick, a queue is a constantly running/ticking process (the queue tick is every 300ms) which pulls database entries representing jobs to run. Depending on the rules you define for a queue, Joystick will attempt to run a job using a FIFO (first in, first out) approach. So, if we add one job at 12:01 PM and another at 12:03 PM, we'd expect the 12:01 PM job to run first, then 12:03 PM, and so on.
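To make the ordering concrete, here is a minimal sketch of how a single tick could pick jobs. It does not use Joystick's internals: the in-memory jobs array, the status and next_run_at field names, and the get_jobs_for_tick() helper are all assumptions made for illustration.

```javascript
// Hypothetical in-memory stand-in for the job entries a queue stores in the database.
// The field names (status, next_run_at) are assumptions for illustration only.
const jobs = [
  { name: 'second', status: 'pending', next_run_at: new Date('2024-01-01T12:03:00Z') },
  { name: 'first', status: 'pending', next_run_at: new Date('2024-01-01T12:01:00Z') },
  { name: 'later', status: 'pending', next_run_at: new Date('2024-01-01T12:30:00Z') },
];

// Pick the jobs that are due at `now`, oldest first, limited to the concurrency cap.
const get_jobs_for_tick = (all_jobs = [], now = new Date(), concurrent_jobs = 1) => {
  return all_jobs
    .filter((job) => job.status === 'pending' && job.next_run_at <= now)
    .sort((a, b) => a.next_run_at - b.next_run_at)
    .slice(0, concurrent_jobs);
};

const due = get_jobs_for_tick(jobs, new Date('2024-01-01T12:05:00Z'), 10);
console.log(due.map((job) => job.name)); // [ 'first', 'second' ]
```

The job scheduled for 12:30 PM is skipped because it is not yet due, and the two due jobs come back in the order they were scheduled.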
Above, we've defined a simple queue called example. To define it, on the queues object we pass to our joystick.app() options, we add a key/value pair where the key is the name of our queue and the value is an object defining the queue's behavior and supported jobs.
For our options, we've specified that we want Joystick to automatically start this queue on startup via run_on_startup: true, run a maximum of 10 jobs concurrently (at the same time), and we've defined a single job example_job.
Running jobs
Ultimately, a "job" is just an arbitrary function that we call at a specified time, passing in the specified payload for the job (learn more here) and the job instance. Via the job object passed as the second argument to our job's run method, we receive a few different methods that manage the "state" of the job:
- job.completed() is a function returning a JavaScript Promise that allows us to signal back to the queue that whatever work we intended to complete is completed.
- job.failed() is a function returning a JavaScript Promise that allows us to signal back to the queue that whatever work we intended to complete has failed.
- job.requeue() is a function returning a JavaScript Promise that allows us to signal back to the queue that we'd like to try the job again (presumably because its current run has failed or some prerequisite was not met).
- job.delete() is a function returning a JavaScript Promise that allows us to signal back to the queue that we'd like to permanently delete the job (i.e., remove it from the queue).
One of these methods must be called
In order for a job to not get "stuck" in the queue, one of these methods must be called to manage the state of the job. Even if the code you execute is successful, your queue has no way of knowing that unless you tell it.
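One way to make sure a state method is always called is to wrap the work in a try/catch and report the outcome at the end. The sketch below is illustrative only: send_welcome_email() is a hypothetical work function, and create_mock_job() is a stand-in for the job instance so the example can run outside of Joystick. Only job.completed() and job.failed(), as described above, are assumed to exist on the real job object.

```javascript
// Hypothetical work function; replace with whatever the job actually does.
const send_welcome_email = (payload = {}) => {
  if (!payload.email) {
    throw new Error('payload.email is required');
  }
  return `sent to ${payload.email}`;
};

// A run function that always reports a final state back to the queue.
const run_example_job = async (payload = {}, job = {}) => {
  let succeeded = false;

  try {
    send_welcome_email(payload);
    succeeded = true;
  } catch (error) {
    console.warn(`example_job failed: ${error.message}`);
  }

  // Exactly one state method is called, whichever path was taken.
  return succeeded ? job.completed() : job.failed();
};

// Stand-in for the job instance so the sketch can run outside of Joystick.
const create_mock_job = () => {
  const calls = [];
  return {
    calls,
    completed: async () => { calls.push('completed'); },
    failed: async () => { calls.push('failed'); },
  };
};

const mock_job = create_mock_job();
run_example_job({ email: 'user@example.com' }, mock_job)
  .then(() => run_example_job({}, mock_job))
  .then(() => console.log(mock_job.calls)); // [ 'completed', 'failed' ]
```

Keeping the state call outside the try block means an error thrown by the work itself can never skip it.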
Cleaning up jobs
By default, after a job is marked as either .completed() or .failed(), that job's entry in the database is not removed. This can be helpful for auditing purposes and is assumed to be a sensible default. If you do not need to keep old jobs in the database after they've either completed or failed, it's recommended to utilize the cleanup option on your queue definition to automate this process:
/index.server.js
import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      cleanup: {
        completed_after_seconds: 60,
        failed_after_seconds: 60,
      },
      jobs: {
        example_job: {
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});
Above, we've added a cleanup object to our queue definition with two child properties: completed_after_seconds and failed_after_seconds. As these names imply, this tells Joystick to automatically clean up or delete jobs in the database 60 seconds after they've either completed or failed. Setting these values ensures that your database doesn't become overwhelmed with old data, so using this option is highly recommended unless auditing data is mandatory (e.g., for regulatory compliance).
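As a rough illustration of what these two values mean, the sketch below checks whether a finished job is old enough to be removed. It is not Joystick's implementation: is_due_for_cleanup() and the status and finished_at fields are hypothetical names chosen for the example.

```javascript
// Assumed cleanup settings, mirroring the queue definition above.
const cleanup = { completed_after_seconds: 60, failed_after_seconds: 60 };

// Decide whether a finished job is old enough to remove.
// `status` and `finished_at` are hypothetical field names, not Joystick's schema.
const is_due_for_cleanup = (job = {}, options = {}, now = new Date()) => {
  const delay_in_seconds = {
    completed: options.completed_after_seconds,
    failed: options.failed_after_seconds,
  }[job.status];

  if (typeof delay_in_seconds !== 'number') {
    return false; // Not finished, or no cleanup rule for this status.
  }

  const age_in_seconds = (now.getTime() - job.finished_at.getTime()) / 1000;
  return age_in_seconds >= delay_in_seconds;
};

const cleanup_now = new Date('2024-01-01T12:02:00Z');
const old_job = { status: 'completed', finished_at: new Date('2024-01-01T12:00:30Z') };
const recent_job = { status: 'failed', finished_at: new Date('2024-01-01T12:01:30Z') };

console.log(is_due_for_cleanup(old_job, cleanup, cleanup_now)); // true (90 seconds old)
console.log(is_due_for_cleanup(recent_job, cleanup, cleanup_now)); // false (30 seconds old)
```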
Retrying jobs after server restart
Because your queue is running on a constant tick/interval, for the sake of consistency, it's important to avoid jobs accidentally being dropped due to a server restart. A job can be dropped if your server restarts while its status is running (e.g., as part of a redeployment in a non-development environment). To ensure that a job run survives a restart, it's important to set the retry_jobs_running_before_restart flag on your queue definition:
/index.server.js
import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      retry_jobs_running_before_restart: true,
      jobs: {
        example_job: {
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});
When your server starts back up, Joystick will automatically look for jobs with a status of running and set them back to pending. Because of a queue's FIFO (first in, first out) nature, those jobs will be run ahead of any other jobs to ensure they're completed relative to their specified time.
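The status change described above can be pictured with a small sketch. This is an illustration only, not Joystick's code: the jobs_before_restart array and the reset_interrupted_jobs() helper are hypothetical, and real jobs live in the database rather than in memory.

```javascript
// Hypothetical snapshot of job entries at the moment the server comes back up.
const jobs_before_restart = [
  { id: 'a', status: 'running' },
  { id: 'b', status: 'pending' },
  { id: 'c', status: 'completed' },
];

// Return a copy in which interrupted jobs are pending again; others are untouched.
const reset_interrupted_jobs = (job_list = []) => {
  return job_list.map((job) => {
    return job.status === 'running' ? { ...job, status: 'pending' } : job;
  });
};

const jobs_after_restart = reset_interrupted_jobs(jobs_before_restart);
console.log(jobs_after_restart.map((job) => `${job.id}:${job.status}`));
// [ 'a:pending', 'b:pending', 'c:completed' ]
```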
Setting max job attempts
In some cases, a job's completion may be indeterminate based on various factors. By default, unless you instruct otherwise, Joystick will continue to attempt a job indefinitely. To put an upper bound on this, utilizing the max_attempts option on your job definition is recommended:
/index.server.js
import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      jobs: {
        example_job: {
          max_attempts: 5,
          on_max_attempts_exhausted: (job = {}) => {
            // Optional: handle a max attempts exhausted event...
          },
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});
On a job you'd like to limit the maximum attempts for, add a max_attempts flag set to an integer with the maximum number of attempts to try (e.g., above we've set this to 5). Additionally, if you'd like to be notified when the max attempts are exhausted (e.g., alerting administrators or performing some work related to the job), the on_max_attempts_exhausted() hook function can be added. This function is called immediately after the max limit is reached and receives the current job instance as an argument.
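To show how the limit and the hook fit together, the following sketch simulates a job that keeps failing. It is a simplified model, not Joystick's internals: record_attempt() and the attempts counter on the job are assumptions, while max_attempts and on_max_attempts_exhausted are the option names from the definition above.

```javascript
// Hypothetical job definition fragment, using the option names from the example above.
const alerts = [];
const example_job_definition = {
  max_attempts: 5,
  on_max_attempts_exhausted: (job = {}) => {
    alerts.push(`Job ${job.id} gave up after ${job.attempts} attempts`);
  },
};

// Record a finished attempt and report whether the job may be tried again.
// The `attempts` counter on the job is an assumed field for illustration.
const record_attempt = (job = {}, definition = {}) => {
  job.attempts = (job.attempts || 0) + 1;

  if (Number.isInteger(definition.max_attempts) && job.attempts >= definition.max_attempts) {
    if (typeof definition.on_max_attempts_exhausted === 'function') {
      definition.on_max_attempts_exhausted(job);
    }
    return false; // No further attempts.
  }

  return true; // Still under the limit (or no limit set).
};

const flaky_job = { id: 'job_123', attempts: 0 };
let may_retry = true;
while (may_retry) {
  may_retry = record_attempt(flaky_job, example_job_definition);
}

console.log(flaky_job.attempts); // 5
console.log(alerts); // [ 'Job job_123 gave up after 5 attempts' ]
```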
Requeuing jobs on failure
Depending on the nature of your app, in some cases, you may want to retry a job when it fails. To avoid messy code, Joystick offers a convenience setting for your job, requeue_on_failure, that can be set to automatically add the job back into the queue for another attempt.
/index.server.js
import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      jobs: {
        example_job: {
          requeue_on_failure: true,
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});
As soon as the job.failed() method is called on any example_job attempt, Joystick will automatically shift the job's status back to pending and schedule it to run 10 seconds in the future.
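The effect on a failed job can be sketched as a status change plus a new run time. The helper below is hypothetical and only models the behavior described above. The status and next_run_at field names are assumptions, and the 10-second delay is taken from the text.

```javascript
// Delay taken from the text above; named here only for readability.
const REQUEUE_DELAY_IN_SECONDS = 10;

// Return the job as it would look after an automatic requeue.
// `status` and `next_run_at` are hypothetical field names.
const requeue_after_failure = (job = {}, failed_at = new Date()) => {
  return {
    ...job,
    status: 'pending',
    next_run_at: new Date(failed_at.getTime() + REQUEUE_DELAY_IN_SECONDS * 1000),
  };
};

const requeued = requeue_after_failure(
  { id: 'job_123', status: 'running' },
  new Date('2024-01-01T12:00:00.000Z'),
);

console.log(requeued.status); // pending
console.log(requeued.next_run_at.toISOString()); // 2024-01-01T12:00:10.000Z
```

Since Joystick attempts jobs indefinitely by default, it may be worth pairing requeue_on_failure with max_attempts so that a job which can never succeed does not loop forever.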
Pre-flight options
To control the flow of jobs, an additional preflight option can be added to job definitions to decide whether or not they can be added to the queue or run. This is a helpful safety mechanism for jobs that are run often or are tied to conditional data like a user's subscription status.
/index.server.js
import joystick from '@joystick.js/node';

joystick.app({
  queues: {
    example: {
      run_on_startup: true,
      concurrent_jobs: 10,
      jobs: {
        example_job: {
          preflight: {
            on_before_add: (job_to_add = {}, db = {}, queue_name = '') => {
              // Optional: decide if the job can be added to the database/queue.
              return true;
            },
            okay_to_run: (payload = {}, job = {}) => {
              // Optional: decide if the job can be run right now.
              return true;
            },
            requeue_delay_in_seconds: 60,
          },
          run: (payload = {}, job = {}) => {
            // Handle work for the job here...
            // Call one of job.completed(), job.failed(), job.requeue(), or job.delete(), after work is finished.
          }
        },
      },
    },
  },
  routes: { ... }
});
There are two core preflight options that can be set, both functions: on_before_add() and okay_to_run(). on_before_add() is intended to be a stop-gap for adding jobs to the queue/database (e.g., preventing a job related to a premium subscription feature being added for a user without an active subscription) and okay_to_run() is intended as a check to ensure the job can run now or should be saved for later (e.g., if the job is to hand off some work to an encoder, ensure that the encoder isn't already overwhelmed before running the job).
Both functions are expected to return a boolean value. For the second function, okay_to_run(), if the job should run later (but not right now), the preflight.requeue_delay_in_seconds option sets how many seconds to wait before the job is automatically requeued for another attempt.
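The two examples mentioned above (a subscription check and an encoder capacity check) might look something like the sketch below. Everything the checks consult is hypothetical: the subscriptions and encoder objects stand in for real application data, and reading the payload from job_to_add.payload is an assumption about the shape of that argument.

```javascript
// Hypothetical application state the checks consult; in a real app this would
// likely come from the database or another service.
const subscriptions = { user_1: 'active', user_2: 'canceled' };
const encoder = { active_jobs: 8, max_jobs: 8 };

const preflight = {
  // Only accept jobs for users with an active subscription.
  // Assumes the payload is available on job_to_add.payload.
  on_before_add: (job_to_add = {}) => {
    const user_id = job_to_add.payload && job_to_add.payload.user_id;
    return subscriptions[user_id] === 'active';
  },
  // Only run while the encoder has spare capacity.
  okay_to_run: () => {
    return encoder.active_jobs < encoder.max_jobs;
  },
  requeue_delay_in_seconds: 60,
};

console.log(preflight.on_before_add({ payload: { user_id: 'user_1' } })); // true
console.log(preflight.on_before_add({ payload: { user_id: 'user_2' } })); // false
console.log(preflight.okay_to_run()); // false (encoder is full)

encoder.active_jobs = 3;
console.log(preflight.okay_to_run()); // true
```

Both checks return a strict boolean, which matches what the preflight functions are expected to return.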
API Reference
Queue Definition API
{
  queues: {
    [queue_name: string]: {
      run_on_startup: boolean,
      concurrent_jobs: integer,
      cleanup: {
        completed_after_seconds: integer,
        failed_after_seconds: integer,
      },
      retry_jobs_running_before_restart: boolean,
      jobs: {
        [job_name: string]: {
          preflight: {
            on_before_add: (job_to_add: object, db: object, queue_name: string) => boolean,
            okay_to_run: (payload: object, job: object) => boolean,
            requeue_delay_in_seconds: integer,
          },
          max_attempts: integer,
          on_max_attempts_exhausted: (job: object) => void,
          requeue_on_failure: boolean,
          run: (payload: object, job: object) => void,
        }
      }
    }
  }
}
Properties
- queue_name object Required
  The name of the queue being defined as a string property, assigned an object as a value.
  - run_on_startup boolean
    Decides whether or not jobs for the queue being defined should run on server startup.
  - concurrent_jobs integer
    How many jobs to run concurrently (default: 1).
  - cleanup object
    Specifies cleanup options for jobs in the queue.
    - completed_after_seconds integer
      How many seconds to wait before removing completed jobs from the queue's collection/table in the database.
    - failed_after_seconds integer
      How many seconds to wait before removing failed jobs from the queue's collection/table in the database.
  - retry_jobs_running_before_restart boolean
    Decides whether or not jobs that had a status of running before the server restarted should be re-attempted.
  - jobs object Required
    Individual job definitions for the queue.
    - job_name object Required
      The name of a job to define for the queue as a string.
      - preflight object
        Preflight options for the job.
        - on_before_add function
          A function receiving the job_to_add as an object, the queue's database connection, and the queue_name. Returns a boolean to decide if the current job is safe to add to the database/queue.
        - okay_to_run function
          A function receiving the job's payload as an object and the job instance as an object. Returns a boolean to decide if the current job is safe to run.
        - requeue_delay_in_seconds integer
          If the okay_to_run() function returns false, the number of seconds to wait before re-attempting the job.
      - max_attempts integer
        If defined, the max number of attempts for the job before permanently deleting it from the queue.
      - on_max_attempts_exhausted function
        If the max_attempts limit is reached, a function to call to perform work related to the job's failure.
      - requeue_on_failure boolean
        Decide whether or not Joystick should automatically requeue the job when its job.failed() callback is fired.
      - run function Required
        The main function to call for the job at run time. Receives the payload passed when the job was queued as an object and the job instance as an object.