Supporting real-time updates in your mobile app

Introduction to real-time updates

After developing some queries and mutations, you might want to implement real-time updates. The GraphQL specification supports these through an operation type called Subscription. For production environments, Data Sync implements subscriptions using an MQTT PubSub mechanism. However, you might want to use the Apollo PubSub module to develop proof-of-concept applications.

When coding for real-time updates, you use the following modules:

  • @aerogear/voyager-server - supports clients that use voyager-client to enable GraphQL queries and mutations

  • @aerogear/voyager-subscriptions - supports clients that use voyager-client to enable GraphQL subscriptions

  • @aerogear/graphql-mqtt-subscriptions - supports connecting GraphQL resolvers to an MQTT broker

GraphQL Subscriptions enable clients to subscribe to server events over a websocket connection.

The flow can be summarized as follows:

  • Client connects to the server using websockets, and subscribes to certain events.

  • As events occur, the server notifies the clients that are subscribed to those events.

  • Any currently connected client that is subscribed to a given event receives it.

  • The client can close the connection at any time and no longer receives updates.

To receive updates, the client must be currently connected to the server. The client does not receive events from subscriptions while offline. To support inactive clients, use Push Notifications.
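The flow above can be sketched in-process with Node's built-in EventEmitter standing in for the server-side event source. This is only an illustration: the taskCreated event name and the connectClient helper are assumptions made for this sketch, and no websocket or Data Sync module is involved.

```javascript
const { EventEmitter } = require('events')

// Stand-in for the server-side event source (not a real websocket server).
const serverEvents = new EventEmitter()

// Hypothetical helper: "connects" a client and subscribes it to one event.
// Returns the payloads received so far and a function that closes the connection.
function connectClient (eventName) {
  const received = []
  const listener = (payload) => received.push(payload)
  serverEvents.on(eventName, listener)
  const close = () => serverEvents.off(eventName, listener)
  return { received, close }
}

const clientA = connectClient('taskCreated')
const clientB = connectClient('taskCreated')

// Both connected clients receive the first event.
serverEvents.emit('taskCreated', { id: '1', title: 'First task' })

// Client B closes its connection and misses the second event.
clientB.close()
serverEvents.emit('taskCreated', { id: '2', title: 'Second task' })

console.log(clientA.received.length) // 2
console.log(clientB.received.length) // 1
```

Client B never sees the second event, which mirrors the behaviour of an offline client described above.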

Implementing real-time updates on a Data Sync server

The following code shows a typical Data Sync server without subscriptions:

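const express = require('express')
const { VoyagerServer } = require('@aerogear/voyager-server')

// typeDefs and resolvers are your GraphQL schema and resolver map
const apolloServer = VoyagerServer({
  typeDefs,
  resolvers
})

const app = express()
apolloServer.applyMiddleware({ app })
const port = 4000

app.listen({ port }, () =>
  console.log(`🚀 Server ready at http://localhost:${port}${apolloServer.graphqlPath}`)
)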

The following sections outline the steps required to enable real-time updates:

  1. Implement a SubscriptionServer

  2. Implement a Publish Subscribe Mechanism

  3. Define subscriptions in the schema

  4. Implement resolvers

Implementing a SubscriptionServer using voyager-subscriptions

To allow you to create GraphQL subscription types in your schema:

  1. Install the @aerogear/voyager-subscriptions package:

    $ npm i @aerogear/voyager-subscriptions
  2. Configure SubscriptionServer using @aerogear/voyager-subscriptions:

    const { createSubscriptionServer } = require('@aerogear/voyager-subscriptions')
    
    const apolloServer = VoyagerServer({
      typeDefs,
      resolvers
    })
    
    const app = express()
    apolloServer.applyMiddleware({ app })
    const port = 4000
    
    const server = app.listen({ port }, () => {
      console.log(`🚀 Server ready at http://localhost:${port}${apolloServer.graphqlPath}`)
    
      createSubscriptionServer({ schema: apolloServer.schema }, {
        server,
        path: '/graphql'
      })
    })

    The createSubscriptionServer code:

    • returns a SubscriptionServer instance

    • installs handlers for

      • managing websocket connections

      • delivering subscriptions on the server

    • provides integrations with other modules such as @aerogear/voyager-keycloak.

Additional Information

For more information about arguments and options, see the subscriptions-transport-ws module.

Implementing a Publish Subscribe Mechanism

This procedure describes an in-memory implementation which is useful for prototyping but not suitable for production. AeroGear recommends using MQTT PubSub in production. See Configuring a Publish Subscribe mechanism for more information about all the implementation methods.

To provide a channel to push updates to the client using the default PubSub provided by apollo-server, you implement a Publish Subscribe mechanism, for example:

const { PubSub } = require('apollo-server')

const pubsub = new PubSub()

Additional Information

Subscriptions depend on a publish subscribe mechanism to generate the events that notify a subscription. There are several PubSub implementations available based on the PubSubEngine interface.

Defining subscriptions in the schema

Subscriptions are a root-level type. They are defined in the schema in the same way as Query and Mutation. For example, the following schema defines a Task type together with a mutation and a subscription.

type Subscription {
  taskCreated: Task
}

type Mutation {
  createTask(title: String!, description: String!): Task
}

type Task {
  id: ID!
  title: String!
  description: String!
}

Implementing resolvers

Inside the resolver map, subscription resolvers return an AsyncIterator, which listens for events. To generate an event, call the publish method. The pubsub.publish code is typically located inside a mutation resolver.

In the following example, when a new task is created, the createTask resolver publishes the result of this mutation to the TaskCreated channel.

const TASK_CREATED = 'TaskCreated'

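const resolvers = {
  Subscription: {
    taskCreated: {
      // pubsub is the PubSub instance created earlier
      subscribe: () => pubsub.asyncIterator(TASK_CREATED)
    }
  },
  Mutation: {
    createTask: async (obj, args, context, info) => {
      // tasks stands for your data access layer
      const task = tasks.create(args)
      // The payload key must match the subscription field name in the schema
      pubsub.publish(TASK_CREATED, { taskCreated: task })
      return task
    }
  },
}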

This subscription server does not implement authentication or authorization. For information about implementing authentication and authorization, see Supporting authentication and authorization in your mobile app.

Additional Information

For information on how to use subscriptions in your client code, see Realtime Updates.
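To illustrate the AsyncIterator that subscription resolvers return, as described in Implementing resolvers, the following minimal sketch uses the on function from Node's built-in events module in place of pubsub.asyncIterator, and emit in place of pubsub.publish. It is not the apollo-server PubSub implementation, and the task values are made up for the example.

```javascript
const { EventEmitter, on } = require('events')

const TASK_CREATED = 'TaskCreated'
const emitter = new EventEmitter()

// Stand-in for pubsub.asyncIterator(TASK_CREATED): events.on returns an
// AsyncIterator that yields the arguments of each emitted event as an array.
const iterator = on(emitter, TASK_CREATED)

// Stand-in for pubsub.publish(TASK_CREATED, { taskCreated: task }).
emitter.emit(TASK_CREATED, {
  taskCreated: { id: '1', title: 'Write docs', description: 'Subscriptions' }
})

// Each call to next() resolves with the next published payload.
const firstEvent = iterator.next().then(({ value }) => {
  const [payload] = value
  console.log(`received task ${payload.taskCreated.id}: ${payload.taskCreated.title}`)
  return payload
})
```

A GraphQL server consumes such an iterator in the same way: it waits for the next value and sends it to every client subscribed to that field.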

Configuring a Publish Subscribe mechanism

You can use the Apollo PubSub mechanism for development, but you must use the MQTT PubSub mechanism for production.

Using the Apollo PubSub mechanism

The Implementing real-time updates on a Data Sync server section describes how to set up the default PubSub provided by apollo-server. For a production system, you require MQTT PubSub.

Using the MQTT PubSub mechanism

The @aerogear/graphql-mqtt-subscriptions module provides an AsyncIterator interface used for implementing subscription resolvers. It connects the Data Sync server to an MQTT broker to support horizontally scalable subscriptions.

Initialize an MQTT client and pass that client to the @aerogear/graphql-mqtt-subscriptions module, for example:

const mqtt = require('mqtt')
const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions')

const client = mqtt.connect('mqtt://test.mosquitto.org', {
  reconnectPeriod: 1000,
})

const pubsub = new MQTTPubSub({
  client
})

In the example, an mqtt client is created using mqtt.connect and then this client is passed into an MQTTPubSub instance. The pubsub instance can then be used to publish and subscribe to events in the server.

Configuring AMQ Online for MQTT Messaging

Red Hat AMQ supports the MQTT protocol which makes it a suitable PubSub technology for powering GraphQL subscriptions at scale.

This section provides recommendations for:

  • Configuring AMQ Online for MQTT messaging.

  • Connecting to AMQ Online and using it as a pubsub within server applications.

Terminology
  • AMQ Online is a mechanism that allows developers to consume the features of Red Hat AMQ within OpenShift.

  • Red Hat AMQ provides fast, lightweight, and secure messaging for Internet-scale applications. AMQ Broker supports multiple protocols and fast message persistence.

  • MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol.

Prerequisites
  • OpenShift Cluster

  • AMQ Online is installed in the OpenShift cluster

AMQ Online includes many configuration options that could address the specific needs of your application. The minimum configuration steps for using AMQ Online for MQTT messaging and enabling GraphQL subscriptions are:

  1. Create an AddressSpace

  2. Create an Address

  3. Create a MessagingUser

Creating an address space

A user can request messaging resources by creating an AddressSpace. There are two types of address space: standard and brokered. You must use the brokered address space for MQTT-based applications.

Procedure
  1. Create an address space definition. For example, the following resource defines a brokered AddressSpace:

    apiVersion: enmasse.io/v1beta1
    kind: AddressSpace
    metadata:
      name: myaddressspace
    spec:
      type: brokered
      plan: brokered-single-broker
  2. Create the AddressSpace.

    oc create -f brokered-address-space.yaml
  3. Check the status of the address space:

    oc get addressspace <AddressSpace name> -o yaml

    The output displays information about the address space, including details required for connecting applications.

Additional Information

See Creating address spaces using the command line for more information.

Creating an Address

An address is part of an AddressSpace and represents a destination for sending and receiving messages. Use an Address with type topic to represent an MQTT topic.

  1. Create an address definition:

    apiVersion: enmasse.io/v1beta1
    kind: Address
    metadata:
        name: myaddressspace.myaddress # must have the format <AddressSpace name>.<address name>
    spec:
        address: myaddress
        type: topic
        plan: brokered-topic
  2. Create the address:

    oc create -f topic-address.yaml

See the Configuring your server for real-time updates guide for more information about using pubsub.asyncIterator(). Create an Address for each topic name passed into pubsub.asyncIterator().

Additional Information

See Creating addresses using the command line for more information.
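Because one Address is needed for each topic name passed into pubsub.asyncIterator(), it can help to generate the definitions from the list of topics. The following is a minimal sketch under stated assumptions: buildAddressResources is a hypothetical helper, the topic names are examples, and the topic is lowercased for metadata.name only because Kubernetes resource names must be lowercase. Check the generated definitions against your AMQ Online configuration before using them.

```javascript
// Hypothetical helper: builds one AMQ Online Address definition per topic name.
function buildAddressResources (addressSpaceName, topicNames) {
  return topicNames.map((topic) => ({
    apiVersion: 'enmasse.io/v1beta1',
    kind: 'Address',
    metadata: {
      // Required format: <AddressSpace name>.<address name>
      // Assumption: lowercased because resource names must be lowercase.
      name: `${addressSpaceName}.${topic.toLowerCase()}`
    },
    spec: {
      // Assumption: the address is the exact topic name used by the server.
      address: topic,
      type: 'topic',
      plan: 'brokered-topic'
    }
  }))
}

const resources = buildAddressResources('myaddressspace', ['TaskCreated', 'TaskDeleted'])
console.log(JSON.stringify(resources, null, 2))
```

Each object in the output corresponds to one Address definition with the same fields as the YAML example in this section.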

Creating an AMQ Online user

A messaging client connects using an AMQ Online user, also known as a MessagingUser. A MessagingUser specifies an authorization policy that controls which addresses can be used and the operations that can be performed on those addresses.

Users are configured as MessagingUser resources. Users can be created, deleted, read, updated, and listed.

  1. Create a user definition:

    apiVersion: user.enmasse.io/v1beta1
    kind: MessagingUser
    metadata:
      name: myaddressspace.mymessaginguser # must be in the format <AddressSpace name>.<username>
    spec:
      username: mymessaginguser
      authentication:
        type: password
        password: cGFzc3dvcmQ= # must be Base64 encoded. Password is 'password'
      authorization:
        - addresses: ["*"]
          operations: ["send", "recv"]
  2. Create the MessagingUser.

    oc create -f my-messaging-user.yaml

An application can now connect to an AMQ Online address using this user’s credentials.

For more information see the AMQ Online User Model.
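The password field in the MessagingUser definition must be Base64 encoded. As a small sketch, the value can be produced and checked with Node's built-in Buffer; the helper names encodePassword and decodePassword are made up for this example.

```javascript
// Encode a plain-text password for the password field of a MessagingUser.
const encodePassword = (plainText) => Buffer.from(plainText, 'utf8').toString('base64')

// Decode it again, for example to verify an existing definition.
const decodePassword = (encoded) => Buffer.from(encoded, 'base64').toString('utf8')

console.log(encodePassword('password')) // cGFzc3dvcmQ=
console.log(decodePassword('cGFzc3dvcmQ=')) // password
```

Base64 is an encoding, not encryption, so treat the definition file as a secret.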

Using GraphQL MQTT PubSub with AMQ Online

Prerequisites
  • OpenShift cluster

  • AMQ Online is installed in the OpenShift cluster

The following AMQ Online resources are available for MQTT applications:

  • AddressSpace

  • Address

  • MessagingUser

This section describes how to use @aerogear/graphql-mqtt-subscriptions to connect to an AMQ Online Address.

  1. Retrieve the connection details for the AddressSpace you want to use:

    oc get addressspace <addressspace> -o yaml
  2. Determine which method you want to use to connect to the address:

    • Using the service hostname - Allows clients to connect from within the OpenShift cluster.

      AeroGear recommends that applications running inside OpenShift connect using the service hostname. The service hostname is only accessible within the OpenShift cluster. This ensures messages routed between your application and AMQ Online stay within the OpenShift cluster and never go onto the public internet.

    • Using the external hostname - Allows clients to connect from outside the OpenShift cluster.

      The external hostname allows connections from outside the OpenShift cluster. This is useful for the following cases:

      • Production applications running outside of OpenShift connecting and publishing messages.

      • Quick Prototyping and local development. Create a non-production AddressSpace, allowing developers to connect applications from their local environments.

  3. To connect to an AMQ Online Address using the service hostname:

    1. Retrieve the service hostname:

      oc get addressspace <addressspace name> -o jsonpath='{.status.endpointStatuses[?(@.name=="messaging")].serviceHost}'
    2. Add code to create the connection, for example:

      const mqtt = require('mqtt')
      const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions')
      
      const client = mqtt.connect({
        host: '<internal host name>',
        username: '<MessagingUser name>',
        password: '<MessagingUser password>',
        port: 5762,
      })
      
      const pubsub = new MQTTPubSub({ client })
    3. To encrypt all messages between your application and the AMQ Online broker, enable TLS, for example:

      const mqtt = require('mqtt')
      const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions')
      
      const host = '<internal host name>'
      
      const client = mqtt.connect({
        host: host,
        servername: host,
        username: '<MessagingUser name>',
        password: '<MessagingUser password>',
        port: 5761,
        protocol: 'tls',
        rejectUnauthorized: false,
      })
      
      const pubsub = new MQTTPubSub({ client })
  4. To connect to an AMQ Online Address using the external hostname:

    The external hostname typically accepts only TLS connections.
    1. Retrieve the external hostname:

      oc get addressspace <addressspace name> -o jsonpath='{.status.endpointStatuses[?(@.name=="messaging")].externalHost}'
    2. Connect to the external hostname, for example:

      const mqtt = require('mqtt')
      const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions')
      
      const host = '<external host name>'
      
      const client = mqtt.connect({
        host: host,
        servername: host,
        username: '<MessagingUser name>',
        password: '<MessagingUser password>',
        port: 443,
        protocol: 'tls',
        rejectUnauthorized: false,
      })
      
      const pubsub = new MQTTPubSub({ client })
  5. If you use TLS, note the following additional mqtt.connect options:

    • servername - when connecting to a message broker in OpenShift using TLS, this property must be set, otherwise the connection fails. Messages are routed through a proxy, which results in the client being presented with multiple certificates. When servername is set, the client uses Server Name Indication (SNI) to request the correct certificate as part of the TLS connection setup.

    • protocol - must be set to 'tls'

    • rejectUnauthorized - must be set to false, otherwise the connection fails. This tells the client to ignore certificate errors. Again, this is needed because the client is presented with multiple certificates and one of the certificates is for a different hostname than the one being requested, which normally results in an error.

    • port - must be set to 5761 for service hostname or 443 for external hostname.

Using environment variables for configuration

AeroGear recommends that you use environment variables to configure the connection, for example:

const mqtt = require('mqtt')
const { MQTTPubSub } = require('@aerogear/graphql-mqtt-subscriptions')

const host = process.env.MQTT_HOST || 'localhost'

const client = mqtt.connect({
  host: host,
  servername: host,
  username: process.env.MQTT_USERNAME,
  password: process.env.MQTT_PASSWORD,
  port: process.env.MQTT_PORT || 1883,
  protocol: process.env.MQTT_PROTOCOL || 'mqtt',
  rejectUnauthorized: false,
})

const pubsub = new MQTTPubSub({ client })

In this example, the connection options can be configured using environment variables, but sensible defaults for the host, port and protocol are provided for local development.
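One way to keep this configuration testable is to build the options object in a separate function and pass the result to mqtt.connect. The following sketch assumes a hypothetical buildMqttOptions helper and an example host name. Unlike the example above, it converts the port to a number and adds the TLS-only options only when the protocol is 'tls'.

```javascript
// Hypothetical helper: derives mqtt.connect options from an environment object.
function buildMqttOptions (env = process.env) {
  const host = env.MQTT_HOST || 'localhost'
  const protocol = env.MQTT_PROTOCOL || 'mqtt'
  const options = {
    host,
    username: env.MQTT_USERNAME,
    password: env.MQTT_PASSWORD,
    // Environment variables are strings, so convert the port to a number.
    port: Number(env.MQTT_PORT) || 1883,
    protocol
  }
  if (protocol === 'tls') {
    // TLS-only options described in the previous section.
    options.servername = host
    options.rejectUnauthorized = false
  }
  return options
}

// Local development: no variables set, so the defaults apply.
const localOptions = buildMqttOptions({})
// Example values for a TLS connection; the host name is made up.
const tlsOptions = buildMqttOptions({
  MQTT_HOST: 'messaging.example.svc',
  MQTT_PORT: '5761',
  MQTT_PROTOCOL: 'tls'
})
console.log(localOptions.port, tlsOptions.port, tlsOptions.servername)
```

The resulting object can then be passed to mqtt.connect in place of the inline options.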

Troubleshooting MQTT Connection Issues

Troubleshooting MQTT Events

The mqtt module emits various events during runtime. It is recommended to add listeners for these events, both for regular operation and for troubleshooting.

client.on('connect', () => {
  console.log('client has connected')
})

client.on('reconnect', () => {
  console.log('client is attempting to reconnect')
})

client.on('offline', () => {
  console.log('client has gone offline')
})

client.on('error', (error) => {
  console.log(`an error has occurred ${error}`)
})

Read the mqtt documentation to learn about all of the events and what causes them.

Troubleshooting MQTT Configuration Issues

If your application is experiencing connection errors, first check the configuration being passed into mqtt.connect. Your application might run locally or in OpenShift, might connect using internal or external hostnames, and might or might not use TLS, so it is easy to accidentally provide the wrong configuration.

The Node.js mqtt module does not report any errors if parameters such as hostname or port are incorrect. Instead, it will silently fail and allow your application to start without messaging capabilities.

It may be necessary to handle this scenario in your application. The following workaround can be used.

const TIMEOUT = 10 // number of seconds to wait before checking if the client is connected

setTimeout(() => {
  if (!client.connected) {
    console.log(`client not connected after ${TIMEOUT} seconds`)
    // process.exit(1) if you wish
  }
}, TIMEOUT * 1000)

This code can be used to detect if the MQTT client hasn’t connected. This can be helpful for detecting potential configuration issues and allows your application to respond to that scenario.
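The same check can also be expressed as a promise, which lets the application wait for the connection during startup. The following sketch assumes a hypothetical waitForConnect helper and relies only on the connected property and the 'connect' event used above. Plain EventEmitter objects stand in for the client returned by mqtt.connect so that the example runs without a broker.

```javascript
const { EventEmitter } = require('events')

// Hypothetical helper: resolves when the client emits 'connect' and
// rejects if that has not happened within timeoutMs milliseconds.
function waitForConnect (client, timeoutMs) {
  return new Promise((resolve, reject) => {
    if (client.connected) return resolve()
    const timer = setTimeout(() => {
      client.removeListener('connect', onConnect)
      reject(new Error(`client not connected after ${timeoutMs} ms`))
    }, timeoutMs)
    function onConnect () {
      clearTimeout(timer)
      resolve()
    }
    client.once('connect', onConnect)
  })
}

// Fake clients standing in for the object returned by mqtt.connect.
const neverConnects = new EventEmitter()
const connectsLater = new EventEmitter()
setTimeout(() => connectsLater.emit('connect'), 10)

const results = Promise.all([
  waitForConnect(neverConnects, 50).then(() => 'connected', (err) => err.message),
  waitForConnect(connectsLater, 50).then(() => 'connected', (err) => err.message)
])
results.then(([first, second]) => console.log(first, '|', second))
```

In a real server, the rejection handler is a natural place to log the configuration problem or exit the process.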

Implementing real-time updates on the client

A core concept of the GraphQL specification is an operation type called Subscription, which provides a mechanism for real-time updates. For more information on GraphQL subscriptions, see the Subscriptions documentation.

To do this, GraphQL Subscriptions use websockets to enable clients to subscribe to published changes.

The architecture of websockets is as follows:

  • Client connects to websocket server.

  • Upon certain events, the server can publish the results of these events to the websocket.

  • Any currently connected client to that websocket receives these results.

  • The client can close the connection at any time and no longer receives updates.

Websockets are well suited to delivering messages to currently active clients. To receive updates, the client must be currently connected to the websocket server. Updates made over this websocket while the client is offline are not consumed by the client. For this use case, Push Notifications are recommended.

Voyager Client comes with subscription support out of the box, including auto-reconnection upon device restart or network reconnect. To enable subscriptions on your client, set the wsUrl parameter in the Voyager Client config object, as described in the following section. A DataSyncConfig interface is also available from Voyager Client if you wish to use it.

Setting up a client to use subscriptions

To set up a client to use subscriptions:

  1. Provide a wsUrl string in the config object as follows:

    const config = {
        wsUrl: "ws://<your_websocket_url>"
    }

    where <your_websocket_url> is the full URL of the websocket endpoint of your GraphQL server.

  2. Use the object from step 1 to initialise Voyager Client:

    const { createClient } = require("@aerogear/voyager-client");
    
    const client = createClient(config)

Using Subscriptions

A standard flow to utilise subscriptions is as follows:

  1. Make a network query to get data from the server

  2. Watch the cache for changes to queries

  3. Subscribe to changes pushed from the server

  4. Unsubscribe when leaving the view where there is an active subscription

In the three examples below, subscribeToMore ensures that any further updates received from the server force the updateQuery function to be called with subscriptionData from the server.

Using subscribeToMore ensures the cache is easily updated as all GraphQL queries are automatically notified.

For more information, see the subscribeToMore documentation.

getTasks() {
  const tasks = client.watchQuery({
    query: GET_TASKS
  });

  tasks.subscribeToMore({
    document: TASK_ADDED_SUBSCRIPTION,
    updateQuery: (prev, { subscriptionData }) => {
      // Update logic here.
    }
  });
  return tasks;
}
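As an illustration of the update logic, the following sketch shows one possible updateQuery function. It assumes that the query result has the shape { allTasks: [...] } and that the subscription payload field is called taskAdded. Both names are assumptions and must match your own query and subscription documents.

```javascript
// Assumed shapes: prev is { allTasks: [...] } and the subscription
// payload is { data: { taskAdded: {...} } }.
const updateQuery = (prev, { subscriptionData }) => {
  // Nothing to merge if the server sent no data.
  if (!subscriptionData || !subscriptionData.data) return prev
  const newTask = subscriptionData.data.taskAdded
  // Avoid duplicates if the task is already in the cached list.
  if (prev.allTasks.some((task) => task.id === newTask.id)) return prev
  // Return a new object instead of mutating the previous result.
  return { ...prev, allTasks: [...prev.allTasks, newTask] }
}

const previousResult = { allTasks: [{ id: '1', title: 'First task' }] }
const nextResult = updateQuery(previousResult, {
  subscriptionData: { data: { taskAdded: { id: '2', title: 'Second task' } } }
})
console.log(nextResult.allTasks.map((task) => task.id)) // [ '1', '2' ]
```

Returning a new object rather than mutating prev lets the cache detect that the query result has changed.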

To allow Voyager Client to automatically generate the updateQuery function for you, please see the Cache Update Helpers section.

You can then use this query in your application to subscribe to changes so that the front end is always updated when new data is returned from the server.

this.tasks = [];
this.getTasks().subscribe(result => {
  this.tasks = result.data && result.data.allTasks;
})

Note that it is also a good idea to unsubscribe from a query when leaving a page, which prevents possible memory leaks. To do this, keep the subscription object returned by subscribe() and call unsubscribe() on it when the view is torn down, as shown in the following example.

// subscription is the object returned by this.getTasks().subscribe(...)
subscription.unsubscribe();

Handling network state changes

When using subscriptions to provide your client with real-time updates, it is important to monitor network state, because the client will be out of sync if the server is updated while the client is offline.

To avoid this, Voyager Client provides a NetworkStatus interface which can be used along with the NetworkInfo interface to implement custom checks of network status.

For more information about how to import and configure a custom network status checker, see Advanced Topics.

Use the following example to re-run a query after a client returns to an online state:

const { CordovaNetworkStatus } = require("@aerogear/voyager-client");
const networkStatus = new CordovaNetworkStatus();

networkStatus.onStatusChangeListener({
  // networkInfo implements the NetworkInfo interface
  onStatusChange(networkInfo) {
    const online = networkInfo.online;
    if (online) {
      client.watchQuery({
        query: GET_TASKS
      });
    }
  }
});
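The listener above runs the query on every status change that reports an online state. If the query should run only when the client actually returns from offline, the previous state can be tracked. The following is a minimal sketch, assuming a hypothetical createReconnectListener helper and plain objects in place of real NetworkInfo values.

```javascript
// Hypothetical helper: returns a listener object that calls onReconnect
// only when the status changes from offline to online.
function createReconnectListener (onReconnect, initiallyOnline = true) {
  let wasOnline = initiallyOnline
  return {
    onStatusChange (networkInfo) {
      const online = networkInfo.online
      if (online && !wasOnline) onReconnect()
      wasOnline = online
    }
  }
}

let refetchCount = 0
const listener = createReconnectListener(() => { refetchCount++ })

listener.onStatusChange({ online: true }) // still online: no refetch
listener.onStatusChange({ online: false }) // went offline
listener.onStatusChange({ online: true }) // back online: refetch once
console.log(refetchCount) // 1
```

The returned object has the same onStatusChange shape as the listener passed to networkStatus.onStatusChangeListener above, with a callback that re-runs the query.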