In this next part we’ll break down how I set up the Agenda task worker, how I defined the ‘upload ready’ task, and how the recursive uploader works.
Logging a Task
Agenda is a background task scheduler for Node.js, similar to the Ruby on Rails ‘Whenever’ gem. Rather than hosting the tasks in a separate datastore like Redis, we opted to keep things light by letting Agenda create an ‘agendaJobs’ collection inside our primary MongoDB database, then using Agenda’s methods to push new tasks to it for processing.
In the previous post we did this via
```javascript
const Agenda = require('agenda');
var agenda = new Agenda({db: {address: process.env.MONGODB_URI}});
var job = agenda.create('upload ready', {
  extract_location: record.extract_location,
  story_id: record.id
});
job.save(); // .create only builds the job; .save persists it to agendaJobs
```
In the first two lines we require Agenda and point it at our MongoDB database. Then we use the .create method with the name of the task, plus an object of any attributes it may need for processing. Here that’s the extract location and the id of the record it will eventually need to update.
The Worker
So we’ve logged the task into our agendaJobs collection for processing. How do we build a process that watches for new tasks and handles them as they come in?
For this we’ll create a simple JS script that we can run with pm2 on our server, or via a Procfile on Heroku.
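Either way, the underlying command is just `node worker.js`. As a sketch (the process name here is an arbitrary label, not from the original post), the two options look like this:

```shell
# Heroku: add a worker process type to your Procfile
#   worker: node worker.js

# Self-hosted: start the script under pm2 so it restarts on crashes
pm2 start worker.js --name agenda-worker

# tail the worker's output to confirm it started
pm2 logs agenda-worker
```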
```javascript
// worker.js
'use strict';

const Agenda = require('agenda');
const mongoose = require('mongoose');
var { uploadDirToS3 } = require('./lib/workers/s3uploader');
require('dotenv').config();

// setup mongoose
var mongooseOptions = {
  reconnectInterval: 500, // Reconnect every 500ms
  reconnectTries: 30,     // max number of retries
  keepAlive: true,        // keep alive for long running connections
  poolSize: 10,           // Maintain up to 10 socket connections
  bufferMaxEntries: 0,    // If not connected, return errors immediately
  useNewUrlParser: true
};

mongoose.Promise = global.Promise;
mongoose.connect(process.env.MONGODB_URI || 'mongodb://localhost/et', mongooseOptions);

// setup delayed job worker
var agenda = new Agenda({db: {address: process.env.MONGODB_URI}});

// -----------------------
// define story uploader
agenda.define('upload ready', {priority: 'highest', concurrency: 1}, function(job, done) {
  var data = job.attrs.data;
  uploadDirToS3(data.extract_location, data.story_id, job, done);
});

// start job runner
agenda.on('ready', function() {
  agenda.start();
  console.log("Worker started");
});

// note: the event name must match the task name defined above
agenda.on('success:upload ready', function(job) {
  console.log('Successfully uploaded story');
});
```
From the top we:
- Require Agenda.
- Require our s3uploader (which we’ll talk about next).
- Setup our connection to MongoDB via the library Mongoose.
- Give Agenda a connection to our mongo database server.
- Define the ‘upload ready’ task and set its priority and concurrency (how many of these jobs can run at once). Inside it, job.attrs.data gives us access to the attributes we set when we created the task, which we pass on to the uploadDirToS3 method.
- Next we’ll start the job runner.
- And finally on success we’ll notify the console.
In our package.json we can expose the worker with a ‘worker’ script.
```json
"scripts": {
  "start": "nodemon --ignore media/ --ignore client/ --exec 'node server.js'",
  "worker": "node worker.js",
  "test": "jest"
},
```
‘upload ready’ Script
So far we’ve built the client and server side to get the zip file there, unzipped it, and set up a task worker to operate on our uploads when they get logged. But how are we going to handle the actual uploads?
Here we need to build a method that, given a directory, can find all the files inside it, then walk from directory to directory pushing their contents up to S3.
There are two ways we can approach this.
Depth first, where we follow each directory down as far as it goes, pushing content to S3, before moving on to the next.
Breadth first, where we stay as close to the top as possible, working one level at a time until we reach the bottom of the tree.
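To make the difference concrete, here’s an illustrative sketch (not from the post) that walks the same toy in-memory tree both ways. Depth first naturally falls out of recursion; breadth first swaps the recursion for a queue:

```javascript
'use strict';

// Toy directory tree: keys are directories, values are their entries.
// Anything that appears as a key is a directory; everything else is a file.
const tree = {
  '/': ['a', 'dir1', 'dir2'],
  '/dir1': ['b', 'dir3'],
  '/dir1/dir3': ['c'],
  '/dir2': ['d']
};

const isDir = p => Object.prototype.hasOwnProperty.call(tree, p);
const join = (dir, name) => (dir === '/' ? '/' + name : dir + '/' + name);

// Depth first: recurse into each subdirectory before moving on,
// so /dir1/dir3/c is visited before /dir2/d.
function depthFirst(dir, visit) {
  for (const name of tree[dir]) {
    const p = join(dir, name);
    if (isDir(p)) depthFirst(p, visit);
    else visit(p);
  }
}

// Breadth first: finish the whole current level before descending,
// using a queue of directories still to process.
function breadthFirst(root, visit) {
  const queue = [root];
  while (queue.length) {
    const dir = queue.shift();
    for (const name of tree[dir]) {
      const p = join(dir, name);
      if (isDir(p)) queue.push(p);
      else visit(p);
    }
  }
}
```

For uploading to S3 the final set of files is identical either way; the choice mostly affects the order uploads are issued and how much directory state you hold in memory at once.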