🎉 New release for Pusher Chatkit - Webhooks! Extend your in-app chat functionality
Hide
Products
chatkit_full-logo

Extensible API for in-app chat

channels_full-logo

Build scalable realtime features

beams_full-logo

Programmatic push notifications

Developers

Docs

Read the docs to learn how to use our products

Tutorials

Explore our tutorials to build apps with Pusher products

Support

Reach out to our support team for help and advice

Sign in
Sign up

Service discovery in a microservice architecture with Pusher Channels

  • Lanre Adelowo
December 11th, 2018
You will need Node, Golang 1.10+ and Docker set up on your machine.

In this tutorial, we will be implementing a service discovery using Pusher Channels. To demonstrate this, we will be building two services - both of them a unique ID generator. They will be implemented in both NodeJS and Golang. This is done so as to depict a real-life microservices scenario where services are built in multiple languages. In our use-case here, the NodeJS application will be version 1 while the second iteration will be in Golang. As said earlier, both of them are unique ID generators and basically all they do is generate a UUID. The Node service generates a UUID V5 while the Golang service will generate a UUID v4.

Microservices are an interesting pattern but they usually present a new class of problems. An interesting problem this pattern presents is communication between the bulk of services that make up the entire mesh. These services are run in multiple replicas - depending on scalability needs - and dynamic environments. Take Kubernetes as an example, deploying a container might run in server A, stopping and running it again might deploy it to server. For both of them, you’d obviously get an IP address but then again, you cannot guarantee it won’t change. At this rate, it is clear maintaining a hardcoded list of all service targets wouldn’t make any sense.

Even if we assume Kubernetes is not used and the application is being deployed pre Docker style - a single host, multiple instances. It is still hard to keep an updated list of all running instances. Just try to think of having to keep track of all ports you have assigned to an instance and the pain of having to update them manually.

To solve the communication problem within microservices, a pattern called service discovery emerged. With service discovery, each service when started would inform a central registry of its reachable IP address (8.8.8.8) and port (53). Also whenever it is being shut down, maybe as a result of CTRL+C, it deregisters itself from the registry. Popular implementations of service discovery include Consul and etcd.

We will also build another service - the API gateway - which will listen to connections from our unique ID generator services. This API gateway will be the public facing program as it will proxy request to an available instance of services registered with Pusher Channels.

Prerequisites

  • Node
  • Golang >= 1.10.
  • Docker. We will be using this to run multiple copies of the services.
  • A Pusher Channels application. Create one here.

Pusher Channels registry

Standards rule literally everything and we need to define the data structure our registry requires. Below is an example of what needs to be sent to the channel in order to successfully process its inclusion as a service backend.

    {
      "prefix": "/v1", // App prefix to use for routing to this service
      "port": 3000, // The port this service is running on
      "address": "172.17.0.4", // The IP address where this service is reachable at.
    }

Directory structure

Here is the directory structure you will need to create to follow this tutorial:

── $GOPATH/src/github.com/pusher-tutorial/pusher-channel-discovery ├── golang ├── nodejs ├── nodejs-api-gateway

Building the NodeJS service

This backend will make use of the Channels server SDK for Node. We will use it to send a register event to the central registry, in our case a channel.

To get started, create a folder called nodejs. This should match the directory structure above. In that project directory, you need to create another file called package.json. The package.json file should have the following contents:

    // pusher-channel-discovery/nodejs/package.json
    {
      "name": "nodejs-channel-discovery",
      "version": "1.0.0",
      "description": "Using pusher channel as a service registry",
      "main": "index.js",
      "dependencies": {
        "body-parser": "^1.18.3",
        "express": "^4.16.4",
        "internal-ip": "^3.0.1",
        "pusher": "^2.1.3",
        "uuid": "^3.3.2"
      },
      "devDependencies": {
        "nodemon": "^1.18.6"
      }
    }

Switch to your terminal app and navigate to the directory you created above - cd path/to/nodejs-channel-discovery. You then need to run the following command to install the dependencies needed to build this service.

    $ npm install

Next, create an index.js file in the root of the folder you created earlier and paste the contents below into it.

    // pusher-channel-discovery/nodejs/index.js

    const express = require('express');
    const bodyParser = require('body-parser');
    const os = require('os');
    const uuidv5 = require('uuid/v5');
    const uuidv4 = require('uuid/v4');
    const Pusher = require('pusher');
    const internalIp = require('internal-ip');

    const app = express();
    const hostName = os.hostname();
    const port = process.env.PORT || 3000;

    let pusher = new Pusher({
      appId: process.env.PUSHER_APP_ID,
      key: process.env.PUSHER_APP_KEY,
      secret: process.env.PUSHER_APP_SECRET,
      encrypted: process.env.PUSHER_APP_SECURE,
      cluster: process.env.PUSHER_APP_CLUSTER,
    });

    let svc = {};

    internalIp
      .v4()
      .then(ip => {
        svc = {
          prefix: '/v1',
          port: port,
          address: ip,
        };

        console.log('Registering service');

        pusher.trigger('mapped-discovery', 'register', svc);
      })
      .catch(err => {
        console.log(err);
        process.exit();
      });

    process.stdin.resume();

    process.on('SIGINT', () => {

      console.log('Deregistering service... ');

      // Send an exit signal on shutdown
      pusher.trigger('mapped-discovery', 'exit', svc);

      // Timeout to make sure the signal sent to
      // Pusher was successful before shutting down
      setTimeout(() => {
        process.exit();
      }, 1000);
    });

    app.use(bodyParser.json());

    app.use(function(req, res, next) {
      // Uniquely identify the server that processed the request
      res.header('X-Server', hostName);
      next();
    });

    app.get('/', function(req, res) {
      res.status(200).send({ service: 'ID generator' });  
    });

    app.get('/health', function(req, res) {
      res.status(200).send({ status: 'ok' });
    });

    app.post('/generate', function(req, res) {

      const identifier = req.body.id;
      if (identifier === undefined) {
        res.status(400).send({
          message: 'Please provide an ID to use to generate your UUID V5',
        });
        return;
      }

      if (identifier.length === 0) {
        res.status(400).send({
          message: 'Please provide an ID to use to generate your UUID V5',
        });
        return;
      }

      res.status(200).send({
        id: uuidv5(identifier, uuidv5.URL),
        timestamp: new Date().getTime(),
        message: 'UUID was successfully generated',
      });
    });

    app.listen(port, function() {
      console.log(`Service is running at ${port} at ${hostName}`);
    });

In the above code, three endpoints were created:

  • / - This is the root handler of the application. This endpoint returns basic information of the service.
  • /health - This endpoint allows the application to notify others about its internal state.
  • /generate - This endpoint is meant for the generation of a UUID. It generates a version 5 UUID. This endpoint expects an ID from the caller which it then uses to compute the UUID.

Since we will be needing to run multiple copies of this service, let's set it up to run as a container. To do that, we create a Dockerfile. Create the file and paste the content below into the file.

    # pusher-channel-discovery/nodejs/Dockerfile
    FROM node:10
    COPY . ./
    RUN npm install
    CMD ["node", "."]

You need to save the file and build the image for this service. To do that, you should run the following command. Please note that this has to be done in the root of the nodejs directory.

    $ docker build -t pusher-channel-node .

You can then run the service. But to do that, you need to use the credentials from the Pusher Channels application you created at the start of the tutorial.

    $ docker run -p 127.0.0.1:3000:3000 -e PUSHER_APP_ID="XXXX" -e PUSHER_APP_KEY="XXXXXXX" -e PUSHER_APP_SECRET="XXXXXX" -e PUSHER_APP_CLUSTER="eu" -e PUSHER_APP_SECURE="1" pusher-channel-node

The above specifies that we want the service to be available on our machine at localhost:3000. You can test the service works as expected by trying to access http://localhost:3000/generate with a POST request that has the following as it's body {"id": "some random string"}. An example with curl is shown below:

    $ curl -d '{"id" : "3jhbj333"}' -H "Content-Type: application/json" -X POST http://localhost:3000/generate

Building the Golang service

We have decided to build another iteration of our Unique ID generator since the first version written in Node used UUID version 5 and we would prefer to use something much more random. UUID 5 is basically a way of hashing some value into 128 bits. So if you try generating multiple UUIDs with the same id value in your request, you keep on getting the same UUID. While that behavior is easy to change, let's assume we have tons of users and production code relying on that service already, we don’t want to change behavior but wouldn’t mind doing it right again. Hence the version two rewrite that uses UUID 4, which ensures complete randomness.

To start with, you need to create a directory that houses this service. You can go ahead to create one called golang. This directory needs to be created in a directory in accordance to the structure laid out at the beginning of the article .

You will also need to run go mod init in the newly created directory. This will make the project a Golang module.

The first set of actions we need to perform is connecting to Pusher Channels API, so we need a Go SDK. That can be done by the following command:

    $ go get github.com/pusher/pusher-http-go

We need to create a file called registry.go. Since we also have to properly structure the code into packages, it should be in a directory called registry. After which you should paste the following code into the registry.go file.

    // pusher-channel-discovery/golang/registry/registry.go

    package registry

    import (
            "errors"
            "net"

            pusher "github.com/pusher/pusher-http-go"
    )

    type Event string

    const (
            Register Event = "register"
            Exit           = "exit"
    )

    func (e Event) String() string {
            switch e {
            case Register:
                    return string(Register)
            case Exit:
                    return string(Exit)
            default:
                    return ""
            }
    }

    const (
            Channel = "mapped-discovery"
    )

    type Registrar struct {
            pusher *pusher.Client
    }

    type Service struct {
            // The path that is links to this service
            Prefix string `json:"prefix"`

            // Public IP of the host running this service
            Address net.IP `json:"address"`

            Port uint `json:"port"`
    }

    func (s Service) Validate() error {
            if s.Address == nil {
                    return errors.New("addr is nil")
            }

            if s.Port <= 0 {
                    return errors.New("invalid HTTP port")
            }

            return nil
    }

    func New(client *pusher.Client) *Registrar {
            return &Registrar{client}
    }

    func (r *Registrar) do(svc Service, event Event) error {
            if err := svc.Validate(); err != nil {
                    return err
            }

            _, err := r.pusher.Trigger(Channel, event.String(), svc)
            return err

    }

    func (r *Registrar) Register(svc Service) error {
            return r.do(svc, Register)
    }

    func (r *Registrar) DeRegister(svc Service) error {
            return r.do(svc, Exit)
    }

    func (r *Registrar) IP() (net.IP, error) {
            addrs, err := net.InterfaceAddrs()
            if err != nil {
                    return nil, err
            }

            for _, addr := range addrs {
                    if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.IsGlobalUnicast() {
                            if ipnet.IP.To4() != nil || ipnet.IP.To16() != nil {
                                    return ipnet.IP, nil
                            }
                    }
            }

            return nil, nil
    }

That seems to be a lot, so here is a breakdown of what we have done above:

  • Defined multiple data types to conform to the channel registry structure defined above.
  • Implemented a Registrar type that triggers event to a channel. You can find those in the Register and DeRegister methods.

Our main goal is to generate unique IDs, we need to build an HTTP API that will allow for that.

We would be needing an HTTP router to help build our endpoints. For this, we would need a library called chi. To install it, run go get github.com/go-chi/chi. Since we would also be needing to generate unique IDs, it is safe to also install a UUID library. You will need to run go get github.com/google/uuid.

Create a folder called transport/web, and create an http.go file inside the newly created folder. Paste the code below in the http.go file:

    // pusher-channel-discovery/golang/transport/web/http.go
    package web

    import (
            "encoding/json"
            "fmt"
            "net/http"

            "github.com/go-chi/chi"
            "github.com/google/uuid"
    )

    type Server struct {
            HostName string
            Port     uint
    }

    func Start(srv *Server) error {

            mux := chi.NewMux()

            mux.Use(func(next http.Handler) http.Handler {
                    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                            w.Header().Set("Content-Type", "application/json")
                            w.Header().Set("X-Server", srv.HostName)

                            next.ServeHTTP(w, r)
                    })
            })

            mux.Get("/health", func(w http.ResponseWriter, r *http.Request) {
                    defer r.Body.Close()

                    w.WriteHeader(http.StatusOK)
                    w.Write([]byte(`{ 'status' : 'OK' }`))
            })

            mux.Post("/generate", func(w http.ResponseWriter, r *http.Request) {

                    defer r.Body.Close()

                    var response struct {
                            Status int64     `json:"status"`
                            ID     uuid.UUID `json:"id"`
                    }

                    response.Status = 1
                    response.ID = uuid.New()

                    w.WriteHeader(http.StatusOK)
                    json.NewEncoder(w).Encode(&response)
            })

            return http.ListenAndServe(fmt.Sprintf(":%d", srv.Port), mux)
    }

In the code above, we have created two endpoints:

  • /health - This endpoint allows the application to notify others about its internal state.
  • /generate - This endpoint is responsible for creating the UUID. Unlike the first iteration written in NodeJS, it doesn't require any ID of any sort as it generates purely random UUIDs.

To tie up the registry and the HTTP API, we need to make our application able to run as a command line app. To do that, create a file called main.go in the cmd folder of the root application. You need to paste the following code in the main.go file:

    // pusher-channel-discovery/golang/cmd/main.go
    package main

    import (
            "errors"
            "flag"
            "fmt"
            "log"
            "os"
            "os/signal"
            "syscall"

            "github.com/adelowo/pusher-channel-discovery-go/registry"
            "github.com/adelowo/pusher-channel-discovery-go/transport/web"
            pusher "github.com/pusher/pusher-http-go"
    )

    func main() {

            shutDownChan := make(chan os.Signal)
            signal.Notify(shutDownChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

            port := flag.Uint("http.port", 3000, "Port to run HTTP server at")

            flag.Parse()

            appID := os.Getenv("PUSHER_APP_ID")
            appKey := os.Getenv("PUSHER_APP_KEY")
            appSecret := os.Getenv("PUSHER_APP_SECRET")
            appCluster := os.Getenv("PUSHER_APP_CLUSTER")
            appIsSecure := os.Getenv("PUSHER_APP_SECURE")

            var isSecure bool
            if appIsSecure == "1" {
                    isSecure = true
            }

            client := &pusher.Client{
                    AppId:   appID,
                    Key:     appKey,
                    Secret:  appSecret,
                    Cluster: appCluster,
                    Secure:  isSecure,
            }

            reg := registry.New(client)

            ip, err := reg.IP()
            if err != nil {
                    log.Fatalf("could not fetch public IP address... %v", err)
            }

            hostName, err := os.Hostname()
            if err != nil {
                    log.Fatalf("could not fetch host name... %v", err)
            }

            svc := registry.Service{
                    Prefix:  "/v2",
                    Address: ip,
                    Port:    *port,
            }

            if err := reg.Register(svc); err != nil {
                    log.Fatalf("Could not register service... %v", err)
            }

            var errs = make(chan error, 3)

            go func() {
                    srv := &web.Server{
                            HostName: hostName,
                            Port:     *port,
                    }

                    errs <- web.Start(srv)
            }()

            go func() {
                    <-shutDownChan
                    errs <- errors.New("Application is shutting down")
            }()

            fmt.Println(<-errs)
            reg.DeRegister(svc)
    }

Since we need to run multiple copies of this service too, it would make total sense to run it in a container. To build a container image for this service, create a Dockerfile in the root of the project and paste the following code in it:

    ## pusher-channel-discovery/golang/Dockerfile
    FROM golang:1.11 as build-env

    WORKDIR /go/src/github.com/pusher-tutorial/pusher-channel-discovery-go
    ADD . /go/src/github.com/pusher-tutorial/pusher-channel-discovery-go

    ENV GO111MODULE=on

    RUN go mod download
    RUN go mod verify
    RUN go install ./cmd

    ## A better scratch image
    ## See https://github.com/GoogleContainerTools/distroless
    FROM gcr.io/distroless/base
    COPY --from=build-env /go/bin/cmd /
    CMD ["/cmd"]

We need to build this image, so you need to run the following command in your terminal:

    $ export GO111MODULE=on
    $ go mod tidy
    $ docker build -t pusher-channel-go .

This will build the image and make it available to be run later on when we choose. Let's run it now:

    $ docker run -p 127.0.0.1:2000:3000 -e PUSHER_APP_ID="XXXX" -e PUSHER_APP_KEY="XXXXX" -e PUSHER_APP_SECRET="XXXX" -e PUSHER_APP_CLUSTER="eu" -e PUSHER_APP_SECURE="1" pusher-channel-go

You can verify the service is up and running by making a POST request to http://localhost:2000/generate. An example with curl is given below:

    $ curl -X POST localhost:2000/generate

API gateway

An API gateway is an application that provides a single entry point for a selected group of microservices. This plays hand in hand with service registration as it needs to be able to pull up information about those microservices so as to proxy requests to them. This part is called service discovery. You can read more about this pattern here.

We will implement this in NodeJS as we assume it already existed when we had only the first ID generator service. What we are building here is basically a reverse proxy. We subscribe to events published by the services we have built above and include the service in the routing table. Whenever we receive a request, we interpret the URL to determine what service is being requested. We use the prefix property defined above - in Pusher Channels registry - to achieve this.

Still in accordance with the directory structure laid out at the beginning of this tutorial, we need to create a new folder called nodejs-api-gateway. Inside that folder, we need a package.json file to define our dependencies. The following contents should be pasted inside the package.json file:

    // pusher-channels-discovery/nodejs-api-gateway/package.json
    {
      "name": "pusher-channels-api-gateway",
      "version": "1.0.0",
      "description": "",
      "main": "index.js",
      "dependencies": {
        "pusher-js": "^4.3.1",
        "redbird": "^0.8.0"
      }
    }

To install the dependencies declared above, you need to run:

    $ npm install

We then proceed to implement the proxy. We will use a node package called Redbird which is an awesome reverse proxy. You need to create an index.js file and it should have the following code as its contents:

    // pusher-channels-discovery/nodejs-api-gateway/index.js

    const port = process.env.PORT || 1500;
    const domain = process.env.DOMAIN || `localhost:${port}`;

    const proxy = require('redbird')({ port: port });
    const Pusher = require('pusher-js');

    const pusherSocket = new Pusher(process.env.PUSHER_APP_KEY, {
      forceTLS: process.env.PUSHER_APP_SECURE === '1' ? true : false,
      cluster: process.env.PUSHER_APP_CLUSTER,
    });

    const channel = pusherSocket.subscribe('mapped-discovery');

    channel.bind('register', data => {
      proxy.register(
        `${domain}${data.prefix}`,
        `http://${data.address}:${data.port}`
      );
    });

    channel.bind('exit', data => {
      proxy.unregister(
        `${domain}${data.prefix}`,
        `http://${data.address}:${data.port}`
      );
    });

Here is a breakdown of what we have implemented above:

  • We define a domain constant. This defaults to localhost: PORT if the DOMAIN environmental value is not available.
  • We create a connection to Pusher channels and subscribe to the mapped-discovery channel.
  • We then bind a callback the register event. The callback picks out some critical data from the data it has received and uses that to update its routing table.
  • We also do the above for the exit channel. But in the instance, we remove the service from the routing table.

The reverse proxy supports load-balancing to multiple instances of our services as we will see shortly

As with other services we have built, this will also have Docker support. Create a Dockerfile and paste the following contents inside it:

    // pusher-channels-discovery/nodejs-api-gateway/Dockerfile
    FROM node:10
    COPY . ./
    RUN npm install
    CMD ["node", "."]

You also need to build the container image by running the following command in a terminal:

    $ docker build -t pusher-channel-api-gateway .

Putting it all together

You need to run the API gateway first before starting up every other service. State is not persisted and it will only listen to connections from microservices that are started after its last run. To start the API gateway container, run:

    $ docker run -p 127.0.0.1:1500:1500 -e PUSHER_APP_KEY="XXXXX" -e PUSHER_APP_SECURE="1" -e PUSHER_APP_CLUSTER="eu" pusher-channel-api-gateway

You then need to run the container for the service written in NodeJS by running the following command in your terminal:

    $ docker run -e PUSHER_APP_ID="XXXX" -e PUSHER_APP_KEY="XXXXXXX" -e PUSHER_APP_SECRET="XXXXXX" -e PUSHER_APP_CLUSTER="eu" -e PUSHER_APP_SECURE="1" pusher-channel-node

Keep an eye on the terminal window where you are running the API gateway, once the NodeJS service comes up, there should be information in the terminal. It should be in the following form:

    {"name":"redbird","hostname":"88c1cea2c10c","pid":1,"level":30,"from":{"protocol":"http:","slashes":true,"auth":null,"host":"localhost:1500","port":"1500","hostname":"localhost","hash":null,"search":null,"query":null,"pathname":"/v1","path":"/v1","href":"http://localhost:1500/v1"},"to":{"protocol":"http:","slashes":true,"auth":null,"host":"172.17.0.3:3000","port":"3000","hostname":"172.17.0.3","hash":null,"search":null,"query":null,"pathname":"/","path":"/","href":"http://172.17.0.3:3000/","useTargetHostHeader":false},"msg":"Registered a new route","time":"2018-11-11T20:08:02.632Z","v":0}

Shutting down the NodeJS service should produce something similar but with an Unregistered a route message.

You can access the Node service by sending a request to the API gateway, http://localhost:1500/v1/generate as previously shown. Why the v1? If you look at the part of the code that sends information to Pusher Channels, you will notice it has /v1 in its prefix property. If you take a look at the Golang implementation, you will find its prefix property with /v2, not /v1. What this means is all requests sent to the API gateway that starts from v1 would be proxied to the NodeJS service while those with v2 would be proxied to the Golang service. To run the Golang service, we need to run the following command:

    $ docker run -p 2000:3000 -e PUSHER_APP_ID="XXXX" -e PUSHER_APP_KEY="XXXXX" -e PUSHER_APP_SECRET="XXXX" -e PUSHER_APP_CLUSTER="eu" -e PUSHER_APP_SECURE="1" pusher-channel-go

All that is left for us to do now is to test that our requests are being proxied to the right services. I will be showing examples with curl.

    $ # for v1 
    $ curl -i -d '{"id" : "3jhbj333"}' -H "Content-Type: application/json" -X POST http://localhost:1500/v1/generate
    $ # for v2
    $ curl -i -X POST localhost:1500/v2/generate

I have included the -i option as it will be useful for us to inspect the X-Server headers to also validate the request is being served by the right service. It will also be useful when we need to validate that requests are being proxied among multiple instances of a service. You need to run multiple instances of the NodeJS service, just open two or three terminal windows where you run the above docker command in each of them.

Once done, you should make requests to the API gateway and watch the value of X-Server change as requests are proxied in a round robin manner to all available instances.

    $ curl -i -d '{"id" : "3jhbj333"}' -H "Content-Type: application/json" -X POST http://localhost:1500/v1/generate

You can also try shutting down one or more instances of the available services and see what happens. Spoiler, requests are no longer proxied to them.

Conclusion

In this tutorial, we have leveraged Pusher Channels to implement service discovery and registration when dealing with microservices.

You can find the source code to this tutorial on GitHub.

Clone the project repository
  • Go
  • JavaScript
  • Node.js
  • Channels

Products

  • Channels
  • Chatkit
  • Beams

© 2019 Pusher Ltd. All rights reserved.

Pusher Limited is a company registered in England and Wales (No. 07489873) whose registered office is at 160 Old Street, London, EC1V 9BW.