Skip to content

Semaphore

A semaphore, in the context of programming, is a synchronization mechanism that allows you to control access to a shared resource. In Effect, semaphores are used to manage access to resources or coordinate tasks in an asynchronous and concurrent environment. Let’s dive into the concept of semaphores and how they work in Effect.

A semaphore is a generalization of a mutex. It has a certain number of permits, which can be held and released concurrently by different parties. Think of permits as tickets that allow entities (e.g., tasks or fibers) to access a shared resource or perform a specific operation. If there are no permits available and an entity tries to acquire one, it will be suspended until a permit becomes available.

Let’s take a look at an example using asynchronous tasks:

1
import {
import Effect
Effect
} from "effect"
2
3
const
const task: Effect.Effect<void, never, never>
task
=
import Effect
Effect
.
const gen: <YieldWrap<Effect.Effect<void, never, never>>, void>(f: (resume: Effect.Adapter) => Generator<YieldWrap<Effect.Effect<void, never, never>>, void, never>) => Effect.Effect<...> (+1 overload)
gen
(function* () {
4
yield*
import Effect
Effect
.
const log: (...message: ReadonlyArray<any>) => Effect.Effect<void, never, never>

Logs one or more messages or error causes at the current log level, which is INFO by default. This function allows logging multiple items at once and can include detailed error information using `Cause` instances. To adjust the log level, use the `Logger.withMinimumLogLevel` function.

log
("start")
5
yield*
import Effect
Effect
.
const sleep: (duration: DurationInput) => Effect.Effect<void>

Returns an effect that suspends for the specified duration. This method is asynchronous, and does not actually block the fiber executing the effect.

sleep
("2 seconds")
6
yield*
import Effect
Effect
.
const log: (...message: ReadonlyArray<any>) => Effect.Effect<void, never, never>

Logs one or more messages or error causes at the current log level, which is INFO by default. This function allows logging multiple items at once and can include detailed error information using `Cause` instances. To adjust the log level, use the `Logger.withMinimumLogLevel` function.

log
("end")
7
})
8
9
const
const semTask: (sem: Effect.Semaphore) => Effect.Effect<void, never, never>
semTask
= (
(parameter) sem: Effect.Semaphore
sem
:
import Effect
Effect
.
interface Semaphore
Semaphore
) =>
(parameter) sem: Effect.Semaphore
sem
.
(method) Semaphore.withPermits(permits: number): <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>

when the given amount of permits are available, run the effect and release the permits when finished

withPermits
(1)(
const task: Effect.Effect<void, never, never>
task
)
10
11
const
const semTaskSeq: (sem: Effect.Semaphore) => Effect.Effect<void, never, never>[]
semTaskSeq
= (
(parameter) sem: Effect.Semaphore
sem
:
import Effect
Effect
.
interface Semaphore
Semaphore
) =>
12
[1, 2, 3].
(method) Array<number>.map<Effect.Effect<void, never, never>>(callbackfn: (value: number, index: number, array: number[]) => Effect.Effect<void, never, never>, thisArg?: any): Effect.Effect<void, never, never>[]

Calls a defined callback function on each element of an array, and returns an array that contains the results.

map
(() =>
const semTask: (sem: Effect.Semaphore) => Effect.Effect<void, never, never>
semTask
(
(parameter) sem: Effect.Semaphore
sem
).
(method) Pipeable.pipe<Effect.Effect<void, never, never>, Effect.Effect<void, never, never>>(this: Effect.Effect<...>, ab: (_: Effect.Effect<void, never, never>) => Effect.Effect<void, never, never>): Effect.Effect<...> (+21 overloads)
pipe
(
import Effect
Effect
.
const withLogSpan: (label: string) => <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R> (+1 overload)

Adds a log span to your effects, which tracks and logs the duration of operations or tasks. This is useful for performance monitoring and debugging time-sensitive processes.

withLogSpan
("elapsed")))
13
14
const
const program: Effect.Effect<void, never, never>
program
=
import Effect
Effect
.
const gen: <YieldWrap<Effect.Effect<Effect.Semaphore, never, never>> | YieldWrap<Effect.Effect<void[], never, never>>, void>(f: (resume: Effect.Adapter) => Generator<...>) => Effect.Effect<...> (+1 overload)
gen
(function* () {
15
const
const mutex: Effect.Semaphore
mutex
= yield*
import Effect
Effect
.
const makeSemaphore: (permits: number) => Effect.Effect<Effect.Semaphore>

Creates a new Semaphore

makeSemaphore
(1)
16
yield*
import Effect
Effect
.
const all: <Effect.Effect<void, never, never>[], { concurrency: "unbounded"; }>(arg: Effect.Effect<void, never, never>[], options?: { concurrency: "unbounded"; } | undefined) => Effect.Effect<...>

Runs all the provided effects in sequence respecting the structure provided in input. Supports multiple arguments, a single argument tuple / array or record / struct.

all
(
const semTaskSeq: (sem: Effect.Semaphore) => Effect.Effect<void, never, never>[]
semTaskSeq
(
const mutex: Effect.Semaphore
mutex
), {
(property) concurrency: "unbounded"
concurrency
: "unbounded" })
17
})
18
19
import Effect
Effect
.
const runPromise: <void, never>(effect: Effect.Effect<void, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<void>

Runs an `Effect` workflow, returning a `Promise` which resolves with the result of the workflow or rejects with an error.

runPromise
(
const program: Effect.Effect<void, never, never>
program
)
20
/*
21
Output:
22
timestamp=... level=INFO fiber=#1 message=start elapsed=3ms
23
timestamp=... level=INFO fiber=#1 message=end elapsed=2010ms
24
timestamp=... level=INFO fiber=#2 message=start elapsed=2012ms
25
timestamp=... level=INFO fiber=#2 message=end elapsed=4017ms
26
timestamp=... level=INFO fiber=#3 message=start elapsed=4018ms
27
timestamp=... level=INFO fiber=#3 message=end elapsed=6026ms
28
*/

Here, we synchronize and control the execution of asynchronous tasks using a semaphore with one permit. When all permits are in use, additional tasks attempting to acquire permits will wait until some become available.

In another scenario, we create a semaphore with five permits. We then utilize withPermits(n) to acquire and release varying numbers of permits for each task:

1
import {
import Effect
Effect
} from "effect"
2
3
const
const program: Effect.Effect<void, never, never>
program
=
import Effect
Effect
.
const gen: <YieldWrap<Effect.Effect<Effect.Semaphore, never, never>> | YieldWrap<Effect.Effect<void[], never, never>>, void>(f: (resume: Effect.Adapter) => Generator<...>) => Effect.Effect<...> (+1 overload)
gen
(function* () {
4
const
const sem: Effect.Semaphore
sem
= yield*
import Effect
Effect
.
const makeSemaphore: (permits: number) => Effect.Effect<Effect.Semaphore>

Creates a new Semaphore

makeSemaphore
(5)
5
6
yield*
import Effect
Effect
.
const forEach: <void, never, never, number[]>(self: number[], f: (a: number, i: number) => Effect.Effect<void, never, never>, options?: { readonly concurrency?: Concurrency | undefined; readonly batching?: boolean | "inherit" | undefined; readonly discard?: false | undefined; readonly concurrentFinalizers?: boolean | undefined; } | undefined) => Effect.Effect<...> (+3 overloads)
forEach
(
7
[1, 2, 3, 4, 5],
8
(
(parameter) n: number
n
) =>
9
const sem: Effect.Semaphore
sem
10
.
(method) Semaphore.withPermits(permits: number): <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>

when the given amount of permits are available, run the effect and release the permits when finished

withPermits
(
(parameter) n: number
n
)(
11
import Effect
Effect
.
const delay: <void, never, never>(self: Effect.Effect<void, never, never>, duration: DurationInput) => Effect.Effect<void, never, never> (+1 overload)

Returns an effect that is delayed from this effect by the specified `Duration`.

delay
(
import Effect
Effect
.
const log: (...message: ReadonlyArray<any>) => Effect.Effect<void, never, never>

Logs one or more messages or error causes at the current log level, which is INFO by default. This function allows logging multiple items at once and can include detailed error information using `Cause` instances. To adjust the log level, use the `Logger.withMinimumLogLevel` function.

log
(`process: ${
(parameter) n: number
n
}`), "2 seconds")
12
)
13
.
(method) Pipeable.pipe<Effect.Effect<void, never, never>, Effect.Effect<void, never, never>>(this: Effect.Effect<...>, ab: (_: Effect.Effect<void, never, never>) => Effect.Effect<void, never, never>): Effect.Effect<...> (+21 overloads)
pipe
(
import Effect
Effect
.
const withLogSpan: (label: string) => <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R> (+1 overload)

Adds a log span to your effects, which tracks and logs the duration of operations or tasks. This is useful for performance monitoring and debugging time-sensitive processes.

withLogSpan
("elapsed")),
14
{
(property) concurrency?: Concurrency | undefined
concurrency
: "unbounded" }
15
)
16
})
17
18
import Effect
Effect
.
const runPromise: <void, never>(effect: Effect.Effect<void, never, never>, options?: { readonly signal?: AbortSignal; } | undefined) => Promise<void>

Runs an `Effect` workflow, returning a `Promise` which resolves with the result of the workflow or rejects with an error.

runPromise
(
const program: Effect.Effect<void, never, never>
program
)
19
/*
20
Output:
21
timestamp=... level=INFO fiber=#1 message="process: 1" elapsed=2011ms
22
timestamp=... level=INFO fiber=#2 message="process: 2" elapsed=2017ms
23
timestamp=... level=INFO fiber=#3 message="process: 3" elapsed=4020ms
24
timestamp=... level=INFO fiber=#4 message="process: 4" elapsed=6025ms
25
timestamp=... level=INFO fiber=#5 message="process: 5" elapsed=8034ms
26
*/

In this example, we show that you can acquire and release any number of permits with withPermits(n). This flexibility allows for precise control over concurrency.

One crucial aspect to remember is that withPermits ensures that each acquisition is matched with an equivalent number of releases, regardless of whether the task succeeds, fails, or gets interrupted.