Cómo configurar una cola de trabajos en Node.js usando Agenda y MongoDB

Cómo configurar una cola de trabajos usando Agenda y MongoDB y cómo enviar trabajos a esa cola a través de un extremo API administrado por Express.

Primeros pasos

Para este tutorial, vamos a utilizar CheatCode Node.js Boilerplate como punto de partida para nuestro trabajo. Primero, clonemos una copia:

Terminal

git clone https://github.com/cheatcode/nodejs-server-boilerplate

A continuación, instale las dependencias repetitivas:

Terminal

cd nodejs-server-boilerplate && npm install

Después de eso, instale la dependencia que usaremos para crear nuestro servidor de trabajo agenda :

Terminal

npm i agenda

Con todas las dependencias instaladas, inicie el servidor de desarrollo:

Terminal

npm run dev

Con eso, estamos listos para comenzar.

Agregar un punto final de API para definir trabajos

Para comenzar, debemos agregar un punto final de API a través del servidor Express.js existente en el modelo estándar. Esto nos permitirá enviar trabajos a nuestro servidor de forma remota.

/api/jobs/index.js

export default (app) => {
  // We'll define our API endpoint via Express here.
};

En aras de la organización, primero vamos a crear un archivo separado para todas nuestras rutas relacionadas con trabajos (técnicamente solo tendremos una, pero esto mantendrá nuestro código flexible en caso de que desee expandirlo más adelante) .

Siguiendo el patrón existente en el repetitivo, aquí estamos definiendo una función y convirtiéndola en la exportación predeterminada de nuestro archivo. Esta función espera un Express app existente instancia que se pasará como su único argumento. Antes de implementar nuestra ruta, veamos cómo el app existente está configurado y cómo se debe llamar a esta función.

/index.js

import express from "express";
import startup from "./lib/startup";
import api from "./api/index";
import jobs from "./api/jobs";
import middleware from "./middleware/index";
import logger from "./lib/logger";

startup()
  .then(() => {
    const app = express();
    const port = process.env.PORT || 5001;

    middleware(app);
    api(app);
    jobs(app);

    app.listen(port, () => { ... });

    process.on("message", (message) => { ... });
  })
  .catch((error) => { ... });

Aquí, hemos abierto el index.js principal archivo para el repetitivo. Aquí es donde configuramos nuestro servidor Express y "configuramos" nuestra aplicación. La parte a la que queremos prestar atención aquí está justo en el medio, donde llamamos a la función que acabamos de exportar desde el otro archivo e importamos a este archivo como jobs . Justo encima de la llamada a esta función, estamos creando nuestro Express app instancia.

Al llamar a nuestra función, pasamos ese app instancia para que podamos usarla para agregar la ruta donde aceptaremos nuevos trabajos para nuestra cola.

/api/jobs/index.js

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

De vuelta en el archivo donde definimos nuestra función, ahora, usando el app instancia que hemos pasado, estamos agregando una ruta a nuestro servidor Express en /jobs . Esto servirá como punto final de API donde recibiremos solicitudes para programar nuevos trabajos (cuando se ejecute localmente, se podrá acceder a esto en http://localhost:5001/jobs ).

Dentro de la devolución de llamada de nuestra ruta, hemos agregado una respuesta para las solicitudes a la ruta a través del res.send() función. Cumplimos con ese mensaje con el que estamos respondiendo ahora y configuremos la cola a la que agregaremos nuestros trabajos.

Configuración de una cola de trabajo con Agenda

En aras de la simplicidad, en el mismo archivo, importemos el agenda dependencia que instalamos al comienzo del tutorial y crear la cola para nuestros trabajos:

/api/jobs/index.js

import Agenda from "agenda";
import settings from "../../lib/settings";

const jobQueue = new Agenda({
  db: {
    address: settings.databases.mongodb.uri,
    collection: "jobs",
  },
});

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

Arriba en la parte superior, importamos Agenda del agenda paquete que instalamos anteriormente (usamos un A mayúscula para el valor importado porque esperamos que sea un constructor de clase de JavaScript; usar una letra mayúscula es una práctica común para estos en el lenguaje).

También importamos el settings para el repetitivo. Esta es una función que está integrada en el repetitivo y nos permite almacenar datos de configuración para nuestra aplicación. Dentro de ese /lib/settings archivo, tenemos un código que intentará cargar un archivo de configuración que coincida con el nombre del entorno actual. En este momento, estamos ejecutando en el development entorno por lo que intenta cargar un settings-development.json archivo (desde la raíz de la aplicación). Si estuviéramos en un production entorno, intentaría cargar settings-production.json desde la raíz de la aplicación.

En desarrollo, un settings-development.json El archivo ya se nos ha proporcionado. Además, para nuestro próximo paso, también incluye la URL donde se ejecuta nuestra base de datos MongoDB. Muy rápido, si abrimos ese archivo, podemos ver la estructura:

/configuración-desarrollo.json

{
  ...
  "databases": {
    "mongodb": {
      "uri": "mongodb://127.0.0.1/app"
    }
  },
  ...
}

En desarrollo, solo apuntamos a la copia de MongoDB iniciada en localhost (aquí, indicada como 127.0.0.1 , la versión de la dirección IP de localhost ) automáticamente para nosotros cuando iniciamos el repetitivo.

/api/jobs/index.js

import Agenda from "agenda";
import settings from "../../lib/settings";

const jobQueue = new Agenda({
  db: {
    address: settings.databases.mongodb.uri,
    collection: "jobs",
  },
});

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

Mirando hacia atrás en nuestro código, con Agenda importado, creamos una nueva instancia con new Agenda() pasando en la configuración de nuestra cola. Para agenda para que funcione, debemos decirle en qué base de datos MongoDB almacenará nuestros trabajos y, opcionalmente, el nombre de la colección en esa base de datos donde almacenará nuestros trabajos (cada trabajo recibe un objeto con información sobre cuándo se supone que debe ejecutarse, cualquier dato asociado con él, etc.).

Aquí, pasamos el settings.databases.mongodb.uri valor que acabamos de ver en /settings-development.json como el db.address valor y establezca el collection nombre a jobs (Puedes cambiar esto a lo que quieras). Es importante tener en cuenta que almacenamos el resultado de llamar a new Agenda() en una variable jobQueue . Esto ahora contiene la instancia de cola que usaremos para agregar y administrar trabajos.

Finalmente, justo debajo de nuestra definición de const jobQueue nos aseguramos de llamar a jobQueue.start() . Esto asegura que Agenda realmente procese los trabajos que le entregamos. Aquí, simplemente lo iniciamos tan pronto como el archivo en el que estamos trabajando se carga en el servidor (es decir, en el inicio del servidor). En su propia aplicación, es posible que desee iniciar esto de forma más condicional.

A continuación, debemos configurar las funciones del controlador para nuestros trabajos. Vamos a definir dos funciones:una para demostrar la ejecución de trabajos inmediatamente después de que se hayan agregado a la cola y otra para demostrar la ejecución de trabajos después de un retraso.

/api/jobs/index.js

import Agenda from "agenda";
import dayjs from "dayjs";
import settings from "../../lib/settings";

const jobQueue = new Agenda({
  db: {
    address: settings.databases.mongodb.uri,
    collection: "jobs",
  },
});

jobQueue.define("instantJob", async (job) => {
  const data = job?.attrs?.data;
  console.log(
    "This job is running as soon as it was received. This is the data that was sent:"
  );
  console.log(data);
});

jobQueue.define("delayedJob", async (job) => {
  const data = job?.attrs?.data;
  console.log(
    "This job is running after a 5 second delay. This is the data that was sent:"
  );
  console.log(data);
});

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

Entre nuestro jobQueue definición y la llamada a jobQueue.start() , hemos agregado dos llamadas a jobQueue.define() . Esta es la función que usamos para decirle a Agenda qué hacer cuando un trabajo de un tipo determinado es el próximo en ejecutarse en la cola. Aquí, definimos dos tipos de trabajos instantJob y delayedJob (pasamos estos nombres como primer argumento a jobQueue.define() ).

Dentro de la función de devolución de llamada para cada tipo de trabajo, extraemos los datos que esperamos que se pasen al trabajo (en nuestro ejemplo, solo serán datos ficticios, pero para su propia aplicación esto proporcionará un contexto adicional cuando ejecute su trabajo:un ID de usuario, algunos datos para almacenar, etc.) del job.attrs.data valor donde job se nos pasa a través de Agenda y contiene un objeto que describe el trabajo actual que estamos tratando de ejecutar. Los datos personalizados que pasamos se almacenan en este objeto en su attrs.data valor.

Con esos datos, a continuación, en ambos trabajos, salimos de un mensaje que nos dice qué tipo de trabajo estamos ejecutando, seguido de un registro del data pasamos a lo largo. En su propia aplicación, aquí es donde ejecutaría el código para su trabajo.

En este momento, esto puede parecer confuso:hemos definido dos tipos de trabajos que son casi idénticos. A continuación, aprenderemos cómo aceptar trabajos a través de nuestro punto final de API y cómo diferenciaremos entre los dos tipos que hemos definido anteriormente.

Programación de trabajos a través del extremo de la API

Para facilitar la comprensión, agregaremos todo el código restante ahora y lo recorreremos paso a paso.

/api/jobs/index.js

import Agenda from "agenda";
import dayjs from "dayjs";
import settings from "../../lib/settings";

const jobQueue = new Agenda({ ... });

jobQueue.define("instantJob", async (job) => { ... });

jobQueue.define("delayedJob", async (job) => { ... });

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    const jobType = req?.query?.jobType;
    const allowedJobs = Object.keys(jobQueue._definitions);

    if (!jobType) {
      return res.send("Must pass a jobType in the query params.");
    }

    if (!allowedJobs.includes(jobType)) {
      return res.send(
        `${jobType} is not supported. Must pass one of ${allowedJobs.join(
          ", or "
        )} as jobType in the query params.`
      );
    }

    if (jobType === "instantJob") {
      jobQueue.now(req?.query?.jobType, req.body);
    }

    if (jobType === "delayedJob") {
      jobQueue.schedule(
        dayjs().add(5, "seconds").format(),
        req?.query?.jobType,
        req.body
      );
    }

    res.send("Job added to queue!");
  });
};

Centrándonos en la función de devolución de llamada para nuestra ruta, el código que hemos agregado aquí está resolviendo tres problemas:averiguar qué trabajo ejecutar (instantJob o delayedJob ), validando si ese trabajo es o no uno que hemos definido, y luego, si lo es, agregando ese trabajo a la cola.

Para identificar cuál trabajo para ejecutar, miramos el query objeto del req entrante objeto pasado a nuestra devolución de llamada de ruta. Toma, query representa los parámetros de consulta incluidos en la URL al llamar a la ruta, como ?jobType=instantJob . La idea aquí es que cuando ejecutemos nuestro trabajo, usaremos el parámetro de consulta jobType para decirnos hacia dónde se dirige nuestro trabajo.

Justo debajo de esto, obtenemos la lista de trabajos permitidos usando el método de JavaScript incorporado Object.keys() para recuperar una matriz de los trabajos que hemos definido nuestro jobQueue (estos se almacenan en el _definitions objeto en nuestra instancia de Agenda).

A continuación, primero, nos aseguramos de que un jobType se ha pasado en nuestros parámetros de consulta. Si no es así, respondemos a la solicitud con un mensaje de advertencia.

Si hacemos tener un jobType , a continuación, validamos que sea uno de nuestros allowedJobs . Suponiendo que el valor almacenado en jobType está en esa matriz, pasamos a agregar el trabajo a la cola. Si no es así, respondemos con otra advertencia, informando al usuario que pasó jobType no es válido y proporcione una lista de los posibles tipos de trabajos disponibles (¡detalles!).

Pasando a poner en cola nuestros trabajos, recuerde que nuestro objetivo es a.) agregar nuestro trabajo a la cola para que se ejecute inmediatamente, o b.) programar el trabajo para que se ejecute en el futuro. Aquí, para trabajos del tipo instantJob , llamamos al .now() método en jobQueue , pasando el tipo de trabajo que queremos ejecutar, desde nuestros parámetros de consulta, y los datos (lo que extraemos de job.attrs.data dentro de la devolución de llamada del trabajo) que queremos pasar, que en este caso es el body del req objeto (pista:asumimos que nuestros trabajos se pasan a nuestra ruta como HTTP POST solicitud).

A continuación, para nuestro delayedJob tipo, llamamos a jobQueue.schedule() , pasando una fecha en la que queremos que se ejecute nuestro trabajo junto con nuestro jobType y req.body , tal como lo hicimos para instantJob . Para generar una fecha, aquí estamos usando el dayjs biblioteca que está incluida en el repetitivo que estamos usando. La línea dayjs().add(5, "seconds").format() here dice "obtenga la fecha y la hora actuales, agréguele cinco segundos y luego formatéela como una cadena ISO-8601 ("cadena iso" para abreviar, un formato de fecha estandarizado como 2021-07-29T23:00:00Z )".

¡Eso es todo! Si probamos nuestros diferentes trabajos, veremos los registros que agregamos anteriormente en la consola de nuestro servidor:

Terminando

En este tutorial, aprendimos cómo implementar una cola de trabajos utilizando la biblioteca Agenda junto con MongoDB. Aprendimos cómo configurar una ruta a través de Express para recibir trabajos, cómo configurar una cola con Agenda, cómo definir tipos de trabajo en esa cola y, finalmente, cómo agregar trabajos recibidos a través de nuestra ruta API a esa cola.