Leftovers
In this section, we’ll explore how to deal with elements that may be left unconsumed by sinks. Sinks can consume varying numbers of elements from their upstream, and we’ll learn how to collect or ignore any leftovers.
When a sink consumes elements from an upstream source, it may not use all of them. These unconsumed elements are referred to as “leftovers.” To collect these leftovers, we can use Sink.collectLeftover. It returns a tuple containing the result of the previous sink operation and any leftover elements:
```ts
import { Stream, Sink, Effect } from "effect"

// Take the first three elements and collect whatever the sink did not consume
const s1 = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(Sink.take<number>(3).pipe(Sink.collectLeftover))
)

Effect.runPromise(s1).then(console.log)
/*
Output:
[
  {
    _id: "Chunk",
    values: [ 1, 2, 3 ]
  }, {
    _id: "Chunk",
    values: [ 4, 5 ]
  }
]
*/

// Take only the first element; the remaining four become leftovers
const s2 = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(Sink.head<number>().pipe(Sink.collectLeftover))
)

Effect.runPromise(s2).then(console.log)
/*
Output:
[
  {
    _id: "Option",
    _tag: "Some",
    value: 1
  }, {
    _id: "Chunk",
    values: [ 2, 3, 4, 5 ]
  }
]
*/
```
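To make the tuple shape concrete, here is a minimal, library-free sketch of the same idea over plain arrays. The helpers takeWithLeftover and headWithLeftover are hypothetical names introduced only for illustration; they are not part of Effect, and they use arrays and undefined where the real API uses Chunk and Option.

```typescript
// Library-free model: these helpers are hypothetical and are NOT part of Effect.
// Each returns [result, leftover], mirroring the tuple shape described above.

// Models taking `n` elements and collecting the rest as leftovers.
function takeWithLeftover<A>(input: ReadonlyArray<A>, n: number): [A[], A[]] {
  const count = Math.max(0, n) // treat negative counts as zero
  return [input.slice(0, count), input.slice(count)]
}

// Models taking only the first element; `undefined` stands in for an empty Option.
function headWithLeftover<A>(input: ReadonlyArray<A>): [A | undefined, A[]] {
  return [input[0], input.slice(1)]
}

const [taken, afterTake] = takeWithLeftover([1, 2, 3, 4, 5], 3)
console.log(taken, afterTake) // [ 1, 2, 3 ] [ 4, 5 ]

const [first, afterHead] = headWithLeftover([1, 2, 3, 4, 5])
console.log(first, afterHead) // 1 [ 2, 3, 4, 5 ]
```

The first tuple element is whatever the sink produced, and the second is everything it did not consume, which matches the two outputs shown above.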
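When leftover elements are not needed, they can be ignored using Sink.ignoreLeftover. In the example below, Sink.collectLeftover is applied after Sink.ignoreLeftover to show that no leftovers remain: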
```ts
import { Stream, Sink, Effect } from "effect"

// Discard the leftovers first, then collect them to show the chunk is empty
const s1 = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(
    Sink.take<number>(3).pipe(Sink.ignoreLeftover, Sink.collectLeftover)
  )
)

Effect.runPromise(s1).then(console.log)
/*
Output:
[
  {
    _id: "Chunk",
    values: [ 1, 2, 3 ]
  }, {
    _id: "Chunk",
    values: []
  }
]
*/
```
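The interaction between the two combinators can be sketched with a small model in which a sink is just a function from its input to a result plus leftovers. This is an illustrative assumption, not how Effect implements sinks; SinkModel, takeModel, ignoreLeftoverModel, and collectLeftoverModel are hypothetical names.

```typescript
// Hypothetical model, NOT Effect's implementation: a sink maps its input
// to a result plus whatever it did not consume.
type SinkModel<In, A, L> = (
  input: ReadonlyArray<In>
) => { result: A; leftover: ReadonlyArray<L> }

// Consumes the first `n` elements and reports the rest as leftovers.
const takeModel = <In>(n: number): SinkModel<In, In[], In> => (input) => ({
  result: input.slice(0, n),
  leftover: input.slice(n)
})

// Keeps the result and discards any leftovers.
const ignoreLeftoverModel = <In, A, L>(
  sink: SinkModel<In, A, L>
): SinkModel<In, A, never> => (input) => ({
  result: sink(input).result,
  leftover: []
})

// Moves the leftovers into the result as the second tuple element.
const collectLeftoverModel = <In, A, L>(
  sink: SinkModel<In, A, L>
): SinkModel<In, [A, ReadonlyArray<L>], never> => (input) => {
  const { result, leftover } = sink(input)
  return { result: [result, leftover], leftover: [] }
}

const withLeftover = collectLeftoverModel(takeModel<number>(3))([1, 2, 3, 4, 5]).result
const withoutLeftover = collectLeftoverModel(
  ignoreLeftoverModel(takeModel<number>(3))
)([1, 2, 3, 4, 5]).result

console.log(withLeftover) // [ [ 1, 2, 3 ], [ 4, 5 ] ]
console.log(withoutLeftover) // [ [ 1, 2, 3 ], [] ]
```

In this model the leftover type becomes never once the leftovers are ignored, which is consistent with the Chunk<never> that appears in the inferred type of the example above.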