Sink Operations
In the previous sections, we learned how to create and use sinks. Now, let’s explore some operations you can perform on sinks to transform or filter their behavior.
Sometimes, you have a sink that works perfectly with one type of input, but you want to use it with a different type. This is where `Sink.mapInput` comes in handy. While `Sink.map` transforms the result a sink produces, `Sink.mapInput` transforms the elements it receives, letting you adapt an existing sink to a different input type.
Imagine you have a `Sink.sum` that calculates the sum of incoming numeric values. However, your stream contains strings, not numbers. You can use `mapInput` to convert your strings into numbers and make `Sink.sum` compatible with your stream:
```ts
import { Stream, Sink, Effect } from "effect"

const numericSum = Sink.sum

const stringSum = numericSum.pipe(
  Sink.mapInput((s: string) => Number.parseFloat(s))
)

Effect.runPromise(
  Stream.make("1", "2", "3", "4", "5").pipe(Stream.run(stringSum))
).then(console.log)
/*
Output:
15
*/
```
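To make the contrast between `Sink.map` and `Sink.mapInput` concrete, the following sketch applies each of them to `Sink.sum`. The names `labelledSum` and `parsedSum` are illustrative only, and the example uses `Effect.runSync` on the assumption that these small in-memory streams run fully synchronously:

```ts
import { Effect, Sink, Stream } from "effect"

// Sink.map changes the result type: the sink still consumes numbers
// but now produces a string.
const labelledSum = Sink.sum.pipe(
  Sink.map((total: number) => `total=${total}`)
)

// Sink.mapInput changes the input type: the sink now consumes strings
// but still produces a number.
const parsedSum = Sink.sum.pipe(
  Sink.mapInput((s: string) => Number.parseFloat(s))
)

const fromNumbers = Effect.runSync(
  Stream.make(1, 2, 3).pipe(Stream.run(labelledSum))
)
const fromStrings = Effect.runSync(
  Stream.make("1", "2", "3").pipe(Stream.run(parsedSum))
)

console.log(fromNumbers) // total=6
console.log(fromStrings) // 6
```

In both cases the summing logic itself is untouched; only the type on one side of the sink changes.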
If you need to change both the input and output of a sink, you can use `Sink.dimap`. It’s an extended version of `mapInput` that lets you transform both types. This can be useful when you need to perform a complete conversion between your input and output types:
```ts
import { Stream, Sink, Effect } from "effect"

// Parse each input string into a number, sum the numbers, and then
// convert the result back to a string
const sumSink = Sink.sum.pipe(
  Sink.dimap({
    onInput: (s: string) => Number.parseFloat(s),
    onDone: (n) => String(n)
  })
)

Effect.runPromise(
  Stream.make("1", "2", "3", "4", "5").pipe(Stream.run(sumSink))
).then(console.log)
/*
Output:
15 <-- as string
*/
```
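One way to read `Sink.dimap` is as `Sink.mapInput` followed by `Sink.map`. The sketch below builds the same string-to-string summing sink both ways and runs each on the same stream. The helper names `parse` and `render` are hypothetical, and `Effect.runSync` is used on the assumption that the stream is purely synchronous:

```ts
import { Effect, Sink, Stream } from "effect"

// Hypothetical helpers for the two conversions
const parse = (s: string) => Number.parseFloat(s)
const render = (n: number) => String(n)

// Both conversions supplied in one step
const viaDimap = Sink.sum.pipe(
  Sink.dimap({ onInput: parse, onDone: render })
)

// The same conversions applied one after the other
const viaSteps = Sink.sum.pipe(Sink.mapInput(parse), Sink.map(render))

const input = Stream.make("1.5", "2.5", "3")

const a = Effect.runSync(input.pipe(Stream.run(viaDimap)))
const b = Effect.runSync(input.pipe(Stream.run(viaSteps)))

console.log(a, b) // 7 7
```

Both sinks have the same input and result types, so which form to use is mostly a matter of readability.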
Sinks offer a way to filter incoming elements using `Sink.filterInput`. This allows you to collect or process only the elements that meet a specific condition. In the following example, we collect elements in chunks of three and keep only the positive numbers, so zeros and negative values are dropped before they reach the sink:
```ts
import { Stream, Sink, Effect } from "effect"

const values = [1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6]
const stream = Stream.fromIterable(values).pipe(
  Stream.transduce(
    Sink.collectAllN<number>(3).pipe(Sink.filterInput((n) => n > 0))
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    {
      _id: "Chunk",
      values: [ 1, 1, 3 ]
    }, {
      _id: "Chunk",
      values: [ 4, 2, 1 ]
    }, {
      _id: "Chunk",
      values: [ 1, 1, 6 ]
    }, {
      _id: "Chunk",
      values: []
    }
  ]
}
*/
```
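`Sink.filterInput` is not tied to `Stream.transduce` or to collecting sinks. As a minimal sketch, the same predicate can be placed in front of `Sink.collectAll` and `Sink.sum` and run with `Stream.run`. The names `isPositive`, `positives`, and `positiveSum` are illustrative, and `Effect.runSync` assumes the stream is purely synchronous:

```ts
import { Chunk, Effect, Sink, Stream } from "effect"

const values = [1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6]

// Hypothetical predicate shared by both sinks
const isPositive = (n: number) => n > 0

// Collects only the elements that pass the predicate
const positives = Sink.collectAll<number>().pipe(
  Sink.filterInput(isPositive)
)

// Sums only the elements that pass the predicate
const positiveSum = Sink.sum.pipe(Sink.filterInput(isPositive))

const kept = Chunk.toReadonlyArray(
  Effect.runSync(Stream.fromIterable(values).pipe(Stream.run(positives)))
)
const total = Effect.runSync(
  Stream.fromIterable(values).pipe(Stream.run(positiveSum))
)

console.log(kept) // the nine positive values: 1, 1, 3, 4, 2, 1, 1, 1, 6
console.log(total) // 20
```

The filtered-out elements never reach the underlying sink, which is why the sum counts only the nine positive values.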