Bayan Bennett

Queue FI-FO Fun

I see the code of a programmer
Be it good; or be it bad
Adding a queue will make me glad

While implementing optimizations, software developers may encounter performance issues that extend beyond a particular function or module. A potential problem in architecture, where two modules are tightly coupled, but have vastly different or unpredictable execution speeds.

Code doesn't have to be a linear set of operations, in fact, your operating system would be running much slower if it were not for an uncomplicated design pattern: queues.

What? Why?

Imagine a lineup of people outside of a popular donut shop. Each of them eagerly awaiting hot bean water and a toroidal pastry. This is a queue. You have seen this.

The customers can execute their purpose quickly, the speed of payment is much faster than the speed at which the food can be prepared. If we didn't have queues, as soon as someone stepped into the store to make their request, the universe would have to awkwardly wait for them to finish the transaction. The consequences of not having queues are untenable in day-to-day life. In software it can also have similar consequences.

Now, imagine a function that sends a log to a server and waits for the log to be sent before completing. That function would also be responsible for handling errors in the server request. In this example, at best, the purpose of the function is bloated by adding logging and error handling. In the worst case, the function performs poorly by having to wait for logs to be sent.

If a queue were implemented, it would extract the responsibility of logging and delivery. It could also accept logs from any function, prioritise error logs, and handle cases where the remote logging server is unreachable.

Creating a Queue

Note: The code below is in TypeScript, but a JavaScript version of the final code is available at the bottom.

There are many ways of making a queue, the code below is just one implementation that lays a conceptual foundation for what a queue is and how to use it.

Let's start with the three stateful elements of our queue.

  • The queue itself
  • The callback that will be called for every item in the queue
  • A flag that says whether the queue is currently being processed
class Queue<T> {
  private running: boolean = false;
  private queue: T[] = [];

  constructor(private callback: (item: T) => Promise<void>) {}
}

Processing the Queue

There are two ways that a queue is usually processed, via a loop or recursion. The example below uses a recursive function that:

  1. Checks that the queue is not empty
  2. Retrieves the first item in the queue and removes it from the queue
  3. Calls the callback with the item
  4. Returns a call to itself
private async processNext(): Promise<void> {
  if (this.queue.length === 0) return;
  const item = this.queue.shift() as T;
  await this.callback(item);
  return this.processNext();
}

Running the Queue

If the queue is being processed, don't start another processing instance. If the queue is not being processed, set the running flag to true and start processing the queue. When the processing has completed, set running to false.

private async run() {
  if (this.running) return;
  this.running = true;
  await this.processNext();
  this.running = false;
}

Pushing to the Queue

No surprises here, when the push method on our queue is called, push the item onto the internal queue and then call run.

push(item: T) {
  this.queue.push(item);
  this.run();
}

All Put Together

class Queue<T> {
  private running: boolean = false;
  private queue: T[] = [];

  constructor(private callback: (item: T) => Promise<void>) {}

  private async processNext(): Promise<void> {
    if (this.queue.length === 0) return;
    const item = this.queue.shift() as T;
    await this.callback(item);
    return this.processNext();
  }

  private async run() {
    if (this.running) return;
    this.running = true;
    await this.processNext();
    this.running = false;
  }

  push(item: T) {
    this.queue.push(item);
    this.run();
  }
}

In JavaScript

class Queue {
  constructor(callback) {
    this.callback = callback;
    this.running = false;
    this.queue = [];
  }
  async processNext() {
    if (this.queue.length === 0)
      return;
    const item = this.queue.shift();
    await this.callback(item);
    return this.processNext();
  }
  async run() {
    if (this.running)
      return;
    this.running = true;
    await this.processNext();
    this.running = false;
  }
  push(item) {
    this.queue.push(item);
    this.run();
  }
}

Extending the Functionality

  • Retries. Useful when dealing with unpredictability in a remote service. Items can be pushed back onto the queue to be attempted again. Adding a retry counter to the item makes it possible to limit the number of retries. Some logic should also be added to handle what happens when that limit is reached. This article might also be useful for this case: Retrying and Exponential Backoff with Promises.
  • Deferring callback registration. Useful when the callback is unknown until a certain point in the program. This way, a function can push to the queue and then move on. Once the callback is registered, the queue can be processed.
  • Limiting queue size. Useful for preventing the queue from taking up too much memory.
  • Task prioritization. Useful when there are certain tasks that need to be prioritized first. This can be accomplished by having multiple queues or by scoring and inserting items into a sorted queue.

TL;DR

A queue is a tool that can be used to decouple a module that runs quickly from one that runs slowly.

© 2021 Bayan Bennett