PubSub

This guide covers PubSub, an asynchronous message hub: publishers send messages to the pubsub, and every current subscriber receives them.

Unlike a Queue, where each value offered to the queue can be taken by one taker, each value published to a pubsub can be received by all subscribers.

Whereas a Queue represents the optimal solution to the problem of how to distribute values, a PubSub represents the optimal solution to the problem of how to broadcast them.
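To make the distinction concrete, here is a minimal sketch that sends the same two values through a Queue and through a PubSub with two subscribers. It uses Queue.offerAll, Queue.take, Queue.takeAll, PubSub.publishAll, and PubSub.subscribe, which are covered in more detail later in this guide. The variable names (takerA, seenBy1, and so on) are illustrative and not part of the library.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    // Queue: each offered value is handed to exactly one taker
    const queue = yield* Queue.unbounded<string>()
    yield* Queue.offerAll(queue, ["a", "b"])
    const takerA = yield* Queue.take(queue) // "a"
    const takerB = yield* Queue.take(queue) // "b"

    // PubSub: every subscriber receives its own copy of each value
    const pubsub = yield* PubSub.unbounded<string>()
    const sub1 = yield* PubSub.subscribe(pubsub)
    const sub2 = yield* PubSub.subscribe(pubsub)
    yield* PubSub.publishAll(pubsub, ["a", "b"])
    const seenBy1 = Chunk.toReadonlyArray(yield* Queue.takeAll(sub1))
    const seenBy2 = Chunk.toReadonlyArray(yield* Queue.takeAll(sub2))

    return { fromQueue: [takerA, takerB], seenBy1, seenBy2 }
  })
)

Effect.runPromise(program).then(console.log)
```

With the queue, the two values are split between the two takes. With the pubsub, both subscribers should see both "a" and "b".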

The core operations of a PubSub are PubSub.publish and PubSub.subscribe:

  • The publish operation sends a message of type A to the pubsub. It returns an effect that indicates whether the message was successfully published.
  • The subscribe operation returns a scoped effect that allows you to subscribe to the pubsub. It automatically unsubscribes when the scope is closed. Within the scope, you gain access to a Dequeue, which is essentially a Queue for dequeuing messages published to the pubsub.

Let’s look at an example to understand how to use a pubsub:

import { Effect, PubSub, Queue, Console } from "effect"

const program = PubSub.bounded<string>(2).pipe(
  Effect.andThen((pubsub) =>
    Effect.scoped(
      Effect.gen(function* () {
        // Both subscriptions are established before anything is published
        const dequeue1 = yield* PubSub.subscribe(pubsub)
        const dequeue2 = yield* PubSub.subscribe(pubsub)
        yield* PubSub.publish(pubsub, "Hello from a PubSub!")
        // Each subscriber receives its own copy of the message
        yield* Queue.take(dequeue1).pipe(Effect.andThen(Console.log))
        yield* Queue.take(dequeue2).pipe(Effect.andThen(Console.log))
      })
    )
  )
)

Effect.runPromise(program)
/*
Output:
Hello from a PubSub!
Hello from a PubSub!
*/

It’s important to note that a subscriber will only receive messages published to the pubsub while it’s subscribed. To ensure that a specific message reaches a subscriber, make sure that the subscription has been established before publishing the message.
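The following sketch illustrates that ordering constraint. It assumes a pubsub created without any replay option and uses illustrative message strings: one message is published before the subscription exists and one after, and only the second is expected to reach the subscriber.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(2)

    // No subscriber exists yet, so nobody receives this message
    yield* PubSub.publish(pubsub, "published before subscribing")

    const dequeue = yield* PubSub.subscribe(pubsub)

    // The subscription is now active, so this message is delivered
    yield* PubSub.publish(pubsub, "published after subscribing")

    const received = yield* Queue.takeAll(dequeue)
    return Chunk.toReadonlyArray(received)
  })
)

// Expected to log an array containing only "published after subscribing"
Effect.runPromise(program).then(console.log)
```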

You can create a pubsub using various constructors provided by the PubSub module:

A bounded pubsub applies back pressure to publishers when it’s at capacity: while the pubsub is full, a publish effect suspends until space becomes available.

import { PubSub } from "effect"

const boundedPubSub = PubSub.bounded<string>(2)

Back pressure ensures that all subscribers receive all messages while they are subscribed. However, it can lead to slower message delivery if a subscriber is slow.
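As a rough sketch of when back pressure would start to apply, the example below fills a bounded pubsub of capacity 2 while a subscriber has not yet taken anything, then checks PubSub.isFull before and after the subscriber takes one message. It deliberately stops short of publishing a third message, since that publish would suspend until space is freed. The field names in the returned object are illustrative.

```typescript
import { Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<number>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)

    // Two messages fit without suspending the publisher
    yield* PubSub.publishAll(pubsub, [1, 2])
    const fullBefore = yield* PubSub.isFull(pubsub) // true

    // Once the only subscriber takes a message, its slot is released
    const first = yield* Queue.take(dequeue) // 1
    const fullAfter = yield* PubSub.isFull(pubsub) // false

    return { fullBefore, first, fullAfter }
  })
)

Effect.runPromise(program).then(console.log)
```

A message is retained until every subscriber has taken it, so with several subscribers the slowest one determines when space is released.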

A dropping pubsub simply discards values if it’s full. The PubSub.publish function will return false when the pubsub is full.

import { PubSub } from "effect"

const droppingPubSub = PubSub.dropping<string>(2)

In a dropping pubsub, publishers can continue to publish new values, but subscribers are not guaranteed to receive all messages.
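A minimal sketch of the dropping behavior, assuming a capacity of 2 and a single subscriber that has not taken anything yet: the third publish finds the pubsub full, returns false, and its message is never delivered. The message values and field names are illustrative.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.dropping<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)

    const first = yield* PubSub.publish(pubsub, "a") // true
    const second = yield* PubSub.publish(pubsub, "b") // true
    // The pubsub is full at this point, so "c" is discarded
    const third = yield* PubSub.publish(pubsub, "c") // false

    const received = Chunk.toReadonlyArray(yield* Queue.takeAll(dequeue))
    return { accepted: [first, second, third], received }
  })
)

// received is expected to contain only "a" and "b"
Effect.runPromise(program).then(console.log)
```

Checking the boolean returned by PubSub.publish is the only way for a publisher to learn that a message was dropped.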

A sliding pubsub drops the oldest value when it’s full, ensuring that publishing always succeeds immediately.

import { PubSub } from "effect"

const slidingPubSub = PubSub.sliding<string>(2)

A sliding pubsub prevents slow subscribers from impacting the message delivery rate. However, there’s still a risk that slow subscribers may miss some messages.
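The sketch below mirrors the dropping scenario with a sliding pubsub, again assuming a capacity of 2 and a subscriber that has not taken anything before the third publish. Here every publish succeeds, and the slow subscriber misses the oldest message instead of the newest. The message values and field names are illustrative.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.sliding<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)

    const first = yield* PubSub.publish(pubsub, "a")
    const second = yield* PubSub.publish(pubsub, "b")
    // The pubsub is full, so the oldest message ("a") is dropped to make room
    const third = yield* PubSub.publish(pubsub, "c")

    const received = Chunk.toReadonlyArray(yield* Queue.takeAll(dequeue))
    return { accepted: [first, second, third], received }
  })
)

// received is expected to contain "b" and "c"; all publishes return true
Effect.runPromise(program).then(console.log)
```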

An unbounded pubsub can never be full, and publishing always succeeds immediately.

import { PubSub } from "effect"

const unboundedPubSub = PubSub.unbounded<string>()

Unbounded pubsubs guarantee that all subscribers receive all messages without slowing down message delivery. However, they can grow indefinitely if messages are published faster than they are consumed.

Generally, it’s recommended to use bounded, dropping, or sliding pubsubs unless you have specific use cases for unbounded pubsubs.
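To illustrate the trade-off, this sketch publishes 1000 messages to an unbounded pubsub before the subscriber takes any of them. The batch size of 1000 is an arbitrary illustrative choice. Nothing is dropped and the publisher is never suspended, but every message has to be held in memory until the subscriber catches up.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.unbounded<number>()
    const dequeue = yield* PubSub.subscribe(pubsub)

    // Publish a large batch while the subscriber is idle
    const messages = Array.from({ length: 1000 }, (_, i) => i)
    const accepted = yield* PubSub.publishAll(pubsub, messages)

    // Number of messages currently retained for the subscriber
    const buffered = yield* PubSub.size(pubsub)

    const received = yield* Queue.takeAll(dequeue)
    return { accepted, buffered, receivedCount: Chunk.size(received) }
  })
)

Effect.runPromise(program).then(console.log)
```

If the publisher keeps outpacing the subscriber, the retained count keeps growing, which is the risk the recommendation above refers to.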

PubSubs support various operations similar to those available on queues.

You can use the PubSub.publishAll operator to publish multiple values to the pubsub at once:

import { Effect, PubSub, Queue, Console } from "effect"

const program = PubSub.bounded<string>(2).pipe(
  Effect.andThen((pubsub) =>
    Effect.scoped(
      Effect.gen(function* () {
        const dequeue = yield* PubSub.subscribe(pubsub)
        // Publish both messages in a single operation
        yield* PubSub.publishAll(pubsub, ["Message 1", "Message 2"])
        // Take everything currently available to this subscriber
        yield* Queue.takeAll(dequeue).pipe(Effect.andThen(Console.log))
      })
    )
  )
)

Effect.runPromise(program)
/*
Output:
{
  _id: "Chunk",
  values: [ "Message 1", "Message 2" ]
}
*/

You can determine the capacity and current size of the pubsub using PubSub.capacity and PubSub.size:

import { Effect, PubSub, Console } from "effect"

const program = PubSub.bounded<number>(2).pipe(
  // capacity is a plain number, fixed when the pubsub is created
  Effect.tap((pubsub) =>
    Console.log(`capacity: ${PubSub.capacity(pubsub)}`)
  ),
  // size is an effect, because it can change over time
  Effect.tap((pubsub) =>
    PubSub.size(pubsub).pipe(
      Effect.andThen((size) => Console.log(`size: ${size}`))
    )
  )
)

Effect.runPromise(program)
/*
Output:
capacity: 2
size: 0
*/

Note that capacity returns a number because the capacity is set at pubsub creation and never changes. In contrast, size returns an effect that determines the current size of the pubsub since the number of messages in the pubsub can change over time.

You can shut down a pubsub using PubSub.shutdown, check if it’s shut down with PubSub.isShutdown, or await its shutdown with PubSub.awaitShutdown. Shutting down a pubsub also shuts down all associated queues, ensuring the proper propagation of the shutdown signal.
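A small sketch of the shutdown operations, assuming one active subscription: it reads PubSub.isShutdown before and after calling PubSub.shutdown, and uses Queue.isShutdown to check that the subscription's queue was shut down as well. The field names in the returned object are illustrative.

```typescript
import { Effect, PubSub, Queue } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(2)
    const dequeue = yield* PubSub.subscribe(pubsub)

    const before = yield* PubSub.isShutdown(pubsub) // false

    yield* PubSub.shutdown(pubsub)

    const after = yield* PubSub.isShutdown(pubsub) // true
    // The shutdown signal propagates to the subscription's queue
    const subscriptionAfter = yield* Queue.isShutdown(dequeue)

    return { before, after, subscriptionAfter }
  })
)

Effect.runPromise(program).then(console.log)
```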

As you can see, the operators on PubSub are identical to the ones on Queue with the exception of PubSub.publish and PubSub.subscribe replacing Queue.offer and Queue.take. So if you know how to use a Queue, you already know how to use a PubSub.

In fact, a PubSub can be viewed as a Queue that can only be written to:

interface PubSub<A> extends Queue.Enqueue<A> {}

Here, the Enqueue type represents a queue that can only be written to. Enqueuing a value publishes it to the pubsub, and actions like shutting down the queue also shut down the pubsub.

This versatility allows you to use a PubSub wherever you currently use a Queue that you only write to.
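As a sketch of that substitution, the hypothetical produce function below accepts any Queue.Enqueue<string> and writes to it with Queue.offerAll. The same function is then called with a regular Queue and with a PubSub. The function name and the job strings are assumptions made for illustration.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect"

// Hypothetical producer that only needs write access to its target
const produce = (sink: Queue.Enqueue<string>) =>
  Queue.offerAll(sink, ["job-1", "job-2"])

const program = Effect.scoped(
  Effect.gen(function* () {
    // Writing to a regular queue
    const queue = yield* Queue.unbounded<string>()
    yield* produce(queue)
    const fromQueue = Chunk.toReadonlyArray(yield* Queue.takeAll(queue))

    // Writing to a pubsub through the same Enqueue interface
    const pubsub = yield* PubSub.unbounded<string>()
    const dequeue = yield* PubSub.subscribe(pubsub)
    yield* produce(pubsub)
    const fromPubSub = Chunk.toReadonlyArray(yield* Queue.takeAll(dequeue))

    return { fromQueue, fromPubSub }
  })
)

// Both arrays are expected to contain "job-1" and "job-2"
Effect.runPromise(program).then(console.log)
```

Because produce depends only on Enqueue, switching a producer from point-to-point delivery to broadcast does not require changing the producer itself.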