Skip to content

Consuming Streams

When working with streams, it’s essential to understand how to consume the data they produce. In this guide, we’ll walk through several common methods for consuming streams.

To gather all the elements from a stream into a single Chunk, you can use the Stream.runCollect function.

1
import {
import Stream
Stream
,
import Effect
Effect
} from "effect"
2
3
const
const stream: Stream.Stream<number, never, never>
stream
=
import Stream
Stream
.
const make: <[number, number, number, number, number]>(as_0: number, as_1: number, as_2: number, as_3: number, as_4: number) => Stream.Stream<number, never, never>

Creates a stream from an sequence of values.

make
(1, 2, 3, 4, 5)
4
5
const
const collectedData: Effect.Effect<Chunk<number>, never, never>
collectedData
=
import Stream
Stream
.
const runCollect: <number, never, never>(self: Stream.Stream<number, never, never>) => Effect.Effect<Chunk<number>, never, never>

Runs the stream and collects all of its elements to a chunk.

runCollect
(
const stream: Stream.Stream<number, never, never>
stream
)
6
7
import Effect
Effect
.
const runPromise: <Chunk<number>, never>(effect: Effect.Effect<Chunk<number>, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<Chunk<number>>

Runs an `Effect` workflow, returning a `Promise` which resolves with the result of the workflow or rejects with an error.

runPromise
(
const collectedData: Effect.Effect<Chunk<number>, never, never>
collectedData
).
(method) Promise<Chunk<number>>.then<void, never>(onfulfilled?: ((value: Chunk<number>) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

then
(
namespace console var console: Console

The `console` module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers. The module exports two specific components: * A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream. * A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and [`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module. _**Warning**_: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for more information. Example using the global `console`: ```js console.log('hello world'); // Prints: hello world, to stdout console.log('hello %s', 'world'); // Prints: hello world, to stdout console.error(new Error('Whoops, something bad happened')); // Prints error message and stack trace to stderr: // Error: Whoops, something bad happened // at [eval]:5:15 // at Script.runInThisContext (node:vm:132:18) // at Object.runInThisContext (node:vm:309:38) // at node:internal/process/execution:77:19 // at [eval]-wrapper:6:22 // at evalScript (node:internal/process/execution:76:60) // at node:internal/main/eval_string:23:3 const name = 'Will Robinson'; console.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to stderr ``` Example using the `Console` class: ```js const out = getStreamSomehow(); const err = getStreamSomehow(); const myConsole = new console.Console(out, err); myConsole.log('hello world'); // Prints: hello world, to out myConsole.log('hello %s', 'world'); // Prints: hello world, to out myConsole.error(new Error('Whoops, something bad happened')); // Prints: [Error: Whoops, something bad happened], to err const name = 'Will Robinson'; myConsole.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to err ```

console
.
(method) Console.log(message?: any, ...optionalParams: any[]): void

Prints to `stdout` with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html) (the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)). ```js const count = 5; console.log('count: %d', count); // Prints: count: 5, to stdout console.log('count:', count); // Prints: count: 5, to stdout ``` See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.

log
)
8
/*
9
Output:
10
{
11
_id: "Chunk",
12
values: [ 1, 2, 3, 4, 5 ]
13
}
14
*/

Another way to consume elements of a stream is by using Stream.runForEach. It takes a callback function that receives each element of the stream. Here’s an example:

1
import {
import Stream
Stream
,
import Effect
Effect
,
import Console
Console
} from "effect"
2
3
const
const effect: Effect.Effect<void, never, never>
effect
=
import Stream
Stream
.
const make: <[number, number, number]>(as_0: number, as_1: number, as_2: number) => Stream.Stream<number, never, never>

Creates a stream from an sequence of values.

make
(1, 2, 3).
(method) Pipeable.pipe<Stream.Stream<number, never, never>, Effect.Effect<void, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Effect.Effect<void, never, never>): Effect.Effect<...> (+21 overloads)
pipe
(
4
import Stream
Stream
.
const runForEach: <number, void, never, never>(f: (a: number) => Effect.Effect<void, never, never>) => <E, R>(self: Stream.Stream<number, E, R>) => Effect.Effect<void, E, R> (+1 overload)

Consumes all elements of the stream, passing them to the specified callback.

runForEach
((
(parameter) n: number
n
) =>
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>
log
(
(parameter) n: number
n
))
5
)
6
7
import Effect
Effect
.
const runPromise: <void, never>(effect: Effect.Effect<void, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<void>

Runs an `Effect` workflow, returning a `Promise` which resolves with the result of the workflow or rejects with an error.

runPromise
(
const effect: Effect.Effect<void, never, never>
effect
).
(method) Promise<void>.then<void, never>(onfulfilled?: ((value: void) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

then
(
namespace console var console: Console

The `console` module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers. The module exports two specific components: * A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream. * A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and [`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module. _**Warning**_: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for more information. Example using the global `console`: ```js console.log('hello world'); // Prints: hello world, to stdout console.log('hello %s', 'world'); // Prints: hello world, to stdout console.error(new Error('Whoops, something bad happened')); // Prints error message and stack trace to stderr: // Error: Whoops, something bad happened // at [eval]:5:15 // at Script.runInThisContext (node:vm:132:18) // at Object.runInThisContext (node:vm:309:38) // at node:internal/process/execution:77:19 // at [eval]-wrapper:6:22 // at evalScript (node:internal/process/execution:76:60) // at node:internal/main/eval_string:23:3 const name = 'Will Robinson'; console.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to stderr ``` Example using the `Console` class: ```js const out = getStreamSomehow(); const err = getStreamSomehow(); const myConsole = new console.Console(out, err); myConsole.log('hello world'); // Prints: hello world, to out myConsole.log('hello %s', 'world'); // Prints: hello world, to out myConsole.error(new Error('Whoops, something bad happened')); // Prints: [Error: Whoops, something bad happened], to err const name = 'Will Robinson'; myConsole.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to err ```

console
.
(method) globalThis.Console.log(message?: any, ...optionalParams: any[]): void

Prints to `stdout` with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html) (the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)). ```js const count = 5; console.log('count: %d', count); // Prints: count: 5, to stdout console.log('count:', count); // Prints: count: 5, to stdout ``` See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.

log
)
8
/*
9
Output:
10
1
11
2
12
3
13
undefined
14
*/

In this example, we use Stream.runForEach to log each element to the console.

The Stream.fold function is another way to consume a stream by performing a fold operation over the stream of values and returning an effect containing the result. Here are a couple of examples:

1
import {
import Stream
Stream
,
import Effect
Effect
} from "effect"
2
3
const
const foldedStream: Effect.Effect<number, never, never>
foldedStream
=
import Stream
Stream
.
const make: <[number, number, number, number, number]>(as_0: number, as_1: number, as_2: number, as_3: number, as_4: number) => Stream.Stream<number, never, never>

Creates a stream from an sequence of values.

make
(1, 2, 3, 4, 5).
(method) Pipeable.pipe<Stream.Stream<number, never, never>, Effect.Effect<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Effect.Effect<number, never, never>): Effect.Effect<...> (+21 overloads)
pipe
(
4
import Stream
Stream
.
const runFold: <number, number>(s: number, f: (s: number, a: number) => number) => <E, R>(self: Stream.Stream<number, E, R>) => Effect.Effect<number, E, Exclude<R, Scope>> (+1 overload)

Executes a pure fold over the stream of values - reduces all elements in the stream to a value of type `S`.

runFold
(0, (
(parameter) a: number
a
,
(parameter) b: number
b
) =>
(parameter) a: number
a
+
(parameter) b: number
b
)
5
)
6
7
import Effect
Effect
.
const runPromise: <number, never>(effect: Effect.Effect<number, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<number>

Runs an `Effect` workflow, returning a `Promise` which resolves with the result of the workflow or rejects with an error.

runPromise
(
const foldedStream: Effect.Effect<number, never, never>
foldedStream
).
(method) Promise<number>.then<void, never>(onfulfilled?: ((value: number) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

then
(
namespace console var console: Console

The `console` module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers. The module exports two specific components: * A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream. * A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and [`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module. _**Warning**_: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for more information. Example using the global `console`: ```js console.log('hello world'); // Prints: hello world, to stdout console.log('hello %s', 'world'); // Prints: hello world, to stdout console.error(new Error('Whoops, something bad happened')); // Prints error message and stack trace to stderr: // Error: Whoops, something bad happened // at [eval]:5:15 // at Script.runInThisContext (node:vm:132:18) // at Object.runInThisContext (node:vm:309:38) // at node:internal/process/execution:77:19 // at [eval]-wrapper:6:22 // at evalScript (node:internal/process/execution:76:60) // at node:internal/main/eval_string:23:3 const name = 'Will Robinson'; console.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to stderr ``` Example using the `Console` class: ```js const out = getStreamSomehow(); const err = getStreamSomehow(); const myConsole = new console.Console(out, err); myConsole.log('hello world'); // Prints: hello world, to out myConsole.log('hello %s', 'world'); // Prints: hello world, to out myConsole.error(new Error('Whoops, something bad happened')); // Prints: [Error: Whoops, something bad happened], to err const name = 'Will Robinson'; myConsole.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to err ```

console
.
(method) Console.log(message?: any, ...optionalParams: any[]): void

Prints to `stdout` with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html) (the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)). ```js const count = 5; console.log('count: %d', count); // Prints: count: 5, to stdout console.log('count:', count); // Prints: count: 5, to stdout ``` See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.

log
)
8
// Output: 15
9
10
const
const foldedWhileStream: Effect.Effect<number, never, never>
foldedWhileStream
=
import Stream
Stream
.
const make: <[number, number, number, number, number]>(as_0: number, as_1: number, as_2: number, as_3: number, as_4: number) => Stream.Stream<number, never, never>

Creates a stream from an sequence of values.

make
(1, 2, 3, 4, 5).
(method) Pipeable.pipe<Stream.Stream<number, never, never>, Effect.Effect<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Effect.Effect<number, never, never>): Effect.Effect<...> (+21 overloads)
pipe
(
11
import Stream
Stream
.
const runFoldWhile: <number, number>(s: number, cont: Predicate<number>, f: (s: number, a: number) => number) => <E, R>(self: Stream.Stream<number, E, R>) => Effect.Effect<number, E, Exclude<...>> (+1 overload)

Reduces the elements in the stream to a value of type `S`. Stops the fold early when the condition is not fulfilled. Example:

runFoldWhile
(
12
0,
13
(
(parameter) n: number
n
) =>
(parameter) n: number
n
<= 3,
14
(
(parameter) a: number
a
,
(parameter) b: number
b
) =>
(parameter) a: number
a
+
(parameter) b: number
b
15
)
16
)
17
18
import Effect
Effect
.
const runPromise: <number, never>(effect: Effect.Effect<number, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<number>

Runs an `Effect` workflow, returning a `Promise` which resolves with the result of the workflow or rejects with an error.

runPromise
(
const foldedWhileStream: Effect.Effect<number, never, never>
foldedWhileStream
).
(method) Promise<number>.then<void, never>(onfulfilled?: ((value: number) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

then
(
namespace console var console: Console

The `console` module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers. The module exports two specific components: * A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream. * A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and [`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module. _**Warning**_: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for more information. Example using the global `console`: ```js console.log('hello world'); // Prints: hello world, to stdout console.log('hello %s', 'world'); // Prints: hello world, to stdout console.error(new Error('Whoops, something bad happened')); // Prints error message and stack trace to stderr: // Error: Whoops, something bad happened // at [eval]:5:15 // at Script.runInThisContext (node:vm:132:18) // at Object.runInThisContext (node:vm:309:38) // at node:internal/process/execution:77:19 // at [eval]-wrapper:6:22 // at evalScript (node:internal/process/execution:76:60) // at node:internal/main/eval_string:23:3 const name = 'Will Robinson'; console.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to stderr ``` Example using the `Console` class: ```js const out = getStreamSomehow(); const err = getStreamSomehow(); const myConsole = new console.Console(out, err); myConsole.log('hello world'); // Prints: hello world, to out myConsole.log('hello %s', 'world'); // Prints: hello world, to out myConsole.error(new Error('Whoops, something bad happened')); // Prints: [Error: Whoops, something bad happened], to err const name = 'Will Robinson'; myConsole.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to err ```

console
.
(method) Console.log(message?: any, ...optionalParams: any[]): void

Prints to `stdout` with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html) (the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)). ```js const count = 5; console.log('count: %d', count); // Prints: count: 5, to stdout console.log('count:', count); // Prints: count: 5, to stdout ``` See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.

log
)
19
// Output: 6

In the first example (foldedStream), we use Stream.runFold to calculate the sum of all elements. In the second example (foldedWhileStream), we use Stream.runFoldWhile to calculate the sum but only until a certain condition is met.

To consume a stream using a Sink, you can pass the Sink to the Stream.run function. Here’s an example:

1
import {
import Stream
Stream
,
import Sink
Sink
,
import Effect
Effect
} from "effect"
2
3
const
const effect: Effect.Effect<number, never, never>
effect
=
import Stream
Stream
.
const make: <[number, number, number]>(as_0: number, as_1: number, as_2: number) => Stream.Stream<number, never, never>

Creates a stream from an sequence of values.

make
(1, 2, 3).
(method) Pipeable.pipe<Stream.Stream<number, never, never>, Effect.Effect<number, never, never>>(this: Stream.Stream<...>, ab: (_: Stream.Stream<number, never, never>) => Effect.Effect<number, never, never>): Effect.Effect<...> (+21 overloads)
pipe
(
import Stream
Stream
.
const run: <number, number, never, never>(sink: Sink.Sink<number, number, unknown, never, never>) => <E, R>(self: Stream.Stream<number, E, R>) => Effect.Effect<number, E, Exclude<R, Scope>> (+1 overload)

Runs the sink on the stream to produce either the sink's result or an error.

run
(
import Sink
Sink
.
const sum: Sink.Sink<number, number, never, never, never>

A sink that sums incoming numeric values.

sum
))
4
5
import Effect
Effect
.
const runPromise: <number, never>(effect: Effect.Effect<number, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<number>

Runs an `Effect` workflow, returning a `Promise` which resolves with the result of the workflow or rejects with an error.

runPromise
(
const effect: Effect.Effect<number, never, never>
effect
).
(method) Promise<number>.then<void, never>(onfulfilled?: ((value: number) => void | PromiseLike<void>) | null | undefined, onrejected?: ((reason: any) => PromiseLike<never>) | null | undefined): Promise<...>

Attaches callbacks for the resolution and/or rejection of the Promise.

then
(
namespace console var console: Console

The `console` module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers. The module exports two specific components: * A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream. * A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and [`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module. _**Warning**_: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for more information. Example using the global `console`: ```js console.log('hello world'); // Prints: hello world, to stdout console.log('hello %s', 'world'); // Prints: hello world, to stdout console.error(new Error('Whoops, something bad happened')); // Prints error message and stack trace to stderr: // Error: Whoops, something bad happened // at [eval]:5:15 // at Script.runInThisContext (node:vm:132:18) // at Object.runInThisContext (node:vm:309:38) // at node:internal/process/execution:77:19 // at [eval]-wrapper:6:22 // at evalScript (node:internal/process/execution:76:60) // at node:internal/main/eval_string:23:3 const name = 'Will Robinson'; console.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to stderr ``` Example using the `Console` class: ```js const out = getStreamSomehow(); const err = getStreamSomehow(); const myConsole = new console.Console(out, err); myConsole.log('hello world'); // Prints: hello world, to out myConsole.log('hello %s', 'world'); // Prints: hello world, to out myConsole.error(new Error('Whoops, something bad happened')); // Prints: [Error: Whoops, something bad happened], to err const name = 'Will Robinson'; myConsole.warn(`Danger ${name}! Danger!`); // Prints: Danger Will Robinson! Danger!, to err ```

console
.
(method) Console.log(message?: any, ...optionalParams: any[]): void

Prints to `stdout` with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html) (the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)). ```js const count = 5; console.log('count: %d', count); // Prints: count: 5, to stdout console.log('count:', count); // Prints: count: 5, to stdout ``` See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.

log
)
6
// Output: 6

In this example, we use a Sink to calculate the sum of the elements in the stream.