Up until now, all code examples in the series were centered around synchronous computation. In reality, we need to be able to combine both synchronous and asynchronous computations. Some examples of async functions are: database lookups, HTTP calls, reading data from the filesystem, etc. In this post, we take a look at how such functions can be composed.
The pitfalls of working with Promises
Promise is JavaScript's native language construct representing the result of an async operation [1]. However, it can be quite inconvenient to work with for several reasons.
First of all, once we instantiate a promise, it starts evaluating immediately. Canceling one is non-trivial, so it is best not to instantiate a promise unless absolutely necessary. This requires us to keep tight control over the order in which async functions execute, which may be difficult to achieve in large and complex codebases. However, this control is crucial when it comes to fixing pesky bugs or optimizing for performance or resource utilization.
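The eagerness described above can be observed with a small dependency-free sketch. The names log, eager, lazy, and run are made up for this illustration and are not part of any library.

```typescript
// Records the order in which things happen.
const log: string[] = [];

// A promise starts running the moment it is constructed:
// the executor function is invoked synchronously.
const eager: Promise<number> = new Promise((resolve) => {
  log.push("eager started");
  resolve(1);
});

// Wrapping the construction in a thunk defers the work until the thunk is called.
const lazy = (): Promise<number> =>
  new Promise((resolve) => {
    log.push("lazy started");
    resolve(2);
  });

log.push("before calling lazy");

// Only here does the lazy computation actually start.
const run = async (): Promise<number[]> => [await eager, await lazy()];
```

At the point where run is defined, the log already contains the entry of the eager promise but not that of the lazy one, which is exactly the control a thunk gives us.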
Secondly, as seen from its type signature (Promise<A>), a promise only communicates the type of its resolved value. However, it can also reject with any value. This requires us to handle unknown edge cases of an any type, which basically means giving up on type safety. However, since TypeScript 4.4 we can use the useUnknownInCatchVariables compiler option to get type safety back at the cost of reduced ergonomics:
class SomeCustomError extends Error {}
try {
// call something that may throw
} catch (err: unknown) {
if (err instanceof SomeCustomError) {
// handle some custom error
} else if (err instanceof Error) {
// handle a generic error
} else {
throw new Error(String(err));
}
}
Lastly, the type system does not have a mechanism to enforce the handling of possibly rejected promises. Whether or not we wrap some code in a try-catch block, it still produces a valid program. This contributes to bugs caused by human error: people may simply forget to handle errors.
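As a minimal sketch of the last two points, consider the hypothetical functions parsePort and nextPort below (both invented for this illustration). The signature mentions only the resolved type, and the caller compiles without any error handling.

```typescript
// The signature promises a number and says nothing about possible failures.
const parsePort = async (raw: string): Promise<number> => {
  const port = Number(raw);
  if (!Number.isInteger(port)) {
    throw new TypeError(`Invalid port: ${raw}`);
  }
  return port;
};

// This compiles without a try-catch or a .catch() call.
// The possible rejection is only discovered at run time.
const nextPort = (raw: string): Promise<number> =>
  parsePort(raw).then((port) => port + 1);
```

Calling nextPort with a non-numeric string rejects with a TypeError, yet nothing in the types warned us about it.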
Tasks to the rescue
Although more complex, Task monads help us deal with the shortcomings of Promise. fp-ts provides the following four task monads:
- Task<A>: represents an async computation that never fails.
- TaskEither<E, A>: represents an async computation that may fail.
- TaskOption<A>: represents an async computation that never fails and may or may not return a value.
- TaskThese<E, A>: represents an async computation that may fail, succeed, or do both at the same time.
The main advantages of tasks as opposed to using only promises can be summarized as:
- We retain control over when tasks are executed. Hence, it becomes easier to fix bugs and optimize our code.
- We can clearly see from the type signature what effects a task has. This not only saves us time learning about possible edge cases to handle but the compiler actually forces us to handle them. This helps us write more correct business logic. This is fairly similar to the advantages of using the Either monad for synchronous computations that may fail.
This post only describes the Task and TaskEither monads in more detail. Understanding these two is enough to understand the other ones.
Task
A task describes an asynchronous computation that never fails. It is defined as follows [2]:
export interface Task<A> {
(): Promise<A>;
}
It is a thunk - basically a lazily executed Promise. Here's an example of an async function that cannot fail:
import * as fs from "node:fs/promises";
export const safeReadFile =
(fallback: string) =>
async (path: string): Promise<string> => {
try {
// await inside the try block so that a rejection is caught below
return await fs.readFile(path, "utf8");
} catch (_err) {
return fallback;
}
};
safeReadFile wraps the native promise-based readFile function and handles any error by returning a configurable string as the fallback value. Hence, calling safeReadFile never fails.
Constructors
The simplest way to construct a task is by using the task.of function:
import { task } from "fp-ts";
export const myTask = task.of(42);
// Value of myTask is () => Promise.resolve(42)
// Type of myTask is Task<number>
However, the most common way to construct one is to just return an async thunk like so:
import { task } from "fp-ts";
import * as fs from "node:fs/promises";
export const safeReadFile =
(fallback: string) =>
(path: string): task.Task<string> =>
async () => {
try {
// await inside the try block so that a rejection is caught below
return await fs.readFile(path, "utf8");
} catch (_err) {
return fallback;
}
};
To get the value wrapped by the task, it needs to be first eliminated by calling the task. After that, its result needs to be awaited in an async context like so:
import { task } from "fp-ts";
import assert from "node:assert";
const myTask = task.of(42);
(async () => {
const value = await myTask();
assert.equal(value, 42);
// OK, 42 == 42
})();
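To make the "describe first, run later" idea more concrete, here is a dependency-free sketch of roughly what mapping over a task amounts to. The Task type is redeclared locally to mirror the definition shown earlier, and mapTask and countingTask are illustrative names rather than fp-ts exports.

```typescript
// Local stand-in that mirrors the Task definition from above.
type Task<A> = () => Promise<A>;

// Transforms the eventual value without running the task.
const mapTask =
  <A, B>(f: (a: A) => B) =>
  (fa: Task<A>): Task<B> =>
  () =>
    fa().then(f);

// Counts how many times the underlying computation was started.
let runs = 0;
const countingTask: Task<number> = () => {
  runs += 1;
  return Promise.resolve(21);
};

// Building the pipeline does not start anything: runs is still 0 here.
const doubled: Task<number> = mapTask((n: number) => n * 2)(countingTask);
```

Only calling doubled() starts the computation. Calling it twice would run the underlying work twice, since a task is a description rather than a cached result.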
TaskEither
A TaskEither describes an async computation that may fail. It is defined as follows [3]:
import type { either, task } from "fp-ts";
export interface TaskEither<E, A> extends task.Task<either.Either<E, A>> {}
We can think of the TaskEither monad as syntactic sugar over the Task monad that wraps an Either. Task<Either<E, A>> would be fairly inconvenient to work with. The map, chain, fold, etc. functions of the Task monad would only unwrap the Task instance. Unwrapping the instance of an Either would be the caller's responsibility. This can be demonstrated by the following code example.
import { either, task } from "fp-ts";
import { pipe } from "fp-ts/lib/function.js";
import { db } from "./db.js";
const query =
(sql: string): task.Task<either.Either<Error, unknown>> =>
async () => {
try {
const res = await db.query(sql);
return either.right(res);
} catch (error) {
return pipe(error, either.toError, either.left);
}
};
export const testDbConnection: task.Task<boolean> = pipe(
query("SELECT 1"),
task.map(
// unwrapping the task here
either.fold(
// unwrapping the either here
() => false,
() => true
)
)
);
Assume we implemented a db module that knows how to authenticate against the database and fetch a connection. The above code implements query as a functional wrapper over querying the database: if db.query throws because of connection issues, syntax errors in the SQL statement, etc., the wrapper catches the error and returns it as a Left. It also implements testDbConnection, which returns true if the test query succeeds and false otherwise. It is common to see implementations like this for diagnostic endpoints that are regularly called by an internal service for monitoring purposes. Using the TaskEither monad simplifies our example from above:
import { either, task, taskEither } from "fp-ts";
import { pipe } from "fp-ts/lib/function.js";
import { db } from "./db.js";
const query = (sql: string): taskEither.TaskEither<Error, unknown> =>
taskEither.tryCatch(() => db.query(sql), either.toError);
export const testDbConnection: task.Task<boolean> = pipe(
query("SELECT 1"),
taskEither.fold(
() => task.of(false),
() => task.of(true)
)
);
The query function becomes less verbose. taskEither.tryCatch handles the wrapping of the success/failure values in their respective instances of Either for us. It also reduces the level of nesting in testDbConnection by one. Notice that the functions passed to fold must return a Task. That is because once we operate on an asynchronous computation, we cannot turn it into a synchronous one.
Constructors
The simplest way to construct a TaskEither is by using the right and left functions:
import { taskEither } from "fp-ts";
export const asyncSuccess = taskEither.right(42);
// The value is () => Promise.resolve({ _tag: "Right", right: 42 })
// The type is TaskEither<never, number>
export const asyncFailure = taskEither.left(new Error("Something went wrong"));
// The value is () => Promise.resolve({ _tag: "Left", left: new Error("Something went wrong") })
// The type is TaskEither<Error, never>
Another way to construct a TaskEither is to return an async thunk that returns an Either:
import { either } from "fp-ts";
import * as fs from "node:fs/promises";
export const safeReadFile = (path: string) => async () => {
try {
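// await the result so that Right wraps the file contents, not a promise,
// and so that a rejection is caught below
return either.right(await fs.readFile(path, "utf8"));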
} catch (reason) {
return either.left(either.toError(reason));
}
};
Last but not least, we can use taskEither.tryCatch to wrap an async function that may throw an error:
import { either, taskEither } from "fp-ts";
import * as fs from "node:fs/promises";
export const safeReadFile = (path: string) =>
taskEither.tryCatch(
() => fs.readFile(path, "utf8"),
either.toError
);
Just like in the case of the Option monad, tryCatch is most useful when wrapping third-party library or native functions written in a non-functional style.
Conversions
It is possible to convert one Task monad to another. The following table shows how to perform a couple of such conversions:
From | To | Converter |
---|---|---|
Either | TaskEither | taskEither.fromEither |
Task | TaskEither | taskEither.fromTask |
TaskOption | TaskEither | taskEither.fromTaskOption |
TaskEither | Task | taskEither.fold |
Either | TaskOption | taskOption.fromEither |
Task | TaskOption | taskOption.fromTask |
TaskEither | TaskOption | taskOption.fromTaskEither |
TaskOption | Task | taskOption.fold |
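Conceptually, most of these converters only re-wrap a value. The sketch below illustrates the first two rows of the table with local stand-in types so that it runs without fp-ts. It approximates the behavior and is not the library source.

```typescript
// Local stand-ins that mirror the shapes used by fp-ts.
type Left<E> = { readonly _tag: "Left"; readonly left: E };
type Right<A> = { readonly _tag: "Right"; readonly right: A };
type Either<E, A> = Left<E> | Right<A>;
type Task<A> = () => Promise<A>;
type TaskEither<E, A> = Task<Either<E, A>>;

// Either -> TaskEither: the synchronous result is wrapped in a resolved promise.
const fromEither =
  <E, A>(e: Either<E, A>): TaskEither<E, A> =>
  () =>
    Promise.resolve(e);

// Task -> TaskEither: a task never fails, so its value always ends up in a Right.
const fromTask =
  <A>(t: Task<A>): TaskEither<never, A> =>
  () =>
    t().then((right) => ({ _tag: "Right" as const, right }));

const parsed: Either<string, number> = { _tag: "Left", left: "not a number" };
const liftedEither = fromEither(parsed);
const liftedTask = fromTask(() => Promise.resolve("hello"));
```

Converting in the other direction, for example from TaskEither to Task, requires deciding what to do with the failure, which is why the table lists fold for it.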
Practical code example
Assume we work on software for tracking issues, and the codebase follows the onion architecture. The core layer represents the different domains and implements functions that store and retrieve data in a database, emit Kafka messages, etc. The layer above that represents the repositories, which compose the different functions of the domain layer to implement the business logic. The last layer is an API layer that exposes all the functionality to the clients of the service. It could consist of REST endpoints, GraphQL resolvers, workflow orchestrators, console roles implementing REPL-like debugging interfaces in production, etc.
In this example, we only describe the two innermost layers of the architecture. Assume a project domain that implements the following data structures and functions:
// domains/project.ts
import type { either, taskEither } from "fp-ts";
import type * as db from "./db.js";
import type * as kafka from "./kafka.js";
export interface Project {
id: string;
name: string;
description: string;
organizationId: string;
}
export type ProjectInput = Pick<
Project,
"name" | "description" | "organizationId"
>;
export interface ParseInputError {
_tag: "ParseInputError";
value: Error;
}
export interface ProjectNameUnavailableError {
_tag: "ProjectNameUnavailableError";
value: Error;
}
export type ValidateAvailabilityError =
| ProjectNameUnavailableError
| db.QueryError;
export interface ProjectLimitReachedError {
_tag: "ProjectLimitReachedError";
value: Error;
}
export type EnforceLimitError = ProjectLimitReachedError | db.QueryError;
export interface MessageEncodingError {
_tag: "MessageEncodingError";
value: Error;
}
export type EmitEntityError = MessageEncodingError | kafka.ProducerError;
/**
* A synchronous operation that accepts an unknown object
* and parses it. The result is an `Either`.
*/
export type ParseInput = (
input: unknown
) => either.Either<ParseInputError, ProjectInput>;
export declare const parseInput: ParseInput;
/**
* A function that accepts an object representing
* the input data of a project and returns a task
* describing an asynchronous operation which queries
* the database to check whether the project name
* is still available. Project names across an organization
* must be unique. This operation may fail due to different
* reasons, such as network errors, database connection
* errors, SQL syntax errors, etc.
*/
export type ValidateAvailability = (
input: ProjectInput
) => taskEither.TaskEither<ValidateAvailabilityError, void>;
export declare const validateAvailability: ValidateAvailability;
/**
* A task describing an asynchronous operation that
* queries the database for the number of existing
* projects for a given organization. There is a
* product limit for how many projects can be created
* by an organization. This operation fails if the limit
* is reached or any other network or database error occurs.
*/
export type EnforceLimit = taskEither.TaskEither<EnforceLimitError, void>;
export declare const enforceLimit: EnforceLimit;
/**
* A function that accepts a project object and returns
* a task describing an asynchronous operation that
* persists this object in the database. This operation
* fails if any network or database error occurs.
*/
export type Create = (
project: Project
) => taskEither.TaskEither<db.QueryError, void>;
export declare const create: Create;
/**
* A function that accepts a project object and returns
* a task describing an asynchronous operation that encodes
* this object and produces a Kafka message. This operation
* fails if the encoding fails, or any other network or broker
* error occurs.
*/
export type EmitEntity = (
project: Project
) => taskEither.TaskEither<EmitEntityError, void>;
export declare const emitEntity: EmitEntity;
With the project domain implemented, we can demonstrate how the corresponding repository implements a create action by composing the domain functions together:
// repositories/project.ts
import { taskEither } from "fp-ts";
import { flow } from "fp-ts/lib/function.js";
import { ulid } from "ulid";
import * as project from "../domains/project.js";
export const create = flow(
project.parseInput,
taskEither.fromEither,
taskEither.chainFirstW(project.validateAvailability),
taskEither.chainFirstW(() => project.enforceLimit),
taskEither.bind("id", () => taskEither.right(ulid())),
taskEither.chainFirstW(project.create),
taskEither.chainFirstW(project.emitEntity)
);
First, the input data is parsed. Next, we perform a small set of fail-fast validations. We start by checking if the project name is available. Since this validation is a task that may fail, we need to first lift the Either produced by parseInput to a TaskEither. We use chainFirstW for two main reasons:
- To call validateAvailability in the pipeline so that we can ignore its return value and keep on passing the ProjectInput object down the pipeline (flow). Note that chainFirstW unwraps the TaskEither and passes its value down to the supplied function (validateAvailability).
- To widen the failure type of TaskEither, thus extending the number of distinct errors that can cause the whole pipeline to fail.
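The two properties listed above can be sketched without fp-ts as follows. This chainFirstW is a local approximation of the library function, and succeed, fail, and checkLimit are hypothetical helpers introduced only for this example.

```typescript
// Local stand-ins that mirror the shapes used by fp-ts.
type Left<E> = { readonly _tag: "Left"; readonly left: E };
type Right<A> = { readonly _tag: "Right"; readonly right: A };
type Either<E, A> = Left<E> | Right<A>;
type TaskEither<E, A> = () => Promise<Either<E, A>>;

const succeed =
  <A>(a: A): TaskEither<never, A> =>
  async () => ({ _tag: "Right" as const, right: a });

const fail =
  <E>(e: E): TaskEither<E, never> =>
  async () => ({ _tag: "Left" as const, left: e });

// Runs f for its effect and possible failure, but keeps the original value.
// The failure type is widened from E1 to E1 | E2.
const chainFirstW =
  <A, E2, B>(f: (a: A) => TaskEither<E2, B>) =>
  <E1>(ma: TaskEither<E1, A>): TaskEither<E1 | E2, A> =>
  async () => {
    const first = await ma();
    if (first._tag === "Left") return first; // short-circuit: f is never called
    const second = await f(first.right)();
    return second._tag === "Left" ? second : first;
  };

// A hypothetical validation that succeeds with no meaningful value.
const checkLimit = (n: number): TaskEither<"LimitReached", void> =>
  n > 3 ? fail("LimitReached" as const) : succeed(undefined);

const kept = chainFirstW(checkLimit)(succeed(2));
const rejected = chainFirstW(checkLimit)(succeed(5));
```

Here kept resolves to a Right holding the original 2 rather than the void result of the validation, while rejected resolves to a Left holding "LimitReached".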
The next validation is enforceLimit, which is composed using chainFirstW for the same reasons. However, here we pass an anonymous function () => project.enforceLimit to chainFirstW. That is because enforceLimit is a task, as opposed to a function returning a task, as was the case with validateAvailability.
Once all the validation is done, we generate a unique ID for the project and bind it to the ProjectInput object under the key id. This creates an object with the shape of the Project interface and allows us to continue in the create pipeline of the project repository.
Next, we chainFirstW the project.create function to persist the project object in the database. Finally, we chainFirstW the project.emitEntity function, which produces a Kafka message to a compacted topic, letting other microservices perform asynchronous data aggregation and replication as necessary.
If any of the steps in the pipeline fails, for whatever reason, the failure is propagated through the rest of the pipeline without executing any of the remaining tasks. The type of the create action is widened to the following final type, which documents all possible failure cases so that an engineer is aware of them and can handle them at the call site:
import type { taskEither } from "fp-ts";
import type * as project from "../domains/project.js";
import type * as db from "../db.js";
export interface Create {
(input: unknown): taskEither.TaskEither<
| project.ProjectLimitReachedError
| db.QueryError
| project.ParseInputError
| project.ProjectNameUnavailableError
| project.EmitEntityError,
project.Project
>;
}
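Because the domain errors carry a _tag, a call site can handle them exhaustively with a switch. The sketch below uses only the three tagged errors whose shape is defined above, since the shapes of db.QueryError and kafka.ProducerError are not shown here. The function name toHttpStatus and the chosen status codes are assumptions made for illustration.

```typescript
// A subset of the failure cases, redeclared locally so the sketch is self-contained.
type CreateError =
  | { _tag: "ParseInputError"; value: Error }
  | { _tag: "ProjectNameUnavailableError"; value: Error }
  | { _tag: "ProjectLimitReachedError"; value: Error };

// Hypothetical mapping from domain errors to HTTP status codes.
const toHttpStatus = (error: CreateError): number => {
  switch (error._tag) {
    case "ParseInputError":
      return 400;
    case "ProjectNameUnavailableError":
      return 409;
    case "ProjectLimitReachedError":
      return 403;
    default: {
      // If a new error type is added to CreateError and not handled above,
      // this assignment stops compiling.
      const unreachable: never = error;
      return unreachable;
    }
  }
};
```

The never assignment in the default branch is what turns a forgotten failure case into a compile-time error instead of a run-time surprise.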
Concurrency
We could optimize the code and perform the project.create and project.emitEntity actions concurrently. To do that, we'll use taskEither.sequenceArray, which is an equivalent of Promise.all:
// repositories/project.ts
import { taskEither } from "fp-ts";
import { flow } from "fp-ts/lib/function.js";
import { ulid } from "ulid";
import * as project from "../domains/project.js";
import type * as db from "./db.js";
const createAndEmitProject = (proj: project.Project) =>
taskEither.sequenceArray<void, db.QueryError | project.EmitEntityError>([
project.create(proj),
project.emitEntity(proj),
]);
export const create = flow(
project.parseInput,
taskEither.fromEither,
taskEither.chainFirstW(project.validateAvailability),
taskEither.chainFirstW(() => project.enforceLimit),
taskEither.bind("id", () => taskEither.right(ulid())),
taskEither.chainFirstW(createAndEmitProject)
);
We can create a helper function called createAndEmitProject. Since project.create and project.emitEntity have different failure types, taskEither.sequenceArray can't infer the resulting type properly. Thus, we have to specify the success and failure types manually. With that done, we can simply replace the two calls to chainFirstW with a single one in the create pipeline.
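To see what running tasks concurrently means in practice, here is a dependency-free approximation of sequenceArray built on Promise.all. It is a sketch with local stand-in types, not the fp-ts implementation, and step and timeline are names invented for the demonstration.

```typescript
// Local stand-ins that mirror the shapes used by fp-ts.
type Left<E> = { readonly _tag: "Left"; readonly left: E };
type Right<A> = { readonly _tag: "Right"; readonly right: A };
type Either<E, A> = Left<E> | Right<A>;
type TaskEither<E, A> = () => Promise<Either<E, A>>;

// Starts all tasks at once, waits for all of them, and returns
// either the first Left (in array order) or all the Right values.
const sequenceArray =
  <E, A>(tasks: ReadonlyArray<TaskEither<E, A>>): TaskEither<E, ReadonlyArray<A>> =>
  async () => {
    const results = await Promise.all(tasks.map((t) => t()));
    const values: A[] = [];
    for (const result of results) {
      if (result._tag === "Left") return result;
      values.push(result.right);
    }
    return { _tag: "Right" as const, right: values };
  };

// Records when each step starts and ends.
const timeline: string[] = [];

// A hypothetical step that takes the given number of milliseconds.
const step =
  (label: string, ms: number): TaskEither<Error, string> =>
  async () => {
    timeline.push(`${label}:start`);
    await new Promise<void>((resolve) => setTimeout(resolve, ms));
    timeline.push(`${label}:end`);
    return { _tag: "Right" as const, right: label };
  };

const both = sequenceArray([step("create", 20), step("emit", 5)]);
```

After awaiting both(), the timeline shows that the second step started before the first one finished, and the results keep the order of the input array regardless of which step completed first.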
The shortcomings of this example
Although this example looks fairly complete, it ignores several aspects of real life software engineering:
- The database operations may need to be performed in a single transaction to prevent data from entering an inconsistent state.
- These functions may rely on one or more dependencies such as a database client, a logger, a tracer, etc.
I hope to address these shortcomings in the next parts of this series once we learn about the Reader monads for managing dependencies.
Wrap-up
- Task describes an async computation that never fails.
- TaskEither describes an async computation that may fail.
- TaskOption describes an async computation that never fails and returns an Option.
- TaskThese describes an async computation that may fail, succeed, or do both.
- All tasks are thunks that return a Promise.
- Tasks are more complex to work with but superior to promises in every other way:
  - We retain better control over the execution of the underlying Promise of a task.
  - Failures and edge cases are visible from the type signature.
  - The type system forces us to handle the failure cases.
- Once we have a task, it stays a task until eliminated (by calling it and awaiting its result).
- It is easy to convert between instances of different types of task monads.
- To create a task that performs other tasks concurrently, we can use the sequenceArray monadic function.
The next post of this series delves into the Reader monads and how they can help us with dependency injection.