PubSub
A PubSub
serves as an asynchronous message hub, allowing publishers to send messages that can be received by all current subscribers.
Unlike a Queue, where each value is delivered to only one consumer, a PubSub
broadcasts each published message to all subscribers. This makes PubSub
ideal for scenarios requiring message broadcasting rather than load distribution.
A PubSub<A>
stores messages of type A
and provides two fundamental operations:
API | Description |
---|---|
PubSub.publish | Sends a message of type A to the PubSub , returning an effect indicating if the message was successfully published. |
PubSub.subscribe | Creates a scoped effect that allows subscription to the PubSub , automatically unsubscribing when the scope ends. Subscribers receive messages through a Dequeue which holds published messages. |
Example (Publishing a Message to Multiple Subscribers)
1import { import Effect
Effect, import PubSub
PubSub, import Queue
Queue } from "effect"2
3const const program: Effect.Effect<void, never, never>
program = import Effect
Effect.const scoped: <void, never, Scope>(effect: Effect.Effect<void, never, Scope>) => Effect.Effect<void, never, never>
Scopes all resources used in this workflow to the lifetime of the workflow,
ensuring that their finalizers are run as soon as this workflow completes
execution, whether by success, failure, or interruption.
scoped(4 import Effect
Effect.const gen: <YieldWrap<Effect.Effect<PubSub.PubSub<string>, never, never>> | YieldWrap<Effect.Effect<Queue.Dequeue<string>, never, Scope>> | YieldWrap<...> | YieldWrap<...>, void>(f: (resume: Effect.Adapter) => Generator<...>) => Effect.Effect<...> (+1 overload)
gen(function* () {5 const const pubsub: PubSub.PubSub<string>
pubsub = yield* import PubSub
PubSub.const bounded: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect.Effect<PubSub.PubSub<string>, never, never>
Creates a bounded `PubSub` with the back pressure strategy. The `PubSub` will retain
messages until they have been taken by all subscribers, applying back
pressure to publishers if the `PubSub` is at capacity.
For best performance use capacities that are powers of two.
bounded<string>(2)6
7 // Two subscribers8 const const dequeue1: Queue.Dequeue<string>
dequeue1 = yield* import PubSub
PubSub.const subscribe: <string>(self: PubSub.PubSub<string>) => Effect.Effect<Queue.Dequeue<string>, never, Scope>
Subscribes to receive messages from the `PubSub`. The resulting subscription can
be evaluated multiple times within the scope to take a message from the `PubSub`
each time.
subscribe(const pubsub: PubSub.PubSub<string>
pubsub)9 const const dequeue2: Queue.Dequeue<string>
dequeue2 = yield* import PubSub
PubSub.const subscribe: <string>(self: PubSub.PubSub<string>) => Effect.Effect<Queue.Dequeue<string>, never, Scope>
Subscribes to receive messages from the `PubSub`. The resulting subscription can
be evaluated multiple times within the scope to take a message from the `PubSub`
each time.
subscribe(const pubsub: PubSub.PubSub<string>
pubsub)10
11 // Publish a message to the pubsub12 yield* import PubSub
PubSub.const publish: <string>(self: PubSub.PubSub<string>, value: string) => Effect.Effect<boolean> (+1 overload)
Publishes a message to the `PubSub`, returning whether the message was published
to the `PubSub`.
publish(const pubsub: PubSub.PubSub<string>
pubsub, "Hello from a PubSub!")13
14 // Each subscriber receives the message15 namespace console
var console: Console
The `console` module provides a simple debugging console that is similar to the
JavaScript console mechanism provided by web browsers.
The module exports two specific components:
* A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream.
* A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and
[`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module.
_**Warning**_: The global console object's methods are neither consistently
synchronous like the browser APIs they resemble, nor are they consistently
asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for
more information.
Example using the global `console`:
```js
console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr
```
Example using the `Console` class:
```js
const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err
```
console.(method) Console.log(message?: any, ...optionalParams: any[]): void
Prints to `stdout` with newline. Multiple arguments can be passed, with the
first used as the primary message and all additional used as substitution
values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html)
(the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)).
```js
const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout
```
See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.
log("Subscriber 1: " + (yield* import Queue
Queue.const take: <string>(self: Queue.Dequeue<string>) => Effect.Effect<string, never, never>
Takes the oldest value in the queue. If the queue is empty, this will return
a computation that resumes when an item has been added to the queue.
take(const dequeue1: Queue.Dequeue<string>
dequeue1)))16 namespace console
var console: Console
The `console` module provides a simple debugging console that is similar to the
JavaScript console mechanism provided by web browsers.
The module exports two specific components:
* A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream.
* A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and
[`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module.
_**Warning**_: The global console object's methods are neither consistently
synchronous like the browser APIs they resemble, nor are they consistently
asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for
more information.
Example using the global `console`:
```js
console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr
```
Example using the `Console` class:
```js
const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err
```
console.(method) Console.log(message?: any, ...optionalParams: any[]): void
Prints to `stdout` with newline. Multiple arguments can be passed, with the
first used as the primary message and all additional used as substitution
values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html)
(the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)).
```js
const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout
```
See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.
log("Subscriber 2: " + (yield* import Queue
Queue.const take: <string>(self: Queue.Dequeue<string>) => Effect.Effect<string, never, never>
Takes the oldest value in the queue. If the queue is empty, this will return
a computation that resumes when an item has been added to the queue.
take(const dequeue2: Queue.Dequeue<string>
dequeue2)))17 })18)19
20import Effect
Effect.const runFork: <void, never>(effect: Effect.Effect<void, never, never>, options?: RunForkOptions) => RuntimeFiber<void, never>
Executes an effect and returns a `RuntimeFiber` that represents the running computation.
Use `runFork` when you want to start an effect without blocking the current execution flow.
It returns a fiber that you can observe, interrupt, or join as needed.
runFork(const program: Effect.Effect<void, never, never>
program)21/*22Output:23Subscriber 1: Hello from a PubSub!24Subscriber 2: Hello from a PubSub!25*/
A bounded PubSub
applies back pressure to publishers when it reaches capacity, suspending additional publishing until space becomes available.
Back pressure ensures that all subscribers receive all messages while they are subscribed. However, it can lead to slower message delivery if a subscriber is slow.
Example (Bounded PubSub Creation)
1import { import PubSub
PubSub } from "effect"2
3// Creates a bounded PubSub with a capacity of 24const const boundedPubSub: Effect<PubSub.PubSub<string>, never, never>
boundedPubSub = import PubSub
PubSub.const bounded: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect<PubSub.PubSub<string>, never, never>
Creates a bounded `PubSub` with the back pressure strategy. The `PubSub` will retain
messages until they have been taken by all subscribers, applying back
pressure to publishers if the `PubSub` is at capacity.
For best performance use capacities that are powers of two.
bounded<string>(2)
A dropping PubSub
discards new values when full. The PubSub.publish
operation returns false
if the message is dropped.
In a dropping pubsub, publishers can continue to publish new values, but subscribers are not guaranteed to receive all messages.
Example (Dropping PubSub Creation)
1import { import PubSub
PubSub } from "effect"2
3// Creates a dropping PubSub with a capacity of 24const const droppingPubSub: Effect<PubSub.PubSub<string>, never, never>
droppingPubSub = import PubSub
PubSub.const dropping: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect<PubSub.PubSub<string>, never, never>
Creates a bounded `PubSub` with the dropping strategy. The `PubSub` will drop new
messages if the `PubSub` is at capacity.
For best performance use capacities that are powers of two.
dropping<string>(2)
A sliding PubSub
removes the oldest message to make space for new ones, ensuring that publishing never blocks.
A sliding pubsub prevents slow subscribers from impacting the message delivery rate. However, there’s still a risk that slow subscribers may miss some messages.
Example (Sliding PubSub Creation)
1import { import PubSub
PubSub } from "effect"2
3// Creates a sliding PubSub with a capacity of 24const const slidingPubSub: Effect<PubSub.PubSub<string>, never, never>
slidingPubSub = import PubSub
PubSub.const sliding: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect<PubSub.PubSub<string>, never, never>
Creates a bounded `PubSub` with the sliding strategy. The `PubSub` will add new
messages and drop old messages if the `PubSub` is at capacity.
For best performance use capacities that are powers of two.
sliding<string>(2)
An unbounded PubSub
has no capacity limit, so publishing always succeeds immediately.
Unbounded pubsubs guarantee that all subscribers receive all messages without slowing down message delivery. However, they can grow indefinitely if messages are published faster than they are consumed.
Generally, it’s recommended to use bounded, dropping, or sliding pubsubs unless you have specific use cases for unbounded pubsubs.
Example
1import { import PubSub
PubSub } from "effect"2
3// Creates an unbounded PubSub with unlimited capacity4const const unboundedPubSub: Effect<PubSub.PubSub<string>, never, never>
unboundedPubSub = import PubSub
PubSub.const unbounded: <string>(options?: {
readonly replay?: number | undefined;
}) => Effect<PubSub.PubSub<string>, never, never>
Creates an unbounded `PubSub`.
unbounded<string>()
The PubSub.publishAll
function lets you publish multiple values to the pubsub at once.
Example (Publishing Multiple Messages)
1import { import Effect
Effect, import PubSub
PubSub, import Queue
Queue } from "effect"2
3const const program: Effect.Effect<void, never, never>
program = import Effect
Effect.const scoped: <void, never, Scope>(effect: Effect.Effect<void, never, Scope>) => Effect.Effect<void, never, never>
Scopes all resources used in this workflow to the lifetime of the workflow,
ensuring that their finalizers are run as soon as this workflow completes
execution, whether by success, failure, or interruption.
scoped(4 import Effect
Effect.const gen: <YieldWrap<Effect.Effect<PubSub.PubSub<string>, never, never>> | YieldWrap<Effect.Effect<Queue.Dequeue<string>, never, Scope>> | YieldWrap<...> | YieldWrap<...>, void>(f: (resume: Effect.Adapter) => Generator<...>) => Effect.Effect<...> (+1 overload)
gen(function* () {5 const const pubsub: PubSub.PubSub<string>
pubsub = yield* import PubSub
PubSub.const bounded: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect.Effect<PubSub.PubSub<string>, never, never>
Creates a bounded `PubSub` with the back pressure strategy. The `PubSub` will retain
messages until they have been taken by all subscribers, applying back
pressure to publishers if the `PubSub` is at capacity.
For best performance use capacities that are powers of two.
bounded<string>(2)6 const const dequeue: Queue.Dequeue<string>
dequeue = yield* import PubSub
PubSub.const subscribe: <string>(self: PubSub.PubSub<string>) => Effect.Effect<Queue.Dequeue<string>, never, Scope>
Subscribes to receive messages from the `PubSub`. The resulting subscription can
be evaluated multiple times within the scope to take a message from the `PubSub`
each time.
subscribe(const pubsub: PubSub.PubSub<string>
pubsub)7 yield* import PubSub
PubSub.const publishAll: <string>(self: PubSub.PubSub<string>, elements: Iterable<string>) => Effect.Effect<boolean> (+1 overload)
Publishes all of the specified messages to the `PubSub`, returning whether they
were published to the `PubSub`.
publishAll(const pubsub: PubSub.PubSub<string>
pubsub, ["Message 1", "Message 2"])8 namespace console
var console: Console
The `console` module provides a simple debugging console that is similar to the
JavaScript console mechanism provided by web browsers.
The module exports two specific components:
* A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream.
* A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and
[`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module.
_**Warning**_: The global console object's methods are neither consistently
synchronous like the browser APIs they resemble, nor are they consistently
asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for
more information.
Example using the global `console`:
```js
console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr
```
Example using the `Console` class:
```js
const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err
```
console.(method) Console.log(message?: any, ...optionalParams: any[]): void
Prints to `stdout` with newline. Multiple arguments can be passed, with the
first used as the primary message and all additional used as substitution
values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html)
(the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)).
```js
const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout
```
See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.
log(yield* import Queue
Queue.const takeAll: <string>(self: Queue.Dequeue<string>) => Effect.Effect<Chunk<string>, never, never>
Takes all the values in the queue and returns the values. If the queue is
empty returns an empty collection.
takeAll(const dequeue: Queue.Dequeue<string>
dequeue))9 })10)11
12import Effect
Effect.const runFork: <void, never>(effect: Effect.Effect<void, never, never>, options?: RunForkOptions) => RuntimeFiber<void, never>
Executes an effect and returns a `RuntimeFiber` that represents the running computation.
Use `runFork` when you want to start an effect without blocking the current execution flow.
It returns a fiber that you can observe, interrupt, or join as needed.
runFork(const program: Effect.Effect<void, never, never>
program)13/*14Output:15{ _id: 'Chunk', values: [ 'Message 1', 'Message 2' ] }16*/
You can check the capacity and current size of a pubsub using PubSub.capacity
and PubSub.size
, respectively.
Note that PubSub.capacity
returns a number
because the capacity is set at pubsub creation and never changes.
In contrast, PubSub.size
returns an effect that determines the current size of the pubsub since the number of messages in the pubsub can change over time.
Example (Retrieving PubSub Capacity and Size)
1import { import Effect
Effect, import PubSub
PubSub } from "effect"2
3const const program: Effect.Effect<void, never, never>
program = import Effect
Effect.const gen: <YieldWrap<Effect.Effect<PubSub.PubSub<number>, never, never>> | YieldWrap<Effect.Effect<number, never, never>>, void>(f: (resume: Effect.Adapter) => Generator<...>) => Effect.Effect<...> (+1 overload)
gen(function* () {4 const const pubsub: PubSub.PubSub<number>
pubsub = yield* import PubSub
PubSub.const bounded: <number>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect.Effect<PubSub.PubSub<number>, never, never>
Creates a bounded `PubSub` with the back pressure strategy. The `PubSub` will retain
messages until they have been taken by all subscribers, applying back
pressure to publishers if the `PubSub` is at capacity.
For best performance use capacities that are powers of two.
bounded<number>(2)5 namespace console
var console: Console
The `console` module provides a simple debugging console that is similar to the
JavaScript console mechanism provided by web browsers.
The module exports two specific components:
* A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream.
* A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and
[`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module.
_**Warning**_: The global console object's methods are neither consistently
synchronous like the browser APIs they resemble, nor are they consistently
asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for
more information.
Example using the global `console`:
```js
console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr
```
Example using the `Console` class:
```js
const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err
```
console.(method) Console.log(message?: any, ...optionalParams: any[]): void
Prints to `stdout` with newline. Multiple arguments can be passed, with the
first used as the primary message and all additional used as substitution
values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html)
(the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)).
```js
const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout
```
See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.
log(`capacity: ${import PubSub
PubSub.const capacity: <number>(self: PubSub.PubSub<number>) => number
Returns the number of elements the queue can hold.
capacity(const pubsub: PubSub.PubSub<number>
pubsub)}`)6 namespace console
var console: Console
The `console` module provides a simple debugging console that is similar to the
JavaScript console mechanism provided by web browsers.
The module exports two specific components:
* A `Console` class with methods such as `console.log()`, `console.error()` and `console.warn()` that can be used to write to any Node.js stream.
* A global `console` instance configured to write to [`process.stdout`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstdout) and
[`process.stderr`](https://nodejs.org/docs/latest-v22.x/api/process.html#processstderr). The global `console` can be used without importing the `node:console` module.
_**Warning**_: The global console object's methods are neither consistently
synchronous like the browser APIs they resemble, nor are they consistently
asynchronous like all other Node.js streams. See the [`note on process I/O`](https://nodejs.org/docs/latest-v22.x/api/process.html#a-note-on-process-io) for
more information.
Example using the global `console`:
```js
console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr
```
Example using the `Console` class:
```js
const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err
```
console.(method) Console.log(message?: any, ...optionalParams: any[]): void
Prints to `stdout` with newline. Multiple arguments can be passed, with the
first used as the primary message and all additional used as substitution
values similar to [`printf(3)`](http://man7.org/linux/man-pages/man3/printf.3.html)
(the arguments are all passed to [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args)).
```js
const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout
```
See [`util.format()`](https://nodejs.org/docs/latest-v22.x/api/util.html#utilformatformat-args) for more information.
log(`size: ${yield* import PubSub
PubSub.const size: <number>(self: PubSub.PubSub<number>) => Effect.Effect<number>
Retrieves the size of the queue, which is equal to the number of elements
in the queue. This may be negative if fibers are suspended waiting for
elements to be added to the queue.
size(const pubsub: PubSub.PubSub<number>
pubsub)}`)7})8
9import Effect
Effect.const runFork: <void, never>(effect: Effect.Effect<void, never, never>, options?: RunForkOptions) => RuntimeFiber<void, never>
Executes an effect and returns a `RuntimeFiber` that represents the running computation.
Use `runFork` when you want to start an effect without blocking the current execution flow.
It returns a fiber that you can observe, interrupt, or join as needed.
runFork(const program: Effect.Effect<void, never, never>
program)10/*11Output:12capacity: 213size: 014*/
To shut down a pubsub, use PubSub.shutdown
. You can also verify if it has been shut down with PubSub.isShutdown
, or wait for the shutdown to complete with PubSub.awaitShutdown
. Shutting down a pubsub also terminates all associated queues, ensuring that the shutdown signal is effectively communicated.
PubSub
operators mirror those of Queue with the main difference being that PubSub.publish
and PubSub.subscribe
are used in place of Queue.offer
and Queue.take
. If you’re already familiar with using a Queue
, you’ll find PubSub
straightforward.
Essentially, a PubSub
can be seen as a Enqueue
that only allows writes:
import type { import Queue
Queue } from "effect"
interface interface PubSub<A>
PubSub<(type parameter) A in PubSub<A>
A> extends import Queue
Queue.interface Enqueue<in A>
Enqueue<(type parameter) A in PubSub<A>
A> {}
Here, the Enqueue
type refers to a queue that only accepts enqueues (or writes). Any value enqueued here is published to the pubsub, and operations like shutdown will also affect the pubsub.
This design makes PubSub
highly flexible, letting you use it anywhere you need a Enqueue
that only accepts published values.