Node.js - Memory Intensive Promise Operations

11/07/2021, Sun
Tags: #NodeJs

Backpressure for Promises

When working with promises, long-running tasks are a commonly encountered scenario. Some of those long-running tasks may also be memory intensive. When running many of them at once, it is important to limit the number of promises allowed to run at the same time so as not to exhaust the Node.js JavaScript heap.

Most npm libraries designed for limiting promise execution offer an option to cap the number of promises running at once.
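
As a rough illustration of what such a cap looks like, here is a minimal hand-rolled sketch. The helper name mapWithLimit and its parameters are hypothetical and are not taken from any particular npm library; real packages differ in API and features.

```javascript
// Hypothetical helper (not from any library): run `worker` over `items`
// with at most `limit` promises in flight at the same time.
async function mapWithLimit(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0;

  // Each runner claims the next unprocessed index until none are left.
  async function runner() {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  // Start `limit` runners (or fewer if there are not enough items).
  const count = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: count }, () => runner()));
  return results;
}

// Usage: at most two simulated tasks run at once.
const locations = ['San Diego', 'Denver', 'Zurich', 'Barcelona'];
mapWithLimit(locations, 2, async (location) => {
  await new Promise((resolve) => setTimeout(resolve, 10));
  return location.toUpperCase();
}).then((results) => console.log(results));
```

Note that the cap is on the number of tasks, not on the memory they use, so two very large tasks can still consume more than twenty small ones.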

However, this might not be the most reliable way to make sure that memory-intensive tasks do not overtax Node.js, because it is not certain how much memory the concurrently running promise tasks will consume in total. To overcome this limitation, it is helpful to look at what Node.js streams offer for processing larger data sets in memory. Streams can limit the amount of data buffered at any one time by implementing backpressure.
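
To make the idea of backpressure concrete, the sketch below uses only the core stream and events modules. The names createSlowSink and feed, the 10 ms delay, and the highWaterMark of 2 are assumptions chosen for illustration. The producer stops writing whenever write() returns false and resumes only after the 'drain' event.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Hypothetical slow consumer. In object mode, highWaterMark counts objects,
// so write() returns false once the buffered count reaches 2.
function createSlowSink() {
  return new Writable({
    objectMode: true,
    highWaterMark: 2,
    write(item, _encoding, callback) {
      setTimeout(callback, 10); // simulate slow processing of one item
    },
  });
}

// Hypothetical producer that respects backpressure.
// Resolves with the number of times it had to wait for 'drain'.
async function feed(sink, items) {
  let pauses = 0;
  for (const item of items) {
    if (!sink.write(item)) {
      // The internal buffer is full: stop producing until it empties.
      pauses++;
      await once(sink, 'drain');
    }
  }
  sink.end();
  await once(sink, 'finish');
  return pauses;
}

feed(createSlowSink(), ['a', 'b', 'c', 'd', 'e', 'f'])
  .then((pauses) => console.log('Waited for drain', pauses, 'time(s)'));
```

The producer never holds more than a couple of pending items in the sink's buffer, regardless of how many items there are in total.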

Backpressure is a feature of Node.js streams, but it would be helpful if it were available to promises as well. This is made possible by the npm module streamie.

This library provides the ability to iterate over an array of items, processing each one in a handler that returns a promise. Streamie also handles backpressure automatically, which removes the burden of doing so manually.

The following example, taken from the streamie source repository, shows a simple use case:

const { source } = require('streamie');

const items = [
  {location: 'San Diego'},
  {location: 'Los Angeles'},
  {location: 'Denver'},
  {location: 'New York'},
  {location: 'St. Louis'},
  {location: 'Zurich'},
  {location: 'Barcelona'},
  {location: 'Buenos Aires'}
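];

// Process each item with a handler that returns a promise.
source(items, (item, { streamie }) => {
  console.log('Starting', item);
  return new Promise((resolve) => {
    // Simulate asynchronous work that takes up to one second.
    setTimeout(() => resolve(), Math.random() * 1000);
  })
  .then(() => {
    console.log('Done with', item);
    // When the 'Buenos Aires' item is done, complete the streamie with a final value.
    if (item.location === 'Buenos Aires') streamie.complete('Final Value');
  });
})
.then((value) => console.log('Streamie complete.', value));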

It is possible to configure the concurrency limit in Streamie by passing in a configuration object as the third argument to the source function:

source(items, (item, { streamie }) => {
  // ...
}, /* {concurrency: the_concurrency_limit } */)
.then((value) => console.log('Streamie complete.', value));