Support back pressure #51

@matthieusieben

Support plan

  • is this issue currently blocking your project? (yes/no): yes
  • is this issue affecting a production system? (yes/no): no

Context

  • node version: 16.20.0
  • module version with issue: 5.1.0
  • last module version without issue: 5.1.0
  • environment (e.g. node, browser, native): node
  • used with (e.g. hapi application, another framework, standalone, ...): hapi application (route.payload.parse = "gunzip")
  • any other relevant information:

What are you trying to achieve or the steps to reproduce?

const { promisify } = require("node:util")
const { pipeline } = require("node:stream/promises")
const Pez = require('@hapi/pez')
const Content = require('@hapi/content')

const sleep = promisify(setTimeout)

server.route({
  method: 'POST',
  path: `/upload`,
  options: {
    payload: {
      output: 'stream',
      parse: 'gunzip',
      maxBytes: 10 * 1024 * 1024 * 1024, // 10GB
    },
  },
  handler: async (r, h) => {
    const contentType = Content.type(r.headers['content-type'])
    if (contentType.mime === 'multipart/form-data') {
      const promises = []

      const dispenser = new Pez.Dispenser({
        boundary: contentType.boundary,
      })

      dispenser.on('part', (stream) => {
        console.log('part', stream.name)
        try {
          promises.push(processPartSlowly(stream))
        } catch (err) {
          promises.push(Promise.reject(err))
        }
      })

      await pipeline(r.payload, dispenser)
      
      console.log('payload fully consumed')

      const results = await Promise.allSettled(promises)
      
      console.log('all streams were consumed')

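      console.log('results', results)

      // hapi handlers must return a value, otherwise the request fails
      return h.response().code(204)
    }

    // Anything other than multipart/form-data is not handled by this repro
    return h.response().code(415)
  }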
})

async function processPartSlowly(stream) {
  for await (const chunk of stream) {
    // Simulate a slow consumer: 1 ms per byte, i.e. 1 second per 1000 bytes
    await sleep(chunk.length)
  }
}

What was the result you got?

The "payload fully consumed" log message appears almost immediately, no matter how slowly the parts are consumed.

Back pressure should be applied to the input stream. As it stands, when the part streams are consumed slowly, the parsed data accumulates in memory, which results in high memory usage. The same situation could arise when Hapi's route.options.payload.output option is set to 'file' and the file system is slow.
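
To illustrate the memory concern with plain Node.js streams only, here is a minimal sketch. The `part` stream is a hypothetical stand-in and is not taken from Pez; the tiny `highWaterMark` is an assumption chosen to make the threshold easy to cross. It shows that `push()` reports a full buffer through its return value but still stores every chunk, so a producer that ignores the return value buffers without limit.

```javascript
const { Readable } = require('node:stream')

// Hypothetical stand-in for a part stream. Nobody reads from it,
// which mimics a very slow consumer.
const part = new Readable({ highWaterMark: 4, read() {} })

const results = []
for (const text of ['ab', 'cdef', 'ghijkl']) {
  // push() returns false once the internal buffer has reached
  // highWaterMark, but the chunk is buffered regardless.
  results.push(part.push(Buffer.from(text)))
}

console.log(results)             // one boolean per push() call
console.log(part.readableLength) // bytes currently held in memory
```

Only the first `push()` returns `true`; the later calls return `false`, yet `readableLength` keeps growing past `highWaterMark`, because the threshold is advisory and not a hard cap.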

What result did you expect?

The inbound HTTP request stream (r.payload) should stop being consumed once the highWaterMark of the stream is reached. The return value of this line should not be ignored.
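
One possible way to honor that return value is sketched below. This is not Pez's implementation; the `Forwarder` class, its `target` and `pending` properties, and the small `highWaterMark` are all hypothetical and only illustrate the general pattern: when `push()` returns `false`, the write callback is held back until the consumer asks for more data, which in turn makes the upstream `write()` return `false` and delays the 'drain' event.

```javascript
const { Readable, Writable } = require('node:stream')

// Hypothetical forwarder (not Pez's code): copies written chunks into a
// readable `target` and honors the return value of target.push().
class Forwarder extends Writable {
  constructor(options) {
    super(options)
    this.pending = null
    this.target = new Readable({
      highWaterMark: options.highWaterMark,
      // Called when the consumer wants more data: release the held write.
      read: () => {
        const done = this.pending
        this.pending = null
        if (done) done()
      },
    })
  }

  _write(chunk, encoding, callback) {
    if (this.target.push(chunk)) {
      callback() // room left downstream, accept the next chunk
    } else {
      this.pending = callback // buffer full, hold the write until read() runs
    }
  }
}

const forwarder = new Forwarder({ highWaterMark: 4 })
let drained = false
forwarder.on('drain', () => { drained = true })

// 6 bytes exceed both the writable and the readable highWaterMark.
const accepted = forwarder.write(Buffer.from('abcdef'))
const drainedBeforeRead = drained

forwarder.target.read() // the slow consumer finally takes the data
console.log({ accepted, drainedBeforeRead, drainedAfterRead: drained })
```

With this pattern, a source piped into the forwarder (for example via `pipeline`) is paused until the consumer catches up, so the amount of buffered data stays close to the configured `highWaterMark` values.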

Labels: support (Questions, discussions, and general support)
