DEV Community

Attila Večerek
Attila Večerek

Posted on • Edited on

Task monads

Up until now, all code examples in the series were centered around synchronous computation. In reality, we need to be able to combine both synchronous and asynchronous computations. Some examples of async functions are: database lookups, HTTP calls, reading data from the filesystem, etc. In this post, we take a look at how such functions can be composed.

The pitfalls of working with Promises

Promise is JavaScript's native language construct representing the result of an async operation[1]. However, it may be quite inconvenient to work with for multiple reasons.

First of all, once we instantiate a promise, it starts evaluating immediately. Canceling one is non-trivial, so it is best not to instantiate a promise unless absolutely necessary. This requires us to have a tight control over the order of execution of async functions. This may be difficult to achieve in large and complex codebases. However, this control is crucial when it comes to fixing pesky bugs or optimizing for performance or resource utilization.

Secondly, as seen from its type signature (Promise<A>), a promise only communicates the type of its resolved value. However, it can also reject with any value. This requires us to handle unknown edge cases of an any type which is basically giving up on type safety. However, since TypeScript 4.4 we can use the useUnknownInCatchVariables compiler option to get type-safety back at the cost of reduced ergonomics:

class SomeCustomError extends Error {}

try {
  // call something that may throw
} catch (err: unknown) {
  if (err instanceof SomeCustomError) {
    // handle some custom error
  } else if (err instanceof Error) {
    // handle a generic error
  } else {
    throw new Error(String(err));
  }
}
Enter fullscreen mode Exit fullscreen mode

Lastly, the type system does not have a mechanism to enforce the handling of possibly rejected promises. Basically, whether we wrap some code in a try-catch block or not it still produces a valid program. This contributes to bugs caused by human error. People may simply forget to handle errors.

Tasks to the rescue

Although more complex, Task monads help us deal with the shortcomings of Promise. fp-ts provides the following four task monads:

  1. Task<A>: represents an async computation that never fails.
  2. TaskEither<E,A>: represents an async computation that may fail.
  3. TaskOption<A>: represents an async computation that never fails and may or may not return a value.
  4. TaskThese<E,A>: represents an async computation that may fail, succeed, or do both at the same time.

The main advantages of tasks as opposed to using only promises can be summarized as:

  • We retain control over when tasks are executed. Hence, it becomes easier to fix bugs and optimize our code.
  • We can clearly see from the type signature what effects a task has. This not only saves us time learning about possible edge cases to handle but the compiler actually forces us to handle them. This helps us write more correct business logic. This is fairly similar to the advantages of using the Either monad for synchronous computations that may fail.

This post only describes the Task and TaskEither monads in more detail. Understanding these two is enough to understand the other ones.

Task

A task describes an asynchronous computation that never fails. It is defined as follows [2]:

export interface Task<A> {
  (): Promise<A>;
}
Enter fullscreen mode Exit fullscreen mode

It is a thunk - basically a lazily executed Promise. Here's an example of an async function that cannot fail:

import * as fs from "node:fs/promises";

export const safeReadFile =
  (fallback: string) =>
  async (path: string): Promise<string> => {
    try {
      return fs.readFile(path, "utf8");
    } catch (_err) {
      return fallback;
    }
  };
Enter fullscreen mode Exit fullscreen mode

safeReadFile wraps the native promise-based readFile function and handles any error by returning a configurable string as the fallback value. Hence, calling safeReadFile never fails.

Constructors

The simplest way to construct a task is by using the of function:

import { task } from "fp-ts";

export const myTask = task.of(42);
// Value of myTask is () => Promise.resolve(42)
// Type of myTask is Task<number>
Enter fullscreen mode Exit fullscreen mode

However, the most common way to construct one is to just return an async thunk like so:

import { task } from "fp-ts";
import * as fs from "node:fs/promises";

export const safeReadFile =
  (fallback: string) =>
  (path: string): task.Task<string> =>
  async () => {
    try {
      return fs.readFile(path, "utf8");
    } catch (_err) {
      return fallback;
    }
  };
Enter fullscreen mode Exit fullscreen mode

To get the value wrapped by the task, it needs to be first eliminated by calling the task. After that, its result needs to be awaited in an async context like so:

import { task } from "fp-ts";
import assert from "node:assert";

const myTask = task.of(42);

(async () => {
  const value = await myTask();
  assert.equal(value, 42);
  // OK, 42 == 42
})();
Enter fullscreen mode Exit fullscreen mode

TaskEither

A TaskEither describes an async computation that may fail. It is defined as follows [3]:

import type { either, task } from "fp-ts";

export interface TaskEither<E, A> extends task.Task<either.Either<E, A>> {}
Enter fullscreen mode Exit fullscreen mode

We can think of the TaskEither monad as a syntactic sugar over the Task monad that wraps an Either. Task<Either<E, A>> would be fairly inconvenient to work with. The map, chain, fold, etc. functions of the Task monad would only unwrap the Task instance. Unwrapping the instance of an Either would be the caller's responsibility. This can be demonstrated by the following code example.

import { either, task } from "fp-ts";
import { pipe } from "fp-ts/lib/function.js";
import { db } from "./db.js";

const query =
  (sql: string): task.Task<either.Either<Error, unknown>> =>
  async () => {
    try {
      const res = await db.query(sql);
      return either.right(res);
    } catch (error) {
      return pipe(error, either.toError, either.left);
    }
  };

export const testDbConnection: task.Task<boolean> = pipe(
  query("SELECT 1"),
  task.map(
    // unwrapping the task here
    either.fold(
      // unwrapping the either here
      () => false,
      () => true
    )
  )
);
Enter fullscreen mode Exit fullscreen mode

Assume we implemented a db module that knows how to authenticate against the database and fetch a connection. The above code implements query as a functional wrapper over querying the database that throws an error in case of connection issues, syntax errors in the SQL statement, etc. It also implements testDbConnection which returns true if the test query succeeds and false otherwise. It is common to see implementations like this for diagnostic endpoints that are regularly called by an internal service for monitoring purposes. Using the TaskEither monad simplifies our example from above:

import { either, task, taskEither } from "fp-ts";
import { pipe } from "fp-ts/lib/function.js";
import { db } from "./db.js";

const query = (sql: string): taskEither.TaskEither<Error, unknown> =>
  taskEither.tryCatch(() => db.query(sql), either.toError);

export const testDbConnection: task.Task<boolean> = pipe(
  query("SELECT 1"),
  taskEither.fold(
    () => task.of(false),
    () => task.of(true)
  )
);
Enter fullscreen mode Exit fullscreen mode

The query function becomes less verbose. taskEither.tryCatch handles the wrapping of the success/failure values in their respective instances of Either for us. It also reduces the level of nesting in testDbConection by one. Notice, the functions passed to fold must return a Task. That is because once we operate on an asynchronous computation, we cannot turn it into a synchronous one.

Constructors

The simplest way to construct a TaskEither is by using the right and left functions:

import { taskEither } from "fp-ts";

export const asyncSuccess = taskEither.right(42);
// The value is () => Promise.resolve({ _tag: "Right", value: 42 })
// The type is TaskEither<never, number>

export const asyncFailure = taskEither.left(new Error("Something went wrong"));
// The value is () => Promise.reject({ _tag: "Left", value: new Error("Something went wrong" }))
// The type is TaskEither<Error, never>
Enter fullscreen mode Exit fullscreen mode

Another way to construct a TaskEither is to return an async thunk that returns an Either:

import { either } from "fp-ts";
import * as fs from "node:fs/promises";

export const safeReadFile = (path: string) => async () => {
  try {
    return either.right(fs.readFile(path, "utf8"));
  } catch (reason) {
    return either.left(either.toError(reason));
  }
};
Enter fullscreen mode Exit fullscreen mode

Last but not least, we can use taskEither.tryCatch to wrap an async function that may throw an error:

import { either, taskEither } from "fp-ts";
import * as fs from "node:fs/promises";

export const safeReadFile = (path: string) =>
  taskEither.tryCatch(
    () => fs.readFile(path, "utf8"),
    either.toError
  );
Enter fullscreen mode Exit fullscreen mode

Just like in the case of the Option monad, tryCatch is most useful when wrapping third-party library or native functions written in a non-functional style.

Conversions

It is possible to convert one Task monad to another. The following table shows how to perform a couple of such conversions:

From To Converter
Either TaskEither taskEither.fromEither
Task TaskEither taskEither.fromTask
TaskOption TaskEither taskEither.fromTaskOption
TaskEither Task taskEither.fold
Either TaskOption taskOption.fromEither
Task TaskOption taskOption.fromTask
TaskEither TaskOption taskOption.fromTaskEither
TaskOption Task taskOption.fold

Practical code example

Assume we worked on a software for tracking issues and the codebase follows the onion architecture. The core layer represents the different domains and implements functions that store and retrieve data in a database, emit Kafka messages, etc. The layer above that represent the repositories that compose the different functions of the domain layer to implement the business logic. The last layer is an API layer that exposes all the functionality to the clients of the service. It could consist of REST endpoints, GraphQL resolvers, workflow orchestrators, and console roles implementing REPL-like debugging interfaces in production, etc.

A diagram visualizing the layers of the onion architecture

In this example, we only describe the two innermost layers of the architecture. Assume a project domain that implements the following data structures and functions:

// domains/project.ts

import type { either, taskEither } from "fp-ts";
import type * as db from "./db.js";
import type * as kafka from "./kafka.js";

export interface Project {
  id: string;
  name: string;
  description: string;
  organizationId: string;
}

export type ProjectInput = Pick<
  Project,
  "name" | "description" | "organizationId"
>;

export interface ParseInputError {
  _tag: "ParseInputError";
  value: Error;
}

export interface ProjectNameUnavailableError {
  _tag: "ProjectNameUnavailableError";
  value: Error;
}

export type ValidateAvailabilityError =
  | ProjectNameUnavailableError
  | db.QueryError;

export interface ProjectLimitReachedError {
  _tag: "ProjectLimitReachedError";
  value: Error;
}

export type EnforceLimitError = ProjectLimitReachedError | db.QueryError;

export interface MessageEncodingError {
  _tag: "MessageEncodingError";
  value: Error;
}

export type EmitEntityError = MessageEncodingError | kafka.ProducerError;

/**
 * A synchronous operation that accepts an unknown object
 * and parses it. The result is an `Either`.
 */
export type ParseInput = (
  input: unknown
) => either.Either<ParseInputError, ProjectInput>;
export declare const parseInput: ParseInput;

/**
 * A function that accepts an object representing
 * the input data of a project and returns a task
 * describing an asynchronous operation which queries
 * the database to check whether the project name
 * is still available. Project names across an organization
 * must be unique. This operation may fail due to different
 * reasons, such as network errors, database connection
 * errors, SQL syntax errors, etc.
 */
export type ValidateAvailability = (
  input: ProjectInput
) => taskEither.TaskEither<ValidateAvailabilityError, void>;
export declare const validateAvailability: ValidateAvailability;

/**
 * A task describing an asynchronous operation that
 * queries the database for the number of existing
 * projects for a given organization. There is a
 * product limit for how many projects can be created
 * by an organization. This operation fails if the limit
 * is reached or any other network or database error occurs.
 */
export type EnforceLimit = taskEither.TaskEither<EnforceLimitError, void>;
export declare const enforceLimit: EnforceLimit;

/**
 * A function that accepts a project object and returns
 * a task describing an asynchronous operation that
 * persists this object in the database. This operation
 * fails if any network or database error occurs.
 */
export type Create = (
  project: Project
) => taskEither.TaskEither<db.QueryError, void>;
export declare const create: Create;

/**
 * A function that accepts a project object and returns
 * a task describing an asynchronous operation that encodes
 * this object and produces a Kafka message. This operation
 * fails if the encoding fails, or any other network or broker
 * error occurs.
 */
export type EmitEntity = (
  project: Project
) => taskEither.TaskEither<EmitEntityError, void>;
export declare const emitEntity: EmitEntity;
Enter fullscreen mode Exit fullscreen mode

With the project domain implemented, we can demonstrate how the corresponding repository implements a create action by composing the domain functions together:

// repositories/project.ts

import { taskEither } from "fp-ts";
import { flow } from "fp-ts/lib/function.js";
import { ulid } from "ulid";
import * as project from "../domains/project.js";

export const create = flow(
  project.parseInput,
  taskEither.fromEither,
  taskEither.chainFirstW(project.validateAvailability),
  taskEither.chainFirstW(() => project.enforceLimit),
  taskEither.bind("id", () => taskEither.right(ulid())),
  taskEither.chainFirstW(project.create),
  taskEither.chainFirstW(project.emitEntity)
);
Enter fullscreen mode Exit fullscreen mode

First, the input data is parsed. Next, we perform a small set of fail-fast validations. We start by checking if the project name is available. Since this validation is a task that may fail, we need to first lift the Either produced by parseInput to a TaskEither. We use chainFirstW for two main reasons:

  1. To call validateAvailability in the pipeline so that we can ignore its return value and keep on passing the ProjectInput object down the pipeline (flow). Note that chainFirstW unwraps the taskEither and passes its value down to the supplied function (validateAvailability).
  2. To widen the failure type of TaskEither, thus extending the number of distinct errors that can cause the whole pipeline to fail.

The next validation is enforceLimit which is composed using chainFirstW for the same reasons. However, here we pass an anonymous function () => project.enforceLimit to chainFirstW. That is because enforceLimit is a task as opposed to a function returning a task as it was the case with validateAvailability.

Once all the validation is done, we generate a unique ID for the project and bind it to the ProjectInput object under the key id. This creates an object with the shape of the Project interface and allows us to continue in the create pipeline of the project repository.

Next, we chainFirstW the project.create function to persist the project object in the database. At last, we chainFirstW the project.emitEntity function that produces a Kafka message to a compacted topic letting other microservices perform asynchronous data aggregation and replication as necessary.

If any of the steps in the pipeline fail, for whatever reason, the failure is propagated through the rest of the pipeline without executing any of the remaining tasks. The type of the create action is widened to the following final type that documents all possible failure cases that an engineer would now be aware of and can be handled at the call site:

import type { taskEither } from "fp-ts";
import type * as project from "../domains/project.js";
import type * as db from "../db.js";

export interface Create {
  (input: unknown): taskEither.TaskEither<
    | project.ProjectLimitReachedError
    | db.QueryError
    | project.ParseInputError
    | project.ProjectNameUnavailableError
    | project.EmitEntityError,
    project.Project
  >;
}
Enter fullscreen mode Exit fullscreen mode

Concurrency

We could optimize the code and perform the projects.create and project.emitEntity actions concurrently. To do that, we'll use taskEither.sequenceArray which is an equivalent of Promise.all:

// repositories/project.ts

import { taskEither } from "fp-ts";
import { flow } from "fp-ts/lib/function.js";
import { ulid } from "ulid";
import * as project from "./13-domains-project.js";
import type * as db from "./db.js";

const createAndEmitProject = (proj: project.Project) =>
  taskEither.sequenceArray<void, db.QueryError | project.EmitEntityError>([
    project.create(proj),
    project.emitEntity(proj),
  ]);

export const create = flow(
  project.parseInput,
  taskEither.fromEither,
  taskEither.chainFirstW(project.validateAvailability),
  taskEither.chainFirstW(() => project.enforceLimit),
  taskEither.bind("id", () => taskEither.right(ulid())),
  taskEither.chainFirstW(createAndEmitProject)
);
Enter fullscreen mode Exit fullscreen mode

We can create a helper function called createAndEmitProject. Since projects.create and projects.emitEntity have different failure types, taskEither.sequenceArray can't infer the resulting type properly. Thus, we have to override the success and failure types manually. With that done, we can simply replace the two calls to chainFirstW with a single one in the create pipeline.

The shortcomings of this example

Although this example looks fairly complete, it ignores several aspects of real life software engineering:

  • The database operations may need to be performed in a single transaction to prevent data from entering an inconsistent state.
  • These functions may rely on one or more dependencies such as a database client, a logger, a tracer, etc.

I hope to address these shortcomings in the next parts of this series once we learn about the Reader monads for managing dependencies.

Wrap-up

  • Task describes an async computation that never fails.
  • TaskEither describes an async computation that may fail.
  • TaskOption describes an async computation that never fails and returns an Option.
  • TaskThese describes an async computation that may fail, succeed, or do both.
  • All tasks are thunks that return a Promise.
  • Tasks are more complex to work with but superior to promises in every other way:
    • We retain better control over the execution of the underlying Promise of a task.
    • Failures and edge cases are visible from the type signature.
    • The type system forces us to handle the failure cases.
  • Once we have a task, it stays a task until eliminated (by calling it and awaiting its result).
  • It is easy to convert between instances of different types of task monads.
  • To create a task that performs other tasks concurrently, we can use the sequenceArray monadic function.

The next post of this series delves into the Reader monads and how they can help us with dependency injection.

Extra resources

Top comments (0)