AWS Amplify Subscriptions Usage / 4. Two-Mutations, One-Subscription Pattern

Kihara, Takuya (tacck) ・6 min read

In the previous articles, we built a chat that responds in real time.
This means all users receive a new message immediately when someone posts it.

That is very useful.
But sometimes we want to control the timing of publishing messages.

For example, suppose 10 people use the same chat room and everyone posts a message at the same time.
The server needs to publish 100 messages (10 posters × 10 recipients).
And each user receives 10 messages at the same time.

If 100 people post messages, the server needs to publish 10,000 messages.
With 1,000 people, 1,000,000 messages...

Of course, each user then receives many messages at the same time, so the browser has to work very hard.
This hurts the end-user experience.
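
The arithmetic above can be checked with a few lines of JavaScript. This is only an illustration of the growth rate; `deliveriesPerBurst` is a hypothetical helper and not part of Amplify.

```javascript
// Illustrative arithmetic only: count deliveries when every user posts once.
// `deliveriesPerBurst` is a hypothetical helper, not part of Amplify.
function deliveriesPerBurst(users) {
  // Each of the `users` messages is delivered to all `users` subscribers.
  return users * users
}

for (const users of [10, 100, 1000]) {
  console.log(`${users} users -> ${deliveriesPerBurst(users)} deliveries`)
}
```

The number of deliveries grows with the square of the number of users, which is why publishing in batches becomes attractive.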

So, we need to control the timing of publishing messages.
This approach is a little complex, but it helps your development.

The point is to "Separate a Mutation for Publishing".

Sequence Diagram / Separate a Mutation for Publishing
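
Before touching AWS, the idea can be sketched as a small in-memory model. This is a rough sketch under assumptions: `ChatBackend`, `createMessage`, and `subscribe` are hypothetical names, and no Amplify or AppSync call is made. One method only stores a message, the other only publishes a batch to subscribers.

```javascript
// In-memory model of the pattern (no AWS calls). All names are hypothetical.
class ChatBackend {
  constructor() {
    this.table = [] // stands in for the table behind the model
    this.subscribers = [] // stands in for onPostResponses subscribers
  }

  // Mutation 1: store only. No subscription is attached, so nobody is notified.
  createMessage(roomName, message) {
    const item = { id: String(this.table.length + 1), roomName, message }
    this.table.push(item)
    return item
  }

  // Mutation 2: publish only. Subscribers receive the whole batch in one event.
  postResponses(roomName, items) {
    const payload = { roomName, items }
    this.subscribers.forEach(callback => callback(payload))
    return payload
  }

  subscribe(callback) {
    this.subscribers.push(callback)
  }
}

const backend = new ChatBackend()
const received = []
backend.subscribe(payload => received.push(payload))

backend.createMessage('room1', 'hello')
backend.createMessage('room1', 'world')
// Nothing has been delivered at this point.

// Later, the admin publishes everything stored so far as one batch.
backend.postResponses('room1', backend.table)
console.log(received.length, received[0].items.length)
```

Two stored messages reach the subscriber as a single event, which is the behavior the rest of this article builds with Amplify.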

Add function

Add a function to this project.

$ amplify add function
Scanning for plugins...
Plugin scan successful
? Select which capability you want to add: Lambda function (serverless function)
? Provide an AWS Lambda function name: sampleamplifysubscriXXXXXXXX
? Choose the runtime that you want to use: NodeJS
? Choose the function template that you want to use: Hello World

Available advanced settings:
- Resource access permissions
- Scheduled recurring invocation
- Lambda layers configuration

? Do you want to configure advanced settings? No
? Do you want to edit the local lambda function now? No
Successfully added resource sampleamplifysubscriXXXXXXXX locally.

Next steps:
Check out sample function code generated in <project-dir>/amplify/backend/function/sampleamplifysubscriXXXXXXXX/src
"amplify function build" builds all of your functions currently in the project
"amplify mock function <functionName>" runs your function locally
"amplify push" builds all of your local backend resources and provisions them in the cloud
"amplify publish" builds all of your local backend and front-end resources (if you added hosting category) and provisions them in the cloud

$

Then push the project.

$ amplify push
✔ Successfully pulled backend environment dev from the cloud.

Current Environment: dev

| Category | Resource name                | Operation | Provider plugin   |
| -------- | ---------------------------- | --------- | ----------------- |
| Function | sampleamplifysubscriXXXXXXXX | Create    | awscloudformation |
| Api      | sampleamplifysubscri         | No Change | awscloudformation |
| Auth     | sampleamplifysubscriXXXXXXXX | No Change | awscloudformation |
? Are you sure you want to continue? Yes
⠦ Updating resources in the cloud. This may take a few minutes...

(snip)

✔ All resources are updated in the cloud


$

Edit GraphQL schema

Edit and update the GraphQL schema.

amplify/backend/api/sampleamplifysubscri/schema.graphql

type ResponseLateRoomChat
  @model(subscriptions: null)
  @auth(
    rules: [
      { allow: owner, provider: userPools }
      { allow: public, provider: apiKey, operations: [read] }
    ]
  ) {
  id: ID!
  roomName: String!
  message: String!
  createdAt: AWSDateTime
}

type Mutation {
  postResponses(input: PostResponsesInput): PostResponsesOutput
    @function(name: "sampleamplifysubscriXXXXXXXX-${env}")
}

input PostResponsesInput {
  roomName: String!
  items: [PostResponsesInputItem]
}

input PostResponsesInputItem {
  id: ID!
  roomName: String!
  message: String!
  owner: String
  createdAt: AWSDateTime
  updatedAt: AWSDateTime
}

type PostResponsesOutput @aws_api_key @aws_cognito_user_pools {
  roomName: String!
  items: [PostResponsesOutputItem]
}

type PostResponsesOutputItem @aws_api_key @aws_cognito_user_pools {
  id: ID!
  roomName: String!
  message: String!
}

type Subscription {
  onCreateRoomChatByRoomName(roomName: String!): RoomChat
    @aws_subscribe(mutations: ["createRoomChat"])
  onCreateCloseRoomChatByRoomName(roomName: String!): CloseRoomChat
    @aws_subscribe(mutations: ["createCloseRoomChat"])
    @aws_api_key
  onPostResponses(roomName: String!): PostResponsesOutput
    @aws_subscribe(mutations: ["postResponses"])
    @aws_api_key
}

Let's go through it step by step.

Add model

Add the model "ResponseLateRoomChat".

type ResponseLateRoomChat
  @model(subscriptions: null)
  @auth(
    rules: [
      { allow: owner, provider: userPools }
      { allow: public, provider: apiKey, operations: [read] }
    ]
  ) {
  id: ID!
  roomName: String!
  message: String!
  createdAt: AWSDateTime
}

This looks like "CloseRoomChat", which was added in the previous article.
But it has a new property, "createdAt".
It is required by "listResponseLateRoomChats", because the Admin page filters the list by "createdAt".
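
As a rough sketch of how such a filter can be built, the helper below creates the `filter` variable for the list query. `buildRecentFilter` is a hypothetical name, and the sketch assumes "createdAt" is stored as an ISO 8601 UTC string, so that comparing strings matches comparing times.

```javascript
// Hypothetical helper: build the `filter` variable for a list query so that it
// returns only items created within the last `windowMs` milliseconds.
function buildRecentFilter(now, windowMs) {
  const start = new Date(now.getTime() - windowMs)
  // toISOString() always yields UTC in a fixed-width format,
  // so string comparison matches chronological order.
  return { createdAt: { ge: start.toISOString() } }
}

const filter = buildRecentFilter(new Date('2020-01-01T00:00:10.000Z'), 10000)
console.log(filter) // { createdAt: { ge: '2020-01-01T00:00:00.000Z' } }
```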

Add mutation with function directive

Add a Mutation type with the @function directive.

type Mutation {
  postResponses(input: PostResponsesInput): PostResponsesOutput
    @function(name: "sampleamplifysubscriXXXXXXXX-${env}")
}

input PostResponsesInput {
  roomName: String!
  items: [PostResponsesInputItem]
}

input PostResponsesInputItem {
  id: ID!
  roomName: String!
  message: String!
  owner: String
  createdAt: AWSDateTime
  updatedAt: AWSDateTime
}

type PostResponsesOutput @aws_api_key @aws_cognito_user_pools {
  roomName: String!
  items: [PostResponsesOutputItem]
}

type PostResponsesOutputItem @aws_api_key @aws_cognito_user_pools {
  id: ID!
  roomName: String!
  message: String!
}

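When we call the "postResponses" mutation, Amplify invokes the Lambda function added earlier.

"PostResponsesOutput" has a "roomName" property because the "onPostResponses" subscription takes "roomName" as an argument, and subscription arguments are matched against fields of the mutation's result.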
https://docs.amplify.aws/cli/graphql-transformer/examples#filter-subscriptions-by-model-fields-andor-relations
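
The reason "roomName" must be in the output can be pictured with a simplified model. This is an assumption for illustration, not AppSync's actual implementation: `matchesSubscription` is a hypothetical function that delivers an event only when every subscription argument equals the same-named field of the mutation result.

```javascript
// Simplified model of subscription argument filtering (an assumption for
// illustration; the real matching happens inside AppSync).
// An event is delivered when every subscription argument equals the
// same-named field in the mutation result.
function matchesSubscription(subscriptionArgs, mutationResult) {
  return Object.entries(subscriptionArgs).every(
    ([name, value]) => mutationResult[name] === value,
  )
}

const mutationResult = {
  roomName: 'room1',
  items: [{ id: '1', roomName: 'room1', message: 'hi' }],
}

console.log(matchesSubscription({ roomName: 'room1' }, mutationResult)) // true
console.log(matchesSubscription({ roomName: 'room2' }, mutationResult)) // false
// Without roomName in the result there is nothing to compare against.
console.log(matchesSubscription({ roomName: 'room1' }, { items: [] })) // false
```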

Update subscription

Add "onPostResponses".

type Subscription {
  onCreateRoomChatByRoomName(roomName: String!): RoomChat
    @aws_subscribe(mutations: ["createRoomChat"])
  onCreateCloseRoomChatByRoomName(roomName: String!): CloseRoomChat
    @aws_subscribe(mutations: ["createCloseRoomChat"])
    @aws_api_key
  onPostResponses(roomName: String!): PostResponsesOutput
    @aws_subscribe(mutations: ["postResponses"])
    @aws_api_key
}

Add VTL (optional)

If you want to allow only authorized users to call "postResponses", this section helps you.

Write the following code.

amplify/backend/api/sampleamplifysubscri/resolvers/Mutation.postResponses.req.vtl

## [Start] Check authMode and execute owner/group checks **
#if( $authMode == "userPools" )
  ## No Static Group Authorization Rules **


  ## No Dynamic Group Authorization Rules **


  ## [Start] Owner Authorization Checks **
  #set( $isOwnerAuthorized = false )
  ## Authorization rule: { allow: owner, ownerField: "owner", identityClaim: "cognito:username" } **
  #set( $allowedOwners0 = $util.defaultIfNull($ctx.args.input.owner, null) )
  #set( $identityValue = $util.defaultIfNull($ctx.identity.claims.get("username"), $util.defaultIfNull($ctx.identity.claims.get("cognito:username"), "___xamznone____")) )
  #if( $util.isList($allowedOwners0) )
    #foreach( $allowedOwner in $allowedOwners0 )
      #if( $allowedOwner == $identityValue )
        #set( $isOwnerAuthorized = true )
      #end
    #end
  #end
  #if( $util.isString($allowedOwners0) )
    #if( $allowedOwners0 == $identityValue )
      #set( $isOwnerAuthorized = true )
    #end
  #end
  #if( $util.isNull($allowedOwners0) && (! $ctx.args.input.containsKey("owner")) )
    $util.qr($ctx.args.input.put("owner", $identityValue))
    #set( $isOwnerAuthorized = true )
  #end
  ## [End] Owner Authorization Checks **


  ## [Start] Throw if unauthorized **
  #if( !($isStaticGroupAuthorized == true || $isDynamicGroupAuthorized == true || $isOwnerAuthorized == true) )
    $util.unauthorized()
  #end
  ## [End] Throw if unauthorized **
#end
## [End] Check authMode and execute owner/group checks **

## [Start] Stash resolver specific context.. **
$util.qr($ctx.stash.put("typeName", "Mutation"))
$util.qr($ctx.stash.put("fieldName", "postResponses"))
{}
## [End] Stash resolver specific context.. **

This VTL is based on the auto-generated VTL
(under amplify/backend/api/sampleamplifysubscri/build/resolvers).
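
For readers less familiar with VTL, the owner check can be paraphrased in JavaScript. This is only a reading aid under assumptions: `checkOwner` is a hypothetical function, `input` stands for `$ctx.args.input`, and `identityValue` stands for the caller's username. The VTL calls `$util.unauthorized()` where this sketch returns `false`.

```javascript
// JavaScript paraphrase of the owner check in the VTL (illustrative only).
// `input` stands for $ctx.args.input, `identityValue` for the caller's username.
function checkOwner(input, identityValue) {
  const allowedOwners = input.owner === undefined ? null : input.owner
  let isOwnerAuthorized = false

  // A list of owners: authorized if the caller is one of them.
  if (Array.isArray(allowedOwners) && allowedOwners.includes(identityValue)) {
    isOwnerAuthorized = true
  }
  // A single owner: authorized if it is the caller.
  if (typeof allowedOwners === 'string' && allowedOwners === identityValue) {
    isOwnerAuthorized = true
  }
  // No owner key at all: the caller becomes the owner.
  if (allowedOwners === null && !('owner' in input)) {
    input.owner = identityValue
    isOwnerAuthorized = true
  }
  return isOwnerAuthorized
}

const request = { roomName: 'room1', items: [] }
console.log(checkOwner(request, 'alice'), request.owner) // true alice
console.log(checkOwner({ owner: 'bob' }, 'alice')) // false
```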

Push function

Write the following code.

amplify/backend/function/sampleamplifysubscriXXXXXXXX/src/index.js

exports.handler = async event => {
  console.log(event)
  return event.arguments.input
}
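
To see what this handler does without deploying it, it can be called locally with a hand-written event. This is a minimal sketch: `sampleEvent` is a made-up object that models only `arguments.input`, while a real event from AppSync carries more fields.

```javascript
// The handler from the article (without logging), invoked locally.
// Only `arguments.input` is modeled; a real AppSync event carries more fields.
const handler = async event => event.arguments.input

const sampleEvent = {
  arguments: {
    input: {
      roomName: 'room1',
      items: [{ id: '1', roomName: 'room1', message: 'hello' }],
    },
  },
}

// The input is returned unchanged, so it becomes the mutation result
// that subscribers of onPostResponses receive.
const resultPromise = handler(sampleEvent)
resultPromise.then(output => {
  console.log(output.roomName, output.items.length)
})
```

Because the function simply echoes its input, the mutation's only job is to trigger the subscription with the given batch.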

Then push the project.

$ amplify push
✔ Successfully pulled backend environment dev from the cloud.

Current Environment: dev

| Category | Resource name                | Operation | Provider plugin   |
| -------- | ---------------------------- | --------- | ----------------- |
| Function | sampleamplifysubscriXXXXXXXX | Update    | awscloudformation |
| Api      | sampleamplifysubscri         | No Change | awscloudformation |
| Auth     | sampleamplifysubscriXXXXXXXX | No Change | awscloudformation |
? Are you sure you want to continue? Yes
⠦ Updating resources in the cloud. This may take a few minutes...

(snip)

✔ All resources are updated in the cloud


$

UI Implementation

The back-end work is finished.

Next, we build the front-end UI.
There are two roles and two pages.

  • Roles:
    • Admin
    • Client

Admin

The "Admin" role controls when messages are published.

See details: src/views/ResponseLateRoomChatAdmin.vue

<template>
...
</template>

<script>
import { API, graphqlOperation } from 'aws-amplify'
import { GRAPHQL_AUTH_MODE } from '@aws-amplify/api-graphql'
import { createResponseLateRoomChat, postResponses } from '@/graphql/mutations'
import { listResponseLateRoomChats } from '@/graphql/queries'
import { onPostResponses } from '@/graphql/subscriptions'

import ChatList from '@/components/ChatList'

export default {
  components: { ChatList },
  data: function() {
    return {
...
      intervalTime: 10000,
      interval: null,
    }
  },
  created: async function() {
    this.setSubscribeByRoomName('room1')
  },
  mounted: async function() {
    this.interval = setInterval(async () => {
      const getStartTime = new Date(Date.now() - this.intervalTime)
      const postedMessages = await API.graphql({
        query: listResponseLateRoomChats,
        variables: {
          filter: { createdAt: { ge: getStartTime.toISOString() } },
        },
        authMode: GRAPHQL_AUTH_MODE.API_KEY,
      })
      console.log('interval listResponseLateRoomChats', postedMessages)
      if (postedMessages.data.listResponseLateRoomChats.items.length > 0) {
        this.rooms.forEach(roomName => {
          const messages = postedMessages.data.listResponseLateRoomChats.items.filter(
            item => item.roomName === roomName,
          )
          if (messages && messages.length > 0) {
            this.postResponsesToAll(roomName, messages)
          }
        })
      }
    }, this.intervalTime)
  },
  beforeDestroy: function() {
    this.clearSubscriptions()
    if (this.interval) {
      clearInterval(this.interval)
      this.interval = null
    }
  },
  methods: {
...
    postResponsesToAll: async function(roomName, messages) {
      const message = await API.graphql(
        graphqlOperation(postResponses, {
          input: {
            roomName: roomName,
            items: messages,
          },
        }),
      )
      console.log('postResponsesToAll', message)
    },
    setSubscribeByRoomName(roomName) {
      this.clearSubscriptions()

      this.onPostResponses[roomName] = API.graphql({
        query: onPostResponses,
        variables: {
          roomName: roomName,
        },
        authMode: GRAPHQL_AUTH_MODE.API_KEY,
      }).subscribe({
        next: ({ provider, value }) => {
          console.log('onPostResponses', { provider, value })
          value.data.onPostResponses.items.forEach(item => {
            this.subscriptionMessages[item.roomName].push(item)
          })
        },
      })
    },
...
  },
}
</script>

<style></style>
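
The core of the interval callback is splitting the polled items into one batch per room. A minimal sketch of that step as a pure function follows; `batchByRoom` is a hypothetical helper and the sample data is made up.

```javascript
// Hypothetical helper mirroring the Admin interval callback:
// split the polled items into one batch per room, skipping empty rooms.
function batchByRoom(rooms, items) {
  return rooms
    .map(roomName => ({
      roomName,
      items: items.filter(item => item.roomName === roomName),
    }))
    .filter(batch => batch.items.length > 0)
}

const polled = [
  { id: '1', roomName: 'room1', message: 'a' },
  { id: '2', roomName: 'room2', message: 'b' },
  { id: '3', roomName: 'room1', message: 'c' },
]

const batches = batchByRoom(['room1', 'room2', 'room3'], polled)
console.log(batches.map(b => `${b.roomName}:${b.items.length}`)) // [ 'room1:2', 'room2:1' ]
```

Each batch has the same shape as the "postResponses" input, so one mutation call per non-empty room is enough.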

Client

The "Client" role receives the messages published by Admin.

The code is almost the same as Admin's, but without postResponses.

See details: src/views/ResponseLateRoomChatAdmin.vue

<template>
...
</template>

<script>
import { Auth, API, graphqlOperation } from 'aws-amplify'
import { GRAPHQL_AUTH_MODE } from '@aws-amplify/api-graphql'
import { createResponseLateRoomChat } from '@/graphql/mutations'
import { onPostResponses } from '@/graphql/subscriptions'

import ChatList from '@/components/ChatList'

export default {
  components: { ChatList },
  data: function() {
    return {
...
    }
  },
...
  methods: {
    sendMessage: async function() {
      const message = await API.graphql(
        graphqlOperation(createResponseLateRoomChat, {
          input: { message: this.inputMessage, roomName: this.roomName },
        }),
      )
      console.log('sendMessage', message)

      this.messages[this.roomName].push(message.data.createResponseLateRoomChat)
      this.inputMessage = ''
    },
    setSubscribeByRoomName(roomName) {
      this.clearSubscriptions()

      this.onPostResponses[roomName] = API.graphql({
        query: onPostResponses,
        variables: {
          roomName: roomName,
        },
        authMode: GRAPHQL_AUTH_MODE.API_KEY,
      }).subscribe({
        next: ({ provider, value }) => {
          console.log('onPostResponses', { provider, value })
          value.data.onPostResponses.items.forEach(item => {
            this.subscriptionMessages[item.roomName].push(item)
          })
        },
      })
    },
...
  },
}
</script>

<style></style>
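
The subscription callback used by both pages can also be sketched as a pure function, which makes it easy to try out without a connection. `applyPostResponses` is a hypothetical name, and creating a missing room list is an added guard that the component code does not have.

```javascript
// Hypothetical pure version of the `next` callback: append each received item
// to the list for its room. Creating a missing list is an added guard.
function applyPostResponses(messagesByRoom, payload) {
  payload.items.forEach(item => {
    if (!messagesByRoom[item.roomName]) {
      messagesByRoom[item.roomName] = []
    }
    messagesByRoom[item.roomName].push(item)
  })
  return messagesByRoom
}

const state = { room1: [] }
applyPostResponses(state, {
  roomName: 'room1',
  items: [
    { id: '1', roomName: 'room1', message: 'hello' },
    { id: '2', roomName: 'room1', message: 'world' },
  ],
})
console.log(state.room1.length) // 2
```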

See the other files:
https://github.com/tacck/sample-amplify-subscriptions/tree/4-two-mutations-one-subscription

Check

Let's check the application.

Open a browser and open "Response Late Room Chat Admin" (sign in as an authorized user).
Then open another browser, sign in as another user, and open "Response Late Room Chat Client".

When you post a new message, both browsers receive it a few seconds later.
If you post multiple messages, you receive them all at once!

There are many steps, but the idea is simple.
One mutation (without a subscription) only saves the messages to the table.
The other mutation only publishes the messages.

AWS Amplify also gives you this kind of control.
