Alain


RealTime ReasonML Subscriptions on AWS with Wonka

TLDR => the code.

So I saw this post on the AWS Mobile Blog on their new web-socket based real-time feature. I looked at the code and told myself:

Self, that is just a few lines of code. Surely, you can bind that very easily. And I was right about the binding part, so SIX WEEKS LATER, I am here to share the result with you.

bothans-spies-died-image

So this was the simple code they provided us which can be found here.


import React, { Component } from "react";
import { useEffect, useReducer } from 'react'
import "./App.css";
import "bootstrap/dist/css/bootstrap.min.css";
import logo from "./logo.svg";
import Amplify from "@aws-amplify/core";
import "@aws-amplify/pubsub";
import API, { graphqlOperation } from "@aws-amplify/api";
import aws_exports from "../aws-exports";
Amplify.configure(aws_exports);

const createMessage = `mutation createMessage($message: String!){
    createMessage(input:{message:$message}) {
    __typename
    id
    message
    createdAt
    }
}
`;

const onCreateMessage = `subscription onCreateMessage {
    onCreateMessage {
    __typename
    message
    }
}`;

class App extends Component {
  constructor(props) {
    super(props);
    this.state = {
      message: "",
      value: "",
      display: false
    };
    this.handleChange = this.handleChange.bind(this);
    this.handleSubmit = this.handleSubmit.bind(this);
  }

  async componentDidMount() {
    this.subscription = API.graphql(
      graphqlOperation(onCreateMessage)
    ).subscribe({
      next: event => {
        if (event){
          console.log("Subscription: " + JSON.stringify(event.value.data, null, 2));
          console.log("EVENT: " + JSON.stringify(event, null, 2));
          this.setState({ display: true });
          this.setState({ message: event.value.data.onCreateMessage.message });
        }
      }
    });
  }

  handleChange(event) {
    console.log(event)
    this.setState({ value: event.target.value });
  }

  async handleSubmit(event) {
    event.preventDefault();
    event.stopPropagation();
    const message = {
      id: "",
      message: this.state.value,
      createdAt: ""
    };
    const mutation = await API.graphql(
      graphqlOperation(createMessage, message)
    );
    console.log("Mutation: " + JSON.stringify(mutation.data, null, 2));
  }

  render() {
    return (
      <div className="App">
        <img src={logo} className="App-logo" alt="logo" />
        <div className="jumbotron jumbotron-fluid p-0">
          <h2 className="center">Broadcaster</h2>
        </div>
        <br />
        <div className="container">
          <form onSubmit={this.handleSubmit}>
            <div className="form-group">
              <input
                className="form-control form-control-lg"
                type="text"
                value={this.state.value}
                onChange={this.handleChange}
              />
            </div>
            <input
              type="submit"
              value="Submit"
              className="btn btn-primary"
            />
          </form>
        </div>
        <br />
        {this.state.display ? (
          <div className="container">
            <div className="card bg-success">
              <h3 className="card-text text-white p-2">
                {this.state.message}
              </h3>
            </div>
          </div>
        ) : null}
      </div>
    );
  }
}

export default App;

Which I promptly converted to the hooks api because I don't know react classes:

/* https://github.com/idkjs/realtime-reason-aws/blob/master/src/js/App.js */
import API, { graphqlOperation } from '@aws-amplify/api';
import Amplify from '@aws-amplify/core';
import '@aws-amplify/pubsub';
import 'bootstrap/dist/css/bootstrap.min.css';
import { default as React, useEffect, useState } from 'react';
import '../App.css';
import aws_exports from '../aws-exports';
import logo from '../logo.svg';
Amplify.configure(aws_exports);

const createMessage = `mutation createMessage($message: String!){
    createMessage(input:{message:$message}) {
    __typename
    id
    message
    createdAt
    }
}
`;

const onCreateMessage = `subscription onCreateMessage {
    onCreateMessage {
    __typename
    message
    }
}`;

function App() {
    const [
        message,
        setMessage
    ] = useState('');
    const [
        value,
        setValue
    ] = useState('');
    const [
        display,
        setDisplay
    ] = useState(false);

    const handleSubmit = async (event) => {
        event.preventDefault();
        event.stopPropagation();
        const message = {
            id: '',
            message: value,
            createdAt: ''
        };
        console.log('Message: ' + JSON.stringify(message));
        // API.graphql returns a promise for mutations, so wait for the result before logging it
        const mutation = await API.graphql(graphqlOperation(createMessage, message));
        console.log('Mutation: ' + JSON.stringify(mutation.data, null, 2));
    };

    useEffect(() => {
        const subscription = API.graphql(graphqlOperation(onCreateMessage)).subscribe({
            next: (event) => {
                if (event) {
                    console.log('Subscription: ' + JSON.stringify(event.value.data, null, 2));
                    console.log('EVENT: ' + JSON.stringify(event, null, 2));
                    setDisplay(true);
                    let message = event.value.data.onCreateMessage.message;
                    setMessage(message);
                }
            }
        });

        return () => subscription.unsubscribe();
    }, []);

    const handleChange = (e) => {
        console.log(e.target.value);
        setValue(e.target.value);
    };
    return (
        <div className="App">
            <img src={logo} className="App-logo" alt="logo" />
            <div className="jumbotron jumbotron-fluid p-0">
                <h2 className="center">Broadcaster</h2>
            </div>
            <br />
            <div className="container">
                <form onSubmit={handleSubmit}>
                    <div className="form-group">
                        <input
                            className="form-control form-control-lg"
                            type="text"
                            value={value}
                            onChange={handleChange}
                        />
                    </div>
                    <input type="submit" value="Submit" className="btn btn-primary" />
                </form>
            </div>
            <br />
            {
                display ? <div className="container">
                    <div className="card bg-success">
                        <h3 className="card-text text-white p-2">{message}</h3>
                    </div>
                </div> :
                null}
                <br />
        </div>
    );
}
export default App;

Setting up the App

You can try the new implementation in a few minutes with a small sample app. To get started, install the Amplify CLI following the instructions here. Then you’ll need to install a couple of tools and dependencies using NPM to create a boilerplate React app:

$ npm install -g create-react-app
$ create-react-app my-realtime-reason-app
$ cd my-realtime-reason-app
$ npm install @aws-amplify/core @aws-amplify/api @aws-amplify/pubsub bootstrap reason-react wonka
$ npm install bs-platform @baransu/graphql_ppx_re --save-dev
$ touch bsconfig.json

Then add this to bsconfig.json:

{
  "name": "react-template",
  "sources": {
    "dir": "src",
    "subdirs": true
  },
  "package-specs": [
    {
      "module": "commonjs",
      "in-source": true
    }
  ],
  "suffix": ".bs.js",
  "namespace": true,
  "bs-dependencies": [
    "reason-react",
    "wonka"
  ],
  "reason": {
    "react-jsx": 3
  },
  "ppx-flags": [
    "@baransu/graphql_ppx_re/ppx6"
  ],
  "refmt": 3
}

Execute the following command to create an Amplify project in the React app folder:

$ amplify init

Add an AppSync GraphQL API with API Key for the API Authentication:

$ amplify add api

Follow the default options. When prompted with “Do you have an annotated GraphQL schema?”, select “NO”, then when prompted with “Do you want a guided schema creation?”, select “YES” and “Single object with fields”. Replace the sample schema with:

type Message @model {
    id: ID!
    message: String!
    createdAt: String
}

The schema defines the app data model and uses the GraphQL Transform directive @model to deploy a DynamoDB table to store messages, automatically configuring all CRUDL (Create-Read-Update-Delete-List) logic in a GraphQL API to interact with data.

Execute the following command to create the AWS AppSync API and DynamoDB table:

$ amplify push

After running amplify push, we need a graphql_schema.json file in the project root for @baransu/graphql_ppx_re to work with. Running amplify push generated src/graphql/schema.json. This is exactly what you need, so open your terminal and run:

$ cp src/graphql/schema.json graphql_schema.json

That should kill those unreadable ppx errors you might see.

Binding to @aws-amplify/api

The first thing we have to do is configure Amplify's PubSub and API with the credentials AWS created for us in src/aws-exports.js.

We do this with this code:

type t;
[@bs.module "@aws-amplify/api"] external api: t = "default";
[@bs.send] external configureApi: (t, AwsExports.t) => unit = "configure";
// let configure = config => configure(api, config);
type pubsub;
[@bs.module "@aws-amplify/pubsub/lib-esm/index"] external pubsub: pubsub = "default";
[@bs.send]
external configurePubSub: (pubsub, AwsExports.t) => unit = "configure";
let configure = config => {
  configureApi(api, config);
  configurePubSub(pubsub, config);
};

Check out src/aws/AwsExports.re to see how to pull that code in and reference it.

I wrote a configure function that takes the config and configures both pubsub and api. If you don't configure both, your pubsub subscriptions will silently fail. They fail silently because the failure happens on the js side, and reason/bucklescript just trusts our bindings. It doesn't know what is going on over there.

The next thing we want to do is turn our aws generated queries into reason queries. So with the @baransu/graphql_ppx_re package, we copy the queries we want to use and convert them so that:

export const createMessage = `mutation CreateMessage($input: CreateMessageInput!) {
  createMessage(input: $input) {
    id
    message
    createdAt
  }
}
`;

becomes this in src/graphql/Graphql.re:

module CreateMessage = [%graphql
  {|
    mutation CreateMessage($input: CreateMessageInput!) {
     createMessage(input: $input) {
      id
      message
      createdAt
    }
  }
  |}
];

module OnCreateMessage = [%graphql
  {|
    subscription onCreateMessage {
      onCreateMessage {
      __typename
      message
      }
    }
|}
];

Now that we have the mutation and subscription queries ready for action, let's set up our mutation first.

The Mutation Binding

The first thing we need is a type for our graphqlOperation. amplify/api is expecting a graphqlOperation which is your standard query and variables graphql query.

This is what it looks like in amplify-js at https://github.com/aws-amplify/amplify-js/blob/867412030de57fd74078b609252de6f7f81ad331/packages/api/src/API.ts#L31-L34:

export const graphqlOperation = (query, variables = {}) => ({
    query,
    variables,
});

This is important! Just because the library exports it doesn't mean we have to bind directly to it. We can see what it looks like. We know it produces an object because the function takes (query, variables = {}) and returns them as the two fields of an object literal.
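To make that shape concrete, here is a minimal JavaScript sketch. It uses a local copy of the two-line helper quoted above rather than importing it from Amplify, and the query text and input values are only illustrative.

```javascript
// Local copy of the helper quoted above, so this runs without Amplify installed.
const graphqlOperation = (query, variables = {}) => ({
  query,
  variables,
});

// Illustrative query text and input; any string and any plain object behave the same way.
const createMessage = `mutation CreateMessage($input: CreateMessageInput!) {
  createMessage(input: $input) { id message createdAt }
}`;

const operation = graphqlOperation(createMessage, { input: { message: "hello" } });

// The result is just an object with two fields, which is all our Reason type has to describe.
console.log(typeof operation.query); // "string"
console.log(Object.keys(operation)); // [ 'query', 'variables' ]
```

Since the value that crosses the wire is only this object, a Reason record with a query field and a variables field is enough to stand in for the helper.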

[@bs.send]
external _graphql:
  (t, Types.graphqlOperation) => Js.Promise.t(Types.executionResult) =
  "graphql";

We also know that our @baransu/graphql_ppx_re ppx converts the query to the following type, which we can see by hovering over our mutationRequest:

mutationRequest screenshot

Note that query is a string and variables is of type Js.Json.t.

Js.t(
  < parse : Js.Json.t ->
          < createMessage : < createdAt : string option; id : string;
                              message : string >
                            Js.t option >
          Js.t;
  query : string; variables : Js.Json.t >
)
type t('a)

So even though the source doesn't specify the string and Js.Json.t typing, we use the information available to us to type it properly. If we tried to somehow send it across the wire as something else, it would fail, and then we could go through the debug process that would teach us to trust the reason/bucklescript compiler. That's how I learn, anyway. That and reading the library sources and plenty of public repositories online.

Building our mutation, then, looks like this.

First, we run the mutation through the ppx:

let mutationRequest = Graphql.CreateMessage.make(~input=message, ());

Then we pull off the generated query and variables values to construct our graphqlOperation value. Note that since I haven't used open API; or open Types; to bring all of the values into scope, I have to annotate graphqlOperation with : Types.graphqlOperation to tell the compiler what I am building.

   let graphqlOperation: Types.graphqlOperation = {
      query: mutationRequest##query,
      variables: Some(mutationRequest##variables),
    };

Lastly, I pass the graphqlOperation to the API.mutate function which expects it and calls the API._graphql function.

This is API.mutate:

let mutate: Types.operation =
  graphqlOperation => {
    _graphql(api, graphqlOperation);
  };

The : Types.operation annotation is not strictly necessary here, but it helps if we want to do other things where a function needs to know what kind of function it's getting.

And this is our binding to the API's graphql function that it uses to process queries.

[@bs.send]
external _graphql:
  (t, Types.graphqlOperation) => Js.Promise.t(Types.executionResult) =
  "graphql";

We call it with:

    API.mutate(graphqlOperation)
    |> Js.Promise.then_(response => {
         Js.log2("reason_broadcaster_mutation", response)
         |> Js.Promise.resolve
       });

The mutation and query functions return promises, so we handle the promise here.

This is all happening in the handleSubmit function. I didn't want to keep typing something into the input box to test, so I wrote a function that generates a timestamp and posts it to the input box to act as our value.

Here is the whole handleSubmit function:

let handleSubmit = event => {
    /* create a message using a timestamp so we dont have to keep putting in a message to test */
    let time = Js.Date.now();
    let value = "RE: " ++ time->Js.Date.fromFloat->Js.Date.toLocaleString;
    setValue(_ => value);
    let _ = ReactEvent.Form.preventDefault(event);
    let _ = ReactEvent.Form.stopPropagation(event);
    let message = {"id": None, "message": value, "createdAt": None};
    let mutationRequest = Graphql.CreateMessage.make(~input=message, ());
    let graphqlOperation: Types.graphqlOperation = {
      query: mutationRequest##query,
      variables: Some(mutationRequest##variables),
    };
    API.mutate(graphqlOperation)
    |> Js.Promise.then_(response => {
         Js.log2("reason_broadcaster_mutation", response)
         |> Js.Promise.resolve
       })
    /* discard the promise so the handler returns unit, as a form event handler should */
    |> ignore;
  };

Handling Subscriptions

This is the part that took six weeks. I ended up learning all kinds of things while chasing this down, including callbags, xstream, observables and so on. I even made some pull requests that were accepted on reason-react-native and bs-faker. The chase was long! Incidentally, you have no idea what you will learn by going over a seasoned developer's code line by line. I encourage you to find some repositories, change the bs-platform version to the just released bs-platform@7.0.1, see what breaks and fix it. Try to find a repo with tests though.

Back to subscriptions. When you pass a subscription to API it returns an Observable rather than a promise. The code checks to see what type of graphqlOperation it receives and returns one or the other, accordingly. See line 350 in the API.ts in the source.

     * @returns {Promise<GraphQLResult> | Observable<object>}
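As a rough illustration of that "one function, two return types" behaviour, here is a JavaScript sketch. The fakeGraphql function is a hypothetical stand-in, not Amplify's implementation: it only looks at the first keyword of the query text, and the event it emits is made up.

```javascript
// Hypothetical stand-in for the dispatch described above; this is NOT Amplify's code.
// It only inspects the first keyword of the query text, which is enough for illustration.
function fakeGraphql({ query, variables = {} }) {
  const kind = query.trim().split(/\s+/)[0];

  if (kind === "subscription") {
    // Observable-like: nothing happens until someone subscribes.
    return {
      subscribe(observer) {
        const timer = setTimeout(() => {
          observer.next({ value: { data: { onCreateMessage: { message: "hi" } } } });
        }, 0);
        return { unsubscribe: () => clearTimeout(timer) };
      },
    };
  }

  // Queries and mutations resolve once with a result.
  return Promise.resolve({ data: { kind, variables } });
}

const mutationResult = fakeGraphql({ query: "mutation CreateMessage { ok }" });
const subscriptionResult = fakeGraphql({ query: "subscription onCreateMessage { message }" });

console.log(mutationResult instanceof Promise);   // true
console.log(typeof subscriptionResult.subscribe); // "function"
```

This is why one binding is not enough on the Reason side: the same js function needs one external that returns a promise and another that returns an observable.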

Observables are apparently not a simple thing. There are few examples in reason on github and the js examples are hard to look at. The most popular js package for handling an observable seems to be zen-observable. It implements a spec compliant observable, which, I guess, means something in programming. After trying a bunch of things, including updating bs-xstream, I ended up using wonka which binds to zen-observable's spec compliant observable. Why? Well amplify-js uses zen-observable as its observable.

import * as Observable from 'zen-observable';

source

Using Wonka.fromObservable

Wonka, per its github repo, is:

A fast push & pull stream library for Reason, OCaml, and TypeScript

Per the author:

So suppose you have zen-observable's Observable, passing it into Wonka.fromObservable converts it into a Wonka stream. You can then subscribe to this stream using Wonka.subscribe which accepts a function that is called for each value in your stream and returns a subscription (which can be used to unsubscribe)

Phil Plückthun on issue#38

So if we know we are getting a zen-observable from amplify-js, we can pass it to Wonka which will convert it to a stream that we can subscribe to.

The API._graphql binding we wrote previously returns a promise, which is not what we want here. I started by writing another binding to the same function (yes, this is allowed) that returns a Wonka.observableT instead of a promise. The docs for using Wonka.fromObservable are more or less unusable at the moment. I got it to work by reading the js version and applying the concepts to the reason version.

Rummaging through various zen-observable bindings on github I figured out that we needed to create an observerLike('a) type and pass it to an observableLike('a) type to create our observable. We then pass that to Wonka.fromObservable which turns it into a stream and then gives us the result when it gets it back from the js side.

In Types.re we add these types:


type onCreateMessage = {
  __typename: string,
  message: string,
};

type event = {value}
and value = {data: onCreateMessage};

type errorValue = {message: string};

type observerLike('event) = {
  next: event => unit,
  error: errorValue => unit,
  complete: unit => unit,
};

type observableLike('value) = {
  subscribe:
    observerLike('value) => {. [@bs.meth] "unsubscribe": unit => unit},
};

We create an event type that models what we expect to receive as an event, which is our subscription event.

type onCreateMessage = {
  __typename: string,
  message: string,
};

type event = {value}
and value = {data: onCreateMessage};

type errorValue = {message: string};

We then use that in our observerLike('event), which we pass to type observableLike('value). I stumbled upon this solution by following the type errors the compiler was giving me. It's truly amazing that the compiler errors can guide you to the correct solution when you are utterly clueless about how the underlying code works, like I was in this case. I say that to encourage you to hack away, try to understand the errors, and then find a solution.
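For readers who have not met observables before, the two record types describe a very small contract, sketched here in plain JavaScript. The makeObservable helper is hypothetical and hand-rolled for illustration; it is not part of zen-observable or Wonka, and it delivers its events synchronously.

```javascript
// Hand-rolled observable with the same surface as the observableLike type:
// subscribe(observer) returns an object with an unsubscribe method.
// makeObservable is a hypothetical helper, not part of zen-observable or Wonka.
function makeObservable(events) {
  return {
    subscribe(observer) {
      for (const event of events) {
        observer.next(event);
      }
      observer.complete();
      // Delivery here is synchronous, so there is nothing left to cancel.
      return { unsubscribe() {} };
    },
  };
}

const received = [];

// The observer mirrors the observerLike record: next, error and complete callbacks.
const observer = {
  next: (event) => received.push(event.value.data.onCreateMessage.message),
  error: (err) => console.error(err.message),
  complete: () => received.push("done"),
};

const observable = makeObservable([
  { value: { data: { onCreateMessage: { __typename: "Message", message: "first" } } } },
  { value: { data: { onCreateMessage: { __typename: "Message", message: "second" } } } },
]);

const subscription = observable.subscribe(observer);
subscription.unsubscribe();

console.log(received); // [ 'first', 'second', 'done' ]
```

The observer is the thing with callbacks, the observable is the thing with subscribe, and the subscription is the handle you use to stop listening.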

We use these types to create our new subscription binding. Here is what it looks like:

[@bs.send]
external _subscribe:
  (t, Types.graphqlOperation) =>
  Wonka.observableT(Types.observableLike(Types.observerLike('value))) =
  "graphql";

This uses BuckleScript's external to call graphql on the JavaScript side. It expects to receive an object of type graphqlOperation and returns our observable, represented by Wonka.observableT(Types.observableLike(Types.observerLike('value))).

Then we create a subscribe function with the api instance already applied, so we don't have to pass it at the call site (our useEffect function):

let subscribe = graphqlOperation => _subscribe(api, graphqlOperation);

The api value is the type t required by the binding. We supply it here, so the function already has one of the values it needs and is just waiting on the last parameter, the graphqlOperation. When it gets it, it will call the js function we bound to. Let's do that next.
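Before we do, here is the same wrapping idea as a small JavaScript sketch. Both api and _subscribe are hypothetical stand-ins for the Amplify instance and the two-argument binding; the string they return is made up so the example can run on its own.

```javascript
// Hypothetical stand-ins: `api` plays the role of the Amplify API instance,
// and `_subscribe` the role of the two-argument binding.
const api = {
  graphql: (operation) => `sent: ${operation.query}`,
};
const _subscribe = (instance, operation) => instance.graphql(operation);

// Fix the first argument once; callers only supply the operation.
const subscribe = (operation) => _subscribe(api, operation);

console.log(subscribe({ query: "subscription onCreateMessage { message }", variables: {} }));
// sent: subscription onCreateMessage { message }
```

Keeping the instance out of the call site means the component code never has to know which object the method lives on.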

We create a useEffect function that will listen to our pubsub endpoint and report when it gets something.

In useEffect we create our subscription query that we will pass to API.subscribe:

    let subRequest = Graphql.OnCreateMessage.make();
    let graphqlOperation: Types.graphqlOperation = {
      query: subRequest##query,
      variables: Some(subRequest##variables),
    };

Then we pass graphqlOperation to API.subscribe to create our observable source:

let source = API.subscribe(graphqlOperation);

From the Wonka Docs

A “source” in Wonka is a provider of data. It provides data to a “sink” when the “sink” requests it. This is called a pull signal and for synchronous sources no time will pass between the sink pulling a new value and a source sending it. For asynchronous sources, the source may either ignore pull signals and just push values or send one some time after the pull signal.

We define our subscription by passing the source to Wonka.fromObservable, which converts it into a Wonka stream, and then subscribing to that stream with Wonka.subscribe. When the effect is cleaned up, we call the subscription's unsubscribe method, mirroring the unsubscribe method we declared in the observableLike type above. Here it is again:

type observableLike('value) = {
  subscribe:
    observerLike('value) => {. [@bs.meth] "unsubscribe": unit => unit},
};

...


let subscription =
      source
      |> Wonka.fromObservable
      |> Wonka.subscribe((. event) => {
            let message = extractMessageFrom(event);
           setMessage(_ => Some(message));
           Js.log2("subscription_event", message);
         });
Some(() => subscription.unsubscribe());

Lastly, when we get our value back, we want to extract the message and post it to the ui. I created this super hacky function to do it because I could not figure out how to work with the response. Not yet, anyway.

  let extractMessageFrom = event => {
    /* use Obj.magic to change the type, otherwise the code in Wonka.subscribe breaks. */
    let event = event->Obj.magic;
    /* get the message value on event and post to ui */
    let message = event##value##data##onCreateMessage##message;
    message;
  };
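For comparison, this is roughly what the same lookup does on the JavaScript side, with a guard added for events that do not have the expected shape. It is only a sketch: the helper below is hypothetical, and the sample events are made up to match the subscription query.

```javascript
// Hypothetical helper: returns the message string, or null when the event
// does not have the expected value.data.onCreateMessage.message path.
function extractMessageFrom(event) {
  const message = event?.value?.data?.onCreateMessage?.message;
  return typeof message === "string" ? message : null;
}

const goodEvent = {
  value: { data: { onCreateMessage: { __typename: "Message", message: "RE: now" } } },
};

console.log(extractMessageFrom(goodEvent));     // RE: now
console.log(extractMessageFrom({ value: {} })); // null
```

The Obj.magic version skips that guard, so a malformed event would surface as a runtime error rather than a type error.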

All together now. I have this set up two different ways because I was just learning it. Here is the simple version, found in src/Demo2.re:

  React.useEffect0(() => {
    let subRequest = Graphql.OnCreateMessage.make();
    let graphqlOperation: Types.graphqlOperation = {
      query: subRequest##query,
      variables: Some(subRequest##variables),
    };
    /* The observer's type is:
       `Wonka.observableT(
          ReactTemplate.Types.observableLike(ReactTemplate.Types.observerLike('a))
          )`
           */
    let source = API.subscribe(graphqlOperation);
    let subscription =
      source
      |> Wonka.fromObservable
      |> Wonka.subscribe((. event) => {
            let message = extractMessageFrom(event);
           setMessage(_ => Some(message));
           Js.log2("subscription_event", message);
         });
    Some(() => subscription.unsubscribe());
  });

Get in your terminal and run yarn start. Open two browsers so you can see multiple instances of your web page responding to the same signal.

This is what it should look like:

ScreenRecording.gif
