Sink Concurrency

This section covers operations that run multiple sinks concurrently. They are useful when several independent computations need to consume the same stream at the same time.

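To run two sinks concurrently and combine their results, use Sink.zip with the { concurrent: true } option. Both sinks then receive the stream's elements concurrently, and their results are combined into a tuple. Without this option, Sink.zip feeds the first sink until it yields a result and only then switches over to the second: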

import { Sink, Console, Stream, Schedule, Effect } from "effect"

const s1 = Sink.forEach((s: string) => Console.log(`sink 1: ${s}`)).pipe(
  Sink.as(1)
)

const s2 = Sink.forEach((s: string) => Console.log(`sink 2: ${s}`)).pipe(
  Sink.as(2)
)

const sink = s1.pipe(Sink.zip(s2, { concurrent: true }))

Effect.runPromise(
  Stream.make("1", "2", "3", "4", "5").pipe(
    Stream.schedule(Schedule.spaced("10 millis")),
    Stream.run(sink)
  )
).then(console.log)
/*
Output:
sink 1: 1
sink 2: 1
sink 1: 2
sink 2: 2
sink 1: 3
sink 2: 3
sink 1: 4
sink 2: 4
sink 1: 5
sink 2: 5
[ 1, 2 ]
*/
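A concurrent zip is also handy when two aggregations should share a single pass over a stream. The sketch below assumes the Sink.sum and Sink.count sinks from the same module (neither is used in the example above) and zips them to derive a mean. Treat it as an illustration rather than a recommended pattern.

```typescript
import { Effect, Sink, Stream } from "effect"

// Sink.sum adds up numeric inputs; Sink.count counts how many inputs arrive.
// Both are assumptions for this sketch and do not appear in the example above.
const sumAndCount = Sink.sum.pipe(Sink.zip(Sink.count, { concurrent: true }))

// With the concurrent option, each sink receives every element,
// so the stream is consumed only once.
const program = Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(sumAndCount))

Effect.runPromise(program).then(([sum, count]) => {
  console.log(`sum: ${sum}, count: ${count}, mean: ${sum / count}`)
})
```

Because both sinks see the full input, the tuple should hold the total of the elements and the number of elements.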

Another useful operation is Sink.race, which runs two sinks concurrently on the same input. Whichever sink finishes first provides the result (or the error) for your program:

import { Sink, Console, Stream, Schedule, Effect } from "effect"

const s1 = Sink.forEach((s: string) => Console.log(`sink 1: ${s}`)).pipe(
  Sink.as(1)
)

const s2 = Sink.forEach((s: string) => Console.log(`sink 2: ${s}`)).pipe(
  Sink.as(2)
)

const sink = s1.pipe(Sink.race(s2))

Effect.runPromise(
  Stream.make("1", "2", "3", "4", "5").pipe(
    Stream.schedule(Schedule.spaced("10 millis")),
    Stream.run(sink)
  )
).then(console.log)
/*
Output:
sink 1: 1
sink 2: 1
sink 1: 2
sink 2: 2
sink 1: 3
sink 2: 3
sink 1: 4
sink 2: 4
sink 1: 5
sink 2: 5
1
*/
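In the example above both sinks consume the whole stream, so the race is only decided at the very end. The effect of racing is easier to see when one sink can finish early. The following sketch assumes Sink.take and Sink.collectAll (neither is used above) and races a sink that stops after two elements against one that waits for the stream to end.

```typescript
import { Chunk, Effect, Schedule, Sink, Stream } from "effect"

// Finishes as soon as it has collected two elements.
const firstTwo = Sink.take<number>(2)

// Finishes only when the stream ends.
const everything = Sink.collectAll<number>()

const raced = firstTwo.pipe(Sink.race(everything))

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  // Space the elements out so the early finisher has time to win.
  Stream.schedule(Schedule.spaced("10 millis")),
  Stream.run(raced),
  // Convert the winning Chunk to a plain array for easier inspection.
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
)

Effect.runPromise(program).then(console.log)
```

Since firstTwo completes after the second element while everything is still waiting for more input, the result is expected to come from firstTwo.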