Skip to content

Errors thrown in Readable.map can get swallowed by AbortError #62089

@hs-kgoriunov

Description

@hs-kgoriunov

Version

v25.7.0

Platform

Darwin MacBook.local 24.6.0 Darwin Kernel Version 24.6.0: Mon Jan 19 22:00:10 PST 2026; root:xnu-11417.140.69.708.3~1/RELEASE_X86_64 x86_64

Subsystem

No response

What steps will reproduce the bug?

import fs from 'node:fs';
import { Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

export async function main({ sourceStream, transformPushCount }) {
  await pipeline(
    sourceStream,

    new Transform({
      readableObjectMode: true,
      transform(chunk, encoding, callback) {
        for (let i = 0; i < transformPushCount; i++) {
          this.push({});
        }
        callback();
      },
    }),

    (readable) =>
      readable.map(async (obj) => {
        throw Error('Boom!');
      })
  );
}

if (import.meta.main) {
  try {
    await main({
      sourceStream: fs.createReadStream('/dev/urandom'),
      transformPushCount: 1,
    });
  } catch (e) {
    console.log(`sourceStream: file, transformPushCount: once, error: ${e}`);
  }
  try {
    await main({
      sourceStream: fs.createReadStream('/dev/urandom'),
      transformPushCount: 2,
    });
  } catch (e) {
    console.log(`sourceStream: file, transformPushCount: twice, error: ${e}`);
  }

  try {
    await main({
      sourceStream: Readable.from(['foo']),
      transformPushCount: 1,
    });
  } catch (e) {
    console.log(
      `sourceStream: readable from iterable, transformPushCount: once, error: ${e}`
    );
  }
  try {
    await main({
      sourceStream: Readable.from(['foo']),
      transformPushCount: 2,
    });
  } catch (e) {
    console.log(
      `sourceStream: readable from iterable, transformPushCount: twice, error: ${e}`
    );
  }

  try {
    await main({
      sourceStream: new Readable({
        read(size) {
          this.push('foo');
        },
      }),
      transformPushCount: 1,
    });
  } catch (e) {
    console.log(
      `sourceStream: readable implementation, transformPushCount: once, error: ${e}`
    );
  }
  try {
    await main({
      sourceStream: new Readable({
        read(size) {
          this.push('foo');
        },
      }),
      transformPushCount: 2,
    });
  } catch (e) {
    console.log(
      `sourceStream: readable implementation, transformPushCount: twice, error: ${e}`
    );
  }
}

How often does it reproduce? Is there a required condition?

It reproduces each time.

What is the expected behavior? Why is that the expected behavior?

I would expect the pipeline call to be rejected with Error: Boom! in all cases shown in the example code.

What do you see instead?

sourceStream: file, transformPushCount: once, error: Error: Boom!
sourceStream: file, transformPushCount: twice, error: AbortError: The operation was aborted
sourceStream: readable from iterable, transformPushCount: once, error: Error: Boom!
sourceStream: readable from iterable, transformPushCount: twice, error: Error: Boom!
sourceStream: readable implementation, transformPushCount: once, error: AbortError: The operation was aborted
sourceStream: readable implementation, transformPushCount: twice, error: AbortError: The operation was aborted

Additional information

The example came out of a larger code that reads a CSV file, parses it to objects using the csv-parse library, and then uses Radable.map to call a third-party API concurrently for every parsed record. The API credentials I've been using expired, causing the pipeline to reject with AbortError, though I'd expect it to reject with 401 Unauthorized.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions