Getting data in realtime from MySQL with Java

Introduction

Applications are more complex than ever before.

And it’s not only about handling an increasing number of users or reducing response times.

If your application has a database, you’ll probably need its data in other places as soon as it goes in.

In this context, change data capture is the approach you use to capture and deliver the changes in the database to other sources.

In this tutorial, you’re going to learn how to stream, in realtime, the changes made to a table in a MySQL database to a React app. Something like this:

java-mysql-react-demo

Prerequisites

Here’s what you need to have installed to follow this tutorial:

  • Java JDK 8 (the Maven project targets Java 1.8)
  • Maven
  • A MySQL server (a 5.7.x version; at the time of this writing, MySQL 8.0 is not yet supported by the library we’ll use)
  • Node.js and npm (to create and run the React app)

You’ll also need knowledge of:

  • Java programming (intermediate level)
  • Basic MySQL management tasks
  • React (beginner level)

If you want to track and determine if something in a database has changed, you have three main approaches:

  • Poll the database every X seconds and determine if something has changed using a timestamp, version number or status field.
  • Use database or application-level triggers to execute a piece of code when something changes.
  • Use the database transaction/replication log, which records every change to the database.

I’m going to use the third approach because I think it’s the most robust. It doesn’t waste resources (like polling) or hurt performance (like triggers).
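For contrast, here’s what the first approach (polling) might look like, sketched stdlib-only with the table simulated as an in-memory list. The Row class and changedSince method are illustrative, not part of any library; a real version would run a SELECT ... WHERE updated_at > ? on every interval:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the polling approach: each row carries a
// last-modified timestamp, and the poller asks for rows changed since
// the last check.
public class PollingSketch {
    public static class Row {
        public final int id;
        public final long updatedAt; // e.g. epoch millis of the last change
        public Row(int id, long updatedAt) {
            this.id = id;
            this.updatedAt = updatedAt;
        }
    }

    // Returns rows modified strictly after 'since' -- the work the
    // database would repeat on every polling interval.
    public static List<Row> changedSince(List<Row> table, long since) {
        List<Row> changed = new ArrayList<>();
        for (Row r : table) {
            if (r.updatedAt > since) changed.add(r);
        }
        return changed;
    }
}
```

Even when nothing has changed, every polling interval costs a query; the log-based approach only does work when a change actually happens.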

We’ll read the database changes from the MySQL replication log using the library mysql-binlog-connector-java. Then, we’ll parse the event to extract the relevant data and publish it to a Pusher channel so it can be consumed by a React application.

Here’s the diagram that describes the above process:

java-mysql-db-diagram

For reference, here is a GitHub repository with all the code shown in this tutorial and instructions to run it.

Let’s start by creating a Pusher application.

Creating a Pusher application

To get started with Pusher, create your Pusher account or sign in. Then, go to your dashboard and create a Channels app, choosing a name, the cluster closest to your location, and optionally, React as the frontend tech and Java as the backend tech:

java-mysql-db-pusher-app

This will give you some sample code to get started:

java-mysql-db-pusher-app-keys

Save your app ID, key, secret and cluster values. We’ll need them later.

Configuring MySQL replication

The first thing you need to do is enable replication in MySQL.

Replication allows data from one MySQL server (the master) to be copied in an asynchronous way to one or more different MySQL servers (the slaves).

It works by writing all the changes in the master to a binary log file, which is then synchronized between the master and the slaves so that they can apply all those changes.

For this tutorial, you don’t have to set up slave servers. We’re only interested in the binary log.

In the MySQL configuration file (usually at /etc/my.cnf or C:\ProgramData\MySQL\MySQL Server 5.7\my.ini), add the following lines:

    [mysqld]
    server-id = 1 #1
    log_bin = /var/log/mysql/mysql-bin.log #2
    expire_logs_days = 10 #3
    max_binlog_size = 100M #4
    binlog-format = row #5

Line #1 assigns an identifier to the server.

Line #2 specifies the base name and path of the binary log files. On Windows, it will be something like c:/logs/mysql-bin.log. On Linux, make sure this directory has the necessary permissions for MySQL.

Lines #3 and #4 are optional; they specify the expiration time (in days) and the maximum size of each log file.

Line #5 is important: it specifies the format in which the log will be written.

There are two main types of replication formats:

  • Statement Based Replication (SBR), which replicates entire SQL statements, and
  • Row Based Replication (RBR), which replicates only the changed rows.

For our purposes, RBR will be easier to work with, which is why the configuration file specifies this format. For example, an UPDATE that touches ten rows is logged as a single SQL statement under SBR, but as ten row images under RBR, so each change event carries the actual data.

Now restart the server.

In a terminal window, connect to the MySQL server using mysql:

    mysql -u <YOUR_USER> -p 

Now choose or create a database and create the table that is going to be used by the application:

    USE myDatabase;
    CREATE TABLE products(id int(11) not null auto_increment, name varchar(50) default null, price decimal(6,2), primary key (id));

It’s not recommended to work with a user that has administrative privileges, like root, so let’s create another user for the application:

    CREATE USER '<YOUR_USER>'@'<YOUR_HOST>' IDENTIFIED BY '<YOUR_PASSWORD>';

Give it replication and table privileges:

    GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '<YOUR_USER>'@'<YOUR_HOST>';
    GRANT ALL PRIVILEGES ON `<INSERT_YOUR_DB_NAME>`.* TO '<YOUR_USER>'@'<YOUR_HOST>';
    FLUSH PRIVILEGES;

Now execute the following command to check if replication is enabled:

    show master status;

It should show something like the following:

    +------------------+----------+--------------+------------------+-------------------+
    | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
    +------------------+----------+--------------+------------------+-------------------+
    | mysql-bin.000001 |      001 |              |                  |                   |
    +------------------+----------+--------------+------------------+-------------------+

It indicates the current log file and the position of the last statement.
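This File/Position pair is exactly what a change-data-capture client persists if it needs to resume where it left off after a restart (mysql-binlog-connector-java lets you start from a saved position via the client’s setBinlogFilename and setBinlogPosition methods). Here’s a minimal sketch of such a checkpoint using java.util.Properties; the key names are my own choice, not anything the library requires:

```java
import java.util.Properties;

// Sketch: build and read a "checkpoint" -- the binlog file name and
// position a reader would persist (e.g. with Properties.store/load)
// so that a restart can resume instead of re-reading from the start.
public class BinlogCheckpoint {
    public static Properties toCheckpoint(String binlogFile, long position) {
        Properties p = new Properties();
        p.setProperty("binlog.file", binlogFile);
        p.setProperty("binlog.position", Long.toString(position));
        return p;
    }

    public static String fileOf(Properties p) {
        return p.getProperty("binlog.file");
    }

    public static long positionOf(Properties p) {
        return Long.parseLong(p.getProperty("binlog.position"));
    }
}
```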

If you get an Empty set result instead, execute:

    show variables like "%log_bin%";

If replication is enabled, you should see something like this:

    +---------------------------------+--------------------------------+
    | Variable_name                   | Value                          |
    +---------------------------------+--------------------------------+
    | log_bin                         | ON                             |
    | log_bin_basename                | /var/log/mysql/mysql-bin       |
    | log_bin_index                   | /var/log/mysql/mysql-bin.index |
    | log_bin_trust_function_creators | OFF                            |
    | log_bin_use_v1_row_events       | OFF                            |
    | sql_log_bin                     | ON                             |
    +---------------------------------+--------------------------------+

Otherwise, double-check your configuration. You can learn more about replication here.

Now let’s create the Java program that will read the binary log.

Reading MySQL’s binary log

It turns out that reading binary logs for change data capture is more common than you think.

Microsoft SQL Server has built-in support for change data capture.

Oracle offers GoldenGate for real-time data integration and replication.

MongoDB offers Change Streams to access real-time data changes.

For MySQL, there are a lot of libraries for reading the binary log and streaming the changes as events to other sources. In this wiki, you can find many of these libraries.

Most of these libraries were made for enterprise systems, so they work natively with Apache Kafka, a distributed publish/subscribe platform that streams events and records to multiple sources.

But if you don’t need something like that, you can use mysql-binlog-connector-java, which allows you to read the binary log file and listen for changes as events from any Java program.

So open your favorite IDE and create a Maven project.

Or just create a directory structure like the following:

1src
2     |- main
3       |- java
4     |- pom.xml

In the pom.xml file, specify the project information, the Java version, and mysql-binlog-connector-java and pusher-http-java as dependencies:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.example</groupId>
        <artifactId>MySQLRealtime</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>com.github.shyiko</groupId>
                <artifactId>mysql-binlog-connector-java</artifactId>
                <version>0.16.1</version>
            </dependency>

            <dependency>
                <groupId>com.pusher</groupId>
                <artifactId>pusher-http-java</artifactId>
                <version>1.0.0</version>
            </dependency>
        </dependencies>

    </project>

Now create a class, let’s say src/main/java/ReadLog.java, with the code to connect to MySQL and listen for log events:

    import java.io.IOException;

    import com.github.shyiko.mysql.binlog.BinaryLogClient;

    public class ReadLog {
        public static void main(String[] args) throws IOException {
            BinaryLogClient client =
              new BinaryLogClient("localhost", 3306, "<MYSQL_USER>", "<MYSQL_PASSWORD>");

            client.registerEventListener(event -> {
                System.out.println(event);
            });
            client.connect();
        }
    }

If you execute this class, the program will block, printing every event received from the log.

For example, these are the events you receive when a database is created:

    Event{header=EventHeaderV4{timestamp=1524607461000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=463, flags=0}, data=null}

    Event{header=EventHeaderV4{timestamp=1524607461000, eventType=QUERY, serverId=1, headerLength=19, dataLength=75, nextPosition=557, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='CREATE DATABASE test'}}

You receive an event for the creation of the global transaction identifier (GTID) and the actual query (CREATE DATABASE test).

Here’s an example of the events you receive when a table is created:

    Event{header=EventHeaderV4{timestamp=1524609716000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1696, flags=0}, data=null}

    Event{header=EventHeaderV4{timestamp=1524609716000, eventType=QUERY, serverId=1, headerLength=19, dataLength=181, nextPosition=1896, flags=0}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='create table products(id int(11) not null auto_increment, name varchar(50) default null, price decimal(6,2), primary key (id))'}}

When you insert a record:

    Event{header=EventHeaderV4{timestamp=1524609804000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1961, flags=0}, data=null}

    Event{header=EventHeaderV4{timestamp=1524609804000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2033, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}

    Event{header=EventHeaderV4{timestamp=1524609804000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2090, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}

    Event{header=EventHeaderV4{timestamp=1524609804000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=2140, flags=0}, data=WriteRowsEventData{tableId=109, includedColumns={0, 1, 2}, rows=[
        [1, laptop, 999.99]
    ]}}

    Event{header=EventHeaderV4{timestamp=1524609804000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=2171, flags=0}, data=XidEventData{xid=28}}

When you update a record:

    Event{header=EventHeaderV4{timestamp=1524609897000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=2236, flags=0}, data=null}

    Event{header=EventHeaderV4{timestamp=1524609897000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2308, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}

    Event{header=EventHeaderV4{timestamp=1524609897000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2365, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}

    Event{header=EventHeaderV4{timestamp=1524609897000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=47, nextPosition=2431, flags=0}, data=UpdateRowsEventData{tableId=109, includedColumnsBeforeUpdate={0, 1, 2}, includedColumns={0, 1, 2}, rows=[
        {before=[1, laptop, 999.99], after=[1, laptop, 100.01]}
    ]}}

    Event{header=EventHeaderV4{timestamp=1524609897000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=2462, flags=0}, data=XidEventData{xid=29}}

When you delete two records:

    Event{header=EventHeaderV4{timestamp=1524610005000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=2805, flags=0}, data=null}

    Event{header=EventHeaderV4{timestamp=1524610005000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2877, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}

    Event{header=EventHeaderV4{timestamp=1524610005000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2934, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}

    Event{header=EventHeaderV4{timestamp=1524610005000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=49, nextPosition=3002, flags=0}, data=DeleteRowsEventData{tableId=109, includedColumns={0, 1, 2}, rows=[
        [1, laptop, 100.01],
        [2, laptop v2, 999.99]
    ]}}

    Event{header=EventHeaderV4{timestamp=1524610005000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=3033, flags=0}, data=XidEventData{xid=31}}

This way, you can see how data manipulation (DML) statements are mapped:

  • Insert statements have the event type EXT_WRITE_ROWS, and the insertion data comes in an object of type WriteRowsEventData.
  • Update statements have the event type EXT_UPDATE_ROWS, and the update data comes in an object of type UpdateRowsEventData.
  • Delete statements have the event type EXT_DELETE_ROWS, and the deletion data comes in an object of type DeleteRowsEventData.

In addition, all of these events are preceded by a TABLE_MAP event with information about the table and columns that are being modified.

So we need to listen for these events.

The only problem is that if you need to track changes to multiple tables separately, you cannot rely on the tableId field, because this ID may change between executions.

You could change the way events are deserialized, but a simpler approach is to keep track of the table names and IDs in a map.
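Here’s a stdlib-only sketch of that bookkeeping, with simplified string records standing in for the library’s event classes (the resolveTables helper is illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of the event flow: TABLE_MAP events announce which tableId
// currently belongs to which table, and the row events that follow
// carry only the tableId.
public class EventFlowSketch {
    // Each event is {type, tableId} or {type, tableId, tableName} for TABLE_MAP.
    public static List<String> resolveTables(List<String[]> events) {
        Map<Long, String> idToName = new HashMap<>();
        List<String> resolved = new ArrayList<>();
        for (String[] e : events) {
            String type = e[0];
            long tableId = Long.parseLong(e[1]);
            if (type.equals("TABLE_MAP")) {
                idToName.put(tableId, e[2]); // remember id -> name
            } else { // EXT_WRITE_ROWS / EXT_UPDATE_ROWS / EXT_DELETE_ROWS
                resolved.add(idToName.get(tableId) + ":" + type);
            }
        }
        return resolved;
    }
}
```

The map is what lets us filter row events by table name even though they only carry a tableId.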

Taking this into account, you can modify the program like this:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import com.github.shyiko.mysql.binlog.BinaryLogClient;
    import com.github.shyiko.mysql.binlog.event.EventData;
    import com.github.shyiko.mysql.binlog.event.TableMapEventData;

    public class ReadLog {
        public static void main(String[] args) throws IOException {
            final Map<String, Long> tableMap = new HashMap<>();
            BinaryLogClient client =
              new BinaryLogClient("localhost", 3306, "<MYSQL_USER>", "<MYSQL_PASSWORD>");

            client.registerEventListener(event -> {
                EventData data = event.getData();

                if (data instanceof TableMapEventData) {
                    TableMapEventData tableData = (TableMapEventData) data;
                    tableMap.put(tableData.getTable(), tableData.getTableId());
                }
            });
            client.connect();
        }
    }

Notice how the program checks the subtype of EventData to get the information.

Now, let’s add the Pusher object with the information you got when you created the app:

    public class ReadLog {
        public static void main(String[] args) throws IOException {
            final Map<String, Long> tableMap = new HashMap<>();

            Pusher pusher =
                new Pusher("<PUSHER_APP_ID>", "<PUSHER_APP_KEY>", "<PUSHER_APP_SECRET>");
            pusher.setCluster("<PUSHER_APP_CLUSTER>");
            pusher.setEncrypted(true);

            // ...
        }
    }

Then, when the event is an insert, update or delete, you can check whether it corresponds to the products table, extract the product information, and publish it as a map to a products channel.

Here’s the code for INSERT events:

    public class ReadLog {
        static final String PRODUCT_TABLE_NAME = "products";

        public static void main(String[] args) throws IOException {
            // ...

            client.registerEventListener(event -> {
                EventData data = event.getData();

                if (data instanceof TableMapEventData) {
                    // ...
                } else if (data instanceof WriteRowsEventData) {
                    WriteRowsEventData eventData = (WriteRowsEventData) data;
                    if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                        for (Object[] product : eventData.getRows()) {
                            pusher.trigger(
                               PRODUCT_TABLE_NAME, "insert", getProductMap(product)
                            );
                        }
                    }
                }
            });
            client.connect();
        }

        static Map<String, String> getProductMap(Object[] product) {
            Map<String, String> map = new HashMap<>();
            map.put("id", String.valueOf(product[0]));
            map.put("name", String.valueOf(product[1]));
            map.put("price", String.valueOf(product[2]));

            return map;
        }
    }

For the update event, only the after data is needed. Each changed row arrives as a map entry, where the key is the row before the update and the value is the row after it:

    public class ReadLog {
        public static void main(String[] args) throws IOException {
            // ...

            client.registerEventListener(event -> {
                EventData data = event.getData();

                if (data instanceof TableMapEventData) {
                    // ...
                } else if (data instanceof WriteRowsEventData) {
                    // ...
                } else if (data instanceof UpdateRowsEventData) {
                    UpdateRowsEventData eventData = (UpdateRowsEventData) data;
                    if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                        for (Map.Entry<Serializable[], Serializable[]> row :
                                                          eventData.getRows()) {
                            pusher.trigger(
                              PRODUCT_TABLE_NAME, "update", getProductMap(row.getValue())
                            );
                        }
                    }
                }
            });
            client.connect();
        }

        // ...
    }
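To make that before/after shape concrete, here is a stdlib-only sketch of an updated row, using AbstractMap.SimpleEntry in place of the library’s internal row type (the sample values are illustrative):

```java
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.Map;

// Sketch: an updated row arrives as an entry where getKey() is the row
// before the update and getValue() is the row after it.
public class UpdateRowSketch {
    public static Map.Entry<Serializable[], Serializable[]> sampleUpdate() {
        Serializable[] before = { 1, "laptop", "999.99" };
        Serializable[] after  = { 1, "laptop", "100.01" };
        return new AbstractMap.SimpleEntry<>(before, after);
    }

    // Extract the state we actually publish: the row after the update.
    public static Serializable[] afterState(Map.Entry<Serializable[], Serializable[]> row) {
        return row.getValue();
    }
}
```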

For the delete event, you’ll only need the ID of the deleted record:

    public class ReadLog {
        public static void main(String[] args) throws IOException {
            // ...

            client.registerEventListener(event -> {
                EventData data = event.getData();

                if (data instanceof TableMapEventData) {
                    // ...
                } else if (data instanceof WriteRowsEventData) {
                    // ...
                } else if (data instanceof UpdateRowsEventData) {
                    // ...
                } else if (data instanceof DeleteRowsEventData) {
                    DeleteRowsEventData eventData = (DeleteRowsEventData) data;
                    if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                        for (Object[] product : eventData.getRows()) {
                            pusher.trigger(PRODUCT_TABLE_NAME, "delete", product[0]);
                        }
                    }
                }
            });
            client.connect();
        }

        // ...
    }

Now, any application listening on the products channel will receive the information about the database changes.
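Since getProductMap passes every column through String.valueOf, the payload the browser receives is an object whose values are all strings. Here’s a hand-rolled sketch of that serialization (Pusher’s real serialization is handled by its library; toJson here is only illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the payload shape published to the channel: an object of
// string fields, e.g. {"id":"1","name":"laptop","price":"999.99"}.
public class PayloadSketch {
    public static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) sb.append(",");
            sb.append("\"").append(e.getKey()).append("\":\"")
              .append(e.getValue()).append("\"");
            first = false;
        }
        return sb.append("}").toString();
    }

    public static Map<String, String> sampleProduct() {
        Map<String, String> m = new LinkedHashMap<>(); // preserves field order
        m.put("id", "1");
        m.put("name", "laptop");
        m.put("price", "999.99");
        return m;
    }
}
```

Note that the delete event triggers with the raw ID (a number) rather than a map, which is why the React client compares el.id !== String(id).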

Let’s build a React client to show this.

Building the React application

Let’s use create-react-app to bootstrap a React app.

Execute the following command in a terminal window to create a new app:

    npx create-react-app my-app

Now go into the app directory and install the Pusher dependency with npm:

    cd my-app
    npm install --save pusher-js

Open the file src/App.css and add the following CSS styles:

    .table {
      border: 2px solid #FFFFFF;
      width: 100%;
      text-align: center;
      border-collapse: collapse;
    }
    .table td, .table th {
      border: 1px solid #FFFFFF;
      padding: 3px 4px;
    }
    .table tbody td {
      font-size: 13px;
    }
    .table thead {
      background: #FFFFFF;
      border-bottom: 4px solid #333333;
    }
    .table thead th {
      font-size: 15px;
      font-weight: bold;
      color: #333333;
      text-align: center;
      border-left: 2px solid #333333;
    }
    .table thead th:first-child {
      border-left: none;
    }

Now let’s create a new component, src/Table.js, to show the product information (received as a property) in a table:

    import React, { Component } from 'react';
    import './App.css';

    export default class Table extends Component {
      render() {
        const rowsMapped = this.props.rows.map(row => (
          <tr key={row.id}>
            <td>{row.id}</td>
            <td>{row.name}</td>
            <td>{row.price}</td>
          </tr>
        ));

        return (
          <table className="table">
            <thead>
              <tr>
                <th>ID</th>
                <th>Name</th>
                <th>Price</th>
              </tr>
            </thead>
            <tbody>
              {rowsMapped}
            </tbody>
          </table>
        );
      }
    }

Now modify the file src/App.js to import this component and Pusher:

    import React, { Component } from 'react';
    import logo from './logo.svg';
    import './App.css';

    import Table from './Table.js';

    import Pusher from 'pusher-js';

    class App extends Component {
      // ...
    }

Let’s have the array of rows as the state of this component, and while we’re in the constructor, let’s bind the functions we are going to use to insert, update and delete items:

    // ...

    class App extends Component {
      constructor(props) {
        super(props);
        this.state = {rows: []};

        this.insert = this.insert.bind(this);
        this.update = this.update.bind(this);
        this.delete = this.delete.bind(this);
      }
    }

In the componentDidMount method, let’s configure the Pusher object and subscribe to the channel to get the events:

    // ...

    class App extends Component {
      constructor(props) {
        // ...
      }

      componentDidMount() {
        this.pusher = new Pusher('<PUSHER_APP_KEY>', {
          cluster: '<PUSHER_APP_CLUSTER>',
          encrypted: true,
        });
        this.channel = this.pusher.subscribe('products');

        this.channel.bind('insert', this.insert);
        this.channel.bind('update', this.update);
        this.channel.bind('delete', this.delete);
      }
    }

These are the functions to insert, update and delete items from this.state.rows:

    // ...

    class App extends Component {
      // ...
      insert(data) {
        this.setState(prevState => ({
          rows: [ data, ...prevState.rows ]
        }));
      }

      update(data) {
        this.setState(prevState => ({
          rows: prevState.rows.map(el =>
            el.id === data.id ? data : el
          )
        }));
      }

      delete(id) {
        this.setState(prevState => ({
          rows: prevState.rows.filter(el => el.id !== String(id))
        }));
      }
    }

Finally, the render function will look like this:

    // ...

    class App extends Component {
      // ...
      render() {
        return (
          <div className="App">
            <header className="App-header">
              <img src={logo} className="App-logo" alt="logo" />
              <h1 className="App-title">Welcome to React</h1>
            </header>
            <Table rows={this.state.rows} />
          </div>
        );
      }
    }

And that’s it.

Let’s test the application.

Testing the application

Make sure the MySQL server is running with replication enabled.

If you’re working with an IDE, run the class ReadLog.

Otherwise, you can add this property to the pom.xml file:

    <properties>
      ...
      <exec.mainClass>ReadLog</exec.mainClass>
    </properties>

And execute this command to run the app:

    mvn exec:java

For the React app, inside the app directory, execute:

    npm start

A browser window will open at http://localhost:3000/. From there, you can connect to the database with the mysql client and insert, update or delete records in the products table:

java-mysql-react-demo

Conclusion

In this tutorial, you have learned how to turn MySQL into a realtime database by using the replication log to publish database changes through Pusher.

You used mysql-binlog-connector-java to get the insert, update and delete events from the log. However, at the time of this writing, the current version of MySQL (MySQL 8.0.11) is not yet supported.

But there are other options. As mentioned before, in this wiki you can find more libraries to work with the MySQL binary log.

In this blog post, you can find another way to extract data from MySQL using Alibaba’s open-source Canal project.

The applications presented in this tutorial are simple, but they show how change data capture using transaction logs works.

They can be extended in many ways:

  • Support for more tables
  • Detect when the Java application goes down and has to be restarted
  • Resume reading the log from a given position
  • Change the React implementation to support a bigger table in an efficient way

Remember that all the source code for these applications is available on GitHub.