SubscriptionRef

A SubscriptionRef<A> is a specialized form of a SynchronizedRef. It allows us to subscribe and receive updates on the current value and any changes made to that value.

interface SubscriptionRef<A> extends SynchronizedRef<A> {
  /**
   * A stream containing the current value of the `Ref` as well as all changes
   * to that value.
   */
  readonly changes: Stream<A>
}

You can perform all the usual operations on a SubscriptionRef, such as get, set, or modify to work with the current value.
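As a small illustration of these operations, the sketch below sets, updates, and modifies a value, then reads it back. The name `basics` is only illustrative, and the snippet assumes the `effect` package is installed.

```typescript
import { Effect, SubscriptionRef } from "effect"

// Exercise the basic Ref-style operations on a SubscriptionRef
const basics = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)
  // Replace the current value
  yield* SubscriptionRef.set(ref, 10)
  // Derive the next value from the current one: 10 -> 11
  yield* SubscriptionRef.update(ref, (n) => n + 1)
  // modify returns a result (here the old value) and stores a new value: 11 -> 22
  const previous = yield* SubscriptionRef.modify(
    ref,
    (n) => [n, n * 2] as const
  )
  const current = yield* SubscriptionRef.get(ref)
  return { previous, current }
})

Effect.runPromise(basics).then(console.log)
// { previous: 11, current: 22 }
```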

The changes stream is what sets a SubscriptionRef apart from a plain Ref. It lets you observe the current value and all subsequent changes. Each time you run this stream, you’ll get the current value as of that moment, followed by any changes that occur afterward.
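To make the "current value as of that moment" behavior concrete, here is a minimal sketch in which the value is changed before anyone subscribes. A subscriber that starts afterward should see the updated value first, not the initial one. The name `snapshot` is hypothetical, and the snippet assumes the `effect` package is installed.

```typescript
import { Chunk, Effect, Stream, SubscriptionRef } from "effect"

const snapshot = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)
  // Change the value before any subscriber exists
  yield* SubscriptionRef.set(ref, 5)
  // Running `changes` now starts from the current value (5), not the initial 0
  const first = yield* Stream.runCollect(Stream.take(ref.changes, 1))
  return Chunk.toReadonlyArray(first)
})

Effect.runPromise(snapshot).then(console.log)
// [ 5 ]
```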

To create a SubscriptionRef, you can use the make constructor, specifying the initial value:

import { SubscriptionRef } from "effect"

const ref = SubscriptionRef.make(0)

A SubscriptionRef can be invaluable when modeling shared state, especially when multiple observers need to react to every change in that shared state. For example, in a functional reactive programming context, the SubscriptionRef value might represent a part of the application state, and each observer could update various user interface elements based on changes to that state.

To see how this works, let’s create a simple example where a “server” repeatedly updates a value observed by multiple “clients”:

import { Ref, Effect } from "effect"

const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

Note that the server function operates on a regular Ref and doesn’t need to know about SubscriptionRef. It simply updates a value.

import { Ref, Effect, Stream, Random } from "effect"

const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

const client = (changes: Stream.Stream<number>) =>
  Effect.gen(function* () {
    const n = yield* Random.nextIntBetween(1, 10)
    const chunk = yield* Stream.runCollect(Stream.take(changes, n))
    return chunk
  })

Similarly, the client function only works with a Stream of values and doesn’t concern itself with the source of these values.

To tie everything together, we start the server, launch multiple client instances in parallel, and then shut down the server when we’re finished. We also create the SubscriptionRef in this process.

import {
  Ref,
  Effect,
  Stream,
  Random,
  SubscriptionRef,
  Fiber
} from "effect"

const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

const client = (changes: Stream.Stream<number>) =>
  Effect.gen(function* () {
    const n = yield* Random.nextIntBetween(1, 10)
    const chunk = yield* Stream.runCollect(Stream.take(changes, n))
    return chunk
  })

const program = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)
  const serverFiber = yield* Effect.fork(server(ref))
  const clients = new Array(5).fill(null).map(() => client(ref.changes))
  const chunks = yield* Effect.all(clients, { concurrency: "unbounded" })
  yield* Fiber.interrupt(serverFiber)
  for (const chunk of chunks) {
    console.log(chunk)
  }
})

Effect.runPromise(program)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 3, 4 ]
}
{
  _id: "Chunk",
  values: [ 2 ]
}
{
  _id: "Chunk",
  values: [ 2, 3, 4, 5, 6, 7 ]
}
{
  _id: "Chunk",
  values: [ 2, 3, 4 ]
}
{
  _id: "Chunk",
  values: [ 2, 3, 4, 5, 6, 7, 8, 9 ]
}
*/

This setup ensures that each client observes the current value when it starts and receives all subsequent changes to the value.

Since the changes are represented as streams, you can easily build more complex programs using familiar stream operators. You can transform, filter, or merge these streams with other streams to achieve more sophisticated behavior.
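As one possible illustration of composing `changes` with ordinary stream operators, the sketch below keeps only even values and turns them into labels. The `writer` helper, its one-millisecond delay, and the `labelledEvens` name are assumptions made for this example and are not part of the original program. The exact values collected depend on timing, so no output is shown.

```typescript
import { Chunk, Effect, Fiber, Ref, Stream, SubscriptionRef } from "effect"

// Hypothetical writer: increments the value about once per millisecond, forever
const writer = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.delay("1 millis"), Effect.forever)

const labelledEvens = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)
  const writerFiber = yield* Effect.fork(writer(ref))
  // Treat `changes` like any other stream: filter, map, then take a few items
  const chunk = yield* ref.changes.pipe(
    Stream.filter((n) => n % 2 === 0),
    Stream.map((n) => `even: ${n}`),
    Stream.take(3),
    Stream.runCollect
  )
  // Stop the writer once the subscriber has what it needs
  yield* Fiber.interrupt(writerFiber)
  return Chunk.toReadonlyArray(chunk)
})

Effect.runPromise(labelledEvens).then(console.log)
```

Because the subscriber only sees values through the filtered stream, the writer stays unaware of how its updates are consumed, which mirrors the separation between `server` and `client` above.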