-
-
Notifications
You must be signed in to change notification settings - Fork 34.9k
Open
Description
Version
v25.7.0
Platform
Darwin MacBook.local 24.6.0 Darwin Kernel Version 24.6.0: Mon Jan 19 22:00:10 PST 2026; root:xnu-11417.140.69.708.3~1/RELEASE_X86_64 x86_64
Subsystem
No response
What steps will reproduce the bug?
import fs from 'node:fs';
import { Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
export async function main({ sourceStream, transformPushCount }) {
await pipeline(
sourceStream,
new Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
for (let i = 0; i < transformPushCount; i++) {
this.push({});
}
callback();
},
}),
(readable) =>
readable.map(async (obj) => {
throw Error('Boom!');
})
);
}
if (import.meta.main) {
try {
await main({
sourceStream: fs.createReadStream('/dev/urandom'),
transformPushCount: 1,
});
} catch (e) {
console.log(`sourceStream: file, transformPushCount: once, error: ${e}`);
}
try {
await main({
sourceStream: fs.createReadStream('/dev/urandom'),
transformPushCount: 2,
});
} catch (e) {
console.log(`sourceStream: file, transformPushCount: twice, error: ${e}`);
}
try {
await main({
sourceStream: Readable.from(['foo']),
transformPushCount: 1,
});
} catch (e) {
console.log(
`sourceStream: readable from iterable, transformPushCount: once, error: ${e}`
);
}
try {
await main({
sourceStream: Readable.from(['foo']),
transformPushCount: 2,
});
} catch (e) {
console.log(
`sourceStream: readable from iterable, transformPushCount: twice, error: ${e}`
);
}
try {
await main({
sourceStream: new Readable({
read(size) {
this.push('foo');
},
}),
transformPushCount: 1,
});
} catch (e) {
console.log(
`sourceStream: readable implementation, transformPushCount: once, error: ${e}`
);
}
try {
await main({
sourceStream: new Readable({
read(size) {
this.push('foo');
},
}),
transformPushCount: 2,
});
} catch (e) {
console.log(
`sourceStream: readable implementation, transformPushCount: twice, error: ${e}`
);
}
}How often does it reproduce? Is there a required condition?
It reproduces each time.
What is the expected behavior? Why is that the expected behavior?
I would expect the pipeline call to be rejected with Error: Boom! in all cases shown in the example code.
What do you see instead?
sourceStream: file, transformPushCount: once, error: Error: Boom!
sourceStream: file, transformPushCount: twice, error: AbortError: The operation was aborted
sourceStream: readable from iterable, transformPushCount: once, error: Error: Boom!
sourceStream: readable from iterable, transformPushCount: twice, error: Error: Boom!
sourceStream: readable implementation, transformPushCount: once, error: AbortError: The operation was aborted
sourceStream: readable implementation, transformPushCount: twice, error: AbortError: The operation was aborted
Additional information
The example came out of a larger code that reads a CSV file, parses it to objects using the csv-parse library, and then uses Radable.map to call a third-party API concurrently for every parsed record. The API credentials I've been using expired, causing the pipeline to reject with AbortError, though I'd expect it to reject with 401 Unauthorized.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels