Build a live popularity chart in Go using tweets as a data source

Introduction

Polls exists almost everywhere on the internet - Twitter, Slack - and a major similarity between all of them is the results are updated in realtime. In this tutorial, I will be describing how to build a web app that shows the popularity of a keyword in realtime with the help of Pusher Channels. The data source for our application will be tweets from Twitter.

Below is a gif of the final state of the application:

go-twitter-graph-demo

Prerequisites

To do this, you need to apply as a developer before you can create an application. You can find a comprehensive guide here.

Building the application

Remember that an important step to this is to make sure you have a Twitter developer account. Kindly follow this tutorial to do that.

The next step of action is to create a directory to house our application, you will need to create a directory called streaming-api. The location of this directory will depend on the version of the Go toolchain you have - If your Go toolchain is <=1.11, you need to create the directory in your $GOPATH such as $GOPATH/src/github.com/username/streaming-api. If you are making use of >=1.12, you can create the directory literally anywhere.

Once that is done, you will need to create a file called .env, this file will contain credentials to access both the Twitter streaming API and Pusher Channels. Run the command below to create the file:

    $ touch .env

Once done, you will also need to paste the following contents into the newly created .env file:

1// .env
2    TWITTER_CONSUMER_KEY=TWITTER_CONSUMER_KEY
3    TWITTER_CONSUMER_SECRET=TWITTER_CONSUMER_SECRET
4    TWITTER_ACCESS_TOKEN=TWITTER_ACCESS_TOKEN
5    TWITTER_ACCESS_SECRET=TWITTER_ACCESS_SECRET
6    PUSHER_APP_ID=PUSHER_APP_ID
7    PUSHER_APP_KEY=PUSHER_APP_KEY
8    PUSHER_APP_SECRET=PUSHER_APP_SECRET
9    PUSHER_APP_CLUSTER="eu"
10    PUSHER_APP_SECURE="1"

Please remember to replace the placeholders with your actual credentials.

The next step of action is to actually create the server and the integration with Pusher Channels. To do that, you need to create a new file called main.go, that can be done by executing the command below:

    $ touch main.go

You will also need to fetch some library that are needed to help build the application. Run the command below to install these libraries:

1$ go get -v github.com/dghubble/go-twitter/twitter 
2    $ go get -v github.com/dghubble/oauth1 
3    $ go get -v github.com/joho/godotenv
4    $ go get -v github.com/pusher/pusher-http-go

In the newly created file main.go, you will need to paste the following contents:

1// streaming-api/main.go
2    
3    package main
4    
5    import (
6            "encoding/json"
7            "flag"
8            "fmt"
9            "html/template"
10            "log"
11            "net/http"
12            "os"
13            "os/signal"
14            "strings"
15            "sync"
16            "syscall"
17            "time"
18    
19            "github.com/dghubble/go-twitter/twitter"
20            "github.com/dghubble/oauth1"
21            "github.com/joho/godotenv"
22            "github.com/pusher/pusher-http-go"
23    )
24    
25    type cache struct {
26            counter map[string]int64
27            mu      sync.RWMutex
28    }
29    
30    func (c *cache) Init(options ...string) {
31            for _, v := range options {
32                    c.counter[strings.TrimSpace(v)] = 0
33            }
34    }
35    
36    func (c *cache) All() map[string]int64 {
37            c.mu.Lock()
38            defer c.mu.Unlock()
39    
40            return c.counter
41    }
42    
43    func (c *cache) Incr(option string) {
44            c.mu.Lock()
45            defer c.mu.Unlock()
46    
47            c.counter[strings.TrimSpace(option)]++
48    }
49    
50    func (c *cache) Count(option string) int64 {
51            c.mu.RLock()
52            defer c.mu.RUnlock()
53    
54            val, ok := c.counter[strings.TrimSpace(option)]
55            if !ok {
56                    return 0
57            }
58    
59            return val
60    }
61    
62    func main() {
63    
64            options := flag.String("options", "Messi,Suarez,Trump", "What items to search for on Twitter ?")
65            httpPort := flag.Int("http.port", 1500, "What port to run HTTP on ?")
66            channelsPublishInterval := flag.Duration("channels.duration", 3*time.Second, "How much duration before data is published to Pusher Channels")
67    
68            flag.Parse()
69    
70            if err := godotenv.Load(); err != nil {
71                    log.Fatalf("could not load .env file.. %v", err)
72            }
73    
74            appID := os.Getenv("PUSHER_APP_ID")
75            appKey := os.Getenv("PUSHER_APP_KEY")
76            appSecret := os.Getenv("PUSHER_APP_SECRET")
77            appCluster := os.Getenv("PUSHER_APP_CLUSTER")
78            appIsSecure := os.Getenv("PUSHER_APP_SECURE")
79    
80            var isSecure bool
81            if appIsSecure == "1" {
82                    isSecure = true
83            }
84    
85            pusherClient := &pusher.Client{
86                    AppId:   appID,
87                    Key:     appKey,
88                    Secret:  appSecret,
89                    Cluster: appCluster,
90                    Secure:  isSecure,
91            }
92    
93            config := oauth1.NewConfig(os.Getenv("TWITTER_CONSUMER_KEY"), os.Getenv("TWITTER_CONSUMER_SECRET"))
94            token := oauth1.NewToken(os.Getenv("TWITTER_ACCESS_TOKEN"), os.Getenv("TWITTER_ACCESS_SECRET"))
95    
96            httpClient := config.Client(oauth1.NoContext, token)
97    
98            client := twitter.NewClient(httpClient)
99    
100            optionsCache := &cache{
101                    mu:      sync.RWMutex{},
102                    counter: make(map[string]int64),
103            }
104    
105            splittedOptions := strings.Split(*options, ",")
106    
107            if n := len(splittedOptions); n < 2 {
108                    log.Fatalf("There must be at least 2 options... %v ", splittedOptions)
109            } else if n > 3 {
110                    log.Fatalf("There cannot be more than 3 options... %v", splittedOptions)
111            }
112    
113            optionsCache.Init(splittedOptions...)
114    
115            go func() {
116    
117                    var t *template.Template
118                    var once sync.Once
119    
120                    http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("."))))
121    
122                    http.Handle("/polls", http.HandlerFunc(poll(optionsCache)))
123                    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
124    
125                            once.Do(func() {
126                                    tem, err := template.ParseFiles("index.html")
127                                    if err != nil {
128                                            log.Fatal(err)
129                                    }
130    
131                                    t = tem.Lookup("index.html")
132                            })
133    
134                            t.Execute(w, nil)
135                    })
136    
137                    http.ListenAndServe(fmt.Sprintf(":%d", *httpPort), nil)
138            }()
139    
140            go func(c *cache, client *pusher.Client) {
141    
142                    t := time.NewTicker(*channelsPublishInterval)
143    
144                    for {
145                            select {
146                            case <-t.C:
147                                    pusherClient.Trigger("twitter-votes", "options", c.All())
148                            }
149                    }
150    
151            }(optionsCache, pusherClient)
152    
153            demux := twitter.NewSwitchDemux()
154            demux.Tweet = func(tweet *twitter.Tweet) {
155                    for _, v := range splittedOptions {
156                            if strings.Contains(tweet.Text, v) {
157                                    optionsCache.Incr(v)
158                            }
159                    }
160            }
161    
162            fmt.Println("Starting Stream...")
163    
164            filterParams := &twitter.StreamFilterParams{
165                    Track:         splittedOptions,
166                    StallWarnings: twitter.Bool(true),
167            }
168    
169            stream, err := client.Streams.Filter(filterParams)
170            if err != nil {
171                    log.Fatal(err)
172            }
173    
174            go demux.HandleChan(stream.Messages)
175    
176            ch := make(chan os.Signal)
177            signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
178            <-ch
179    
180            fmt.Println("Stopping Stream...")
181            stream.Stop()
182    }
183    
184    func poll(cache *cache) func(w http.ResponseWriter, r *http.Request) {
185            return func(w http.ResponseWriter, r *http.Request) {
186                    json.NewEncoder(w).Encode(cache.All())
187            }
188    }

While a little lengthy, the above code does just three things:

  • Connect to the Twitter streaming API and listen for tweets that match our options search.
  • Start an HTTP server that serves an HTML page in order to display the realtime results.
  • Send an updated result to Pusher Channels.

While you might be tempted to run the application, there are still a few things missing here. We need to create one more file - index.html. This file will house the frontend for our application. You will need to go ahead to create the file by running the command below:

    $ touch index.html

In the newly created index.html file, you will need to paste the following contents in it:

1// streaming-api/index.html
2    <!DOCTYPE html>
3    <html lang="en">
4    
5    <head>
6        <meta charset="UTF-8">
7        <meta name="viewport" content="width=device-width, initial-scale=1.0">
8        <meta http-equiv="X-UA-Compatible" content="ie=edge">
9        <title>Realtime voting app based on Tweets</title>
10        <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/twitter-bootstrap/4.3.1/css/bootstrap-grid.min.css">
11        <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.8.0/Chart.min.css" />
12    </head>
13    
14    <body>
15        <div class="container">
16            <div class="row">
17                <div class="col-md-1">
18                </div>
19                <div class="col-md-10">
20                    <canvas id="myChart" width="400" height="400"></canvas>
21                </div>
22                <div class="col-md-1">
23                </div>
24            </div>
25        </div>
26        <script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.8.0/Chart.min.js"></script>
27        <script src="https://cdnjs.cloudflare.com/ajax/libs/axios/0.18.0/axios.min.js"></script>
28    <script src="https://js.pusher.com/4.4/pusher.min.js"></script>
29    <script src="static/app.js"></script>
30    </body>
31    </html>

We import a few Javascript libraries but perhaps the most interesting is Line 29 which reads <script src="static/app.js"></script> . Basically, what this means is we need to create yet another file called app.js. You can go ahead to do that in the root directory with the following command:

    $ touch app.js

In the newly created app.js file, paste the following content:

1// streaming-api/app.js
2    
3    const APP_KEY = 'PUSHER_APP_KEY';
4    const APP_CLUSTER = 'PUSHER_APP_CLUSTER';
5    
6    var ctx = document.getElementById('myChart').getContext('2d');
7    var myChart = new Chart(ctx, {
8      type: 'bar',
9      data: {
10        labels: [],
11        datasets: [
12          {
13            label: '# of Tweets',
14            data: [],
15            backgroundColor: [
16              'rgba(255, 99, 132, 0.2)',
17              'rgba(54, 162, 235, 0.2)',
18              'rgba(255, 159, 64, 0.2)',
19            ],
20            borderWidth: 1,
21          },
22        ],
23      },
24      options: {
25        scales: {
26          yAxes: [
27            {
28              ticks: {
29                beginAtZero: true,
30              },
31            },
32          ],
33        },
34      },
35    });
36    
37    function updateChart(data) {
38      let iterationCount = 0;
39    
40      for (const key in data) {
41        if (!myChart.data.labels.includes(key)) {
42          myChart.data.labels.push(key);
43        }
44    
45        myChart.data.datasets.forEach(dataset => {
46          dataset.data[iterationCount] = data[key];
47        });
48    
49        iterationCount++;
50    
51        myChart.update();
52      }
53    }
54    
55    axios
56      .get('http://localhost:1500/polls', {})
57      .then(res => {
58        updateChart(res.data);
59      })
60      .catch(err => {
61        console.log('Could not retrieve information from the backend');
62        console.error(err);
63      });
64    
65    const pusher = new Pusher(APP_KEY, {
66      cluster: APP_CLUSTER,
67    });
68    
69    const channel = pusher.subscribe('twitter-votes');
70    
71    channel.bind('options', data => {
72      updateChart(data);
73    });

Please remember to make use of your actual key.

With the above done, it is time to test the application. To do this, you should run the following command in the root directory of streaming-api :

    $ go run main.go

You will need to visit http://localhost:1500 to see the chart.

You can also make use of the trending topics on your Twitter if you want to. To search Twitter for other polls, you can also make use of the following command:

    $ go run main.go -options="Apple,Javascript,Trump"

Conclusion

In this tutorial, I have described how to build a realtime popularity application that uses tweets as a data source. I also showed how to integrate with the Twitter streaming API and more importantly, Pusher Channels.

As always, the code for this tutorial can be found on GitHub.