Leaders and Followers

Implement Data Replication in Go - Part II

In this chapter, we will implement a Single Leader Data Replication technique in Golang.

Summary

In the last chapter, we set up two Docker containers running the Postgres image on different ports of our local machine.

In this chapter, we will implement data replication in Go with one masterdb and one replicadb, both of which we have already set up.

💡
Note: We could use more followers/replicas, but for simplicity we use a single follower/replica.

Prerequisites

  • Go - You should have Go installed on your machine.
  • Code Editor (VS Code or Goland)
  • Postman

We will use the following directory structure to implement the whole scenario.

data-replication-go
├── database
│   └── database.go
├── model
│   └── employee.go
├── service
│   └── data-replication.go
├── util
│   └── cron.go
├── go.mod
├── go.sum
├── main.go
└── .gitignore

An idea of what we are building

The idea is to connect our Go application to both databases, i.e., the masterdb and replicadb that we created in the previous chapter.

We will have two APIs.

  • GET: this will take care of reading data from the replica database.
  • POST: this will take care of writing data to the master database.

GET Request

URL: http://localhost:8080/employees

GET Response

[
    {
        "id": 101,
        "name": "John",
        "salary": 285000
    },
    {
        "id": 102,
        "name": "Sam",
        "salary": 255000
    }
]

POST Request

URL: http://localhost:8080/employees

Request Body:

{
    "id": 103,
    "name": "John",
    "salary": 135000
}

POST Response

{
    "id": 103,
    "message": "Employee created successfully"
}
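
If you would rather exercise the endpoints from Go than from Postman, the POST request above can be sketched with the standard library alone. The helper name `newCreateEmployeeRequest` and the local `Employee` copy are assumptions made for this illustration; they are not part of the project code.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Employee mirrors the JSON shape used by the API (local copy for this sketch).
type Employee struct {
	ID     int    `json:"id"`
	Name   string `json:"name"`
	Salary int    `json:"salary"`
}

// newCreateEmployeeRequest is a hypothetical helper that builds the
// POST /employees request and also returns the encoded body for inspection.
func newCreateEmployeeRequest(baseURL string, e Employee) (*http.Request, []byte, error) {
	body, err := json.Marshal(e)
	if err != nil {
		return nil, nil, err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/employees", bytes.NewReader(body))
	if err != nil {
		return nil, nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, body, nil
}

func main() {
	req, body, err := newCreateEmployeeRequest(
		"http://localhost:8080",
		Employee{ID: 103, Name: "John", Salary: 135000},
	)
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String())
	fmt.Println(string(body))
	// To actually send it, the server must be running:
	// resp, err := http.DefaultClient.Do(req)
}
```

The request is only built here, not sent, so the sketch runs even when the server is not up.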

All write requests will be served by masterdb. As part of the same request, the application writes the same data to replicadb synchronously, so that both databases stay consistent.

All read requests will be served by replicadb.

What if replicadb is down?

When replicadb is down, we will make sure that read requests are served through masterdb.
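
The fallback rule can be sketched independently of any database. In this minimal illustration the `queryFunc` type, the `readWithFallback` helper, and the `errReplicaDown` value are all assumptions introduced for the example; in the real service each function would wrap a `Query` call on the corresponding `*sql.DB`.

```go
package main

import (
	"errors"
	"fmt"
)

// Employee is a local copy of the model for this sketch.
type Employee struct {
	ID     int
	Name   string
	Salary int
}

// queryFunc abstracts "run the SELECT against one database" (hypothetical).
type queryFunc func() ([]Employee, error)

// errReplicaDown simulates an unreachable replica.
var errReplicaDown = errors.New("replica: connection refused")

// readWithFallback tries the replica first and falls back to the master.
// It also reports which database served the read.
func readWithFallback(replica, master queryFunc) ([]Employee, string, error) {
	if emps, err := replica(); err == nil {
		return emps, "replica", nil
	}
	emps, err := master()
	if err != nil {
		return nil, "", fmt.Errorf("master read failed after replica fallback: %w", err)
	}
	return emps, "master", nil
}

func main() {
	replicaDown := func() ([]Employee, error) { return nil, errReplicaDown }
	master := func() ([]Employee, error) {
		return []Employee{{ID: 101, Name: "John", Salary: 285000}}, nil
	}

	emps, source, err := readWithFallback(replicaDown, master)
	fmt.Println(len(emps), source, err)
}
```

Note that falling back on every error is a simplification: a malformed query would also fail on the replica and then be retried on the master.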

💡
Suppose replicadb is down. Because we update replicadb synchronously as part of every write to masterdb, a write could succeed on masterdb and fail on replicadb, which would leave the two databases inconsistent. To avoid this, we roll back the whole operation so that the data is written to neither database.
Note: We could also update the replica asynchronously, but for simplicity we do it synchronously.
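
The all-or-nothing write can be sketched without a running database. In the example below, `execTx`, `fakeTx`, and `writeBoth` are hypothetical names introduced for illustration; `execTx` lists only the three `*sql.Tx` methods the flow needs, so a real `*sql.Tx` would satisfy it.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// execTx is the subset of *sql.Tx used by this sketch.
type execTx interface {
	Exec(query string, args ...any) (sql.Result, error)
	Commit() error
	Rollback() error
}

// fakeTx records what happened so the flow can run without a database.
type fakeTx struct {
	failExec   bool
	committed  bool
	rolledBack bool
}

func (f *fakeTx) Exec(string, ...any) (sql.Result, error) {
	if f.failExec {
		return nil, errors.New("exec failed: database unreachable")
	}
	return nil, nil
}

func (f *fakeTx) Commit() error { f.committed = true; return nil }

func (f *fakeTx) Rollback() error {
	if !f.committed { // like *sql.Tx, rolling back after commit changes nothing
		f.rolledBack = true
	}
	return nil
}

// writeBoth runs the same statement on both transactions and commits
// only if both writes succeed; any earlier return triggers the rollbacks.
func writeBoth(master, replica execTx, query string, args ...any) error {
	defer master.Rollback()
	defer replica.Rollback()

	if _, err := master.Exec(query, args...); err != nil {
		return fmt.Errorf("master write: %w", err)
	}
	if _, err := replica.Exec(query, args...); err != nil {
		return fmt.Errorf("replica write: %w", err)
	}
	if err := master.Commit(); err != nil {
		return fmt.Errorf("master commit: %w", err)
	}
	// Caveat: if this commit fails, the master has already committed.
	if err := replica.Commit(); err != nil {
		return fmt.Errorf("replica commit: %w", err)
	}
	return nil
}

func main() {
	master, replica := &fakeTx{}, &fakeTx{failExec: true}
	err := writeBoth(master, replica,
		"INSERT INTO emp (id, name, salary) VALUES ($1, $2, $3)", 103, "John", 135000)
	fmt.Println("error:", err)
	fmt.Println("master committed:", master.committed, "rolled back:", master.rolledBack)
}
```

Two independent commits cannot be made fully atomic this way: there is still a small window between the two `Commit` calls. Closing it would need a two-phase commit protocol, which is outside the scope of this chapter.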

We hope you now have an idea of what we are going to build in this chapter.

Code Walkthrough

Let's first write employee.go. It defines the Employee model that we will read from and write to the database. For simplicity, we will expose only the GET and POST APIs.

package model

type Employee struct {
	ID     int    `json:"id"`
	Name   string `json:"name"`
	Salary int    `json:"salary"`
}

Now, let's look at database.go, which takes care of initializing the database connection pool.

package database

import (
	"database/sql"
	"fmt"
	"time"
)

func InitDB(dsn string) (*sql.DB, error) {
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}

	// Set connection pool settings
	db.SetMaxOpenConns(10)
	db.SetMaxIdleConns(5)
	db.SetConnMaxLifetime(30 * time.Minute)

	return db, nil
}

func PingDB(db *sql.DB) error {
	if err := db.Ping(); err != nil {
		// Wrap with %w so callers can inspect the underlying error
		return fmt.Errorf("ping failed: %w", err)
	}
	return nil
}
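
One thing to keep in mind is that `sql.Open` only validates its arguments and does not connect, so a successful `InitDB` does not prove the database is reachable. A minimal sketch of a startup check with a timeout follows. The `pinger` interface, `pingWithTimeout`, and `stubDB` are hypothetical names for this illustration; `*sql.DB` satisfies `pinger` through its `PingContext` method.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pinger is satisfied by *sql.DB, whose PingContext has this signature.
type pinger interface {
	PingContext(ctx context.Context) error
}

// pingWithTimeout fails fast instead of blocking on an unreachable database.
func pingWithTimeout(p pinger, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	if err := p.PingContext(ctx); err != nil {
		return fmt.Errorf("ping failed: %w", err)
	}
	return nil
}

// stubDB stands in for *sql.DB so the sketch runs without a database.
// It answers after delay, or gives up when the context expires first.
type stubDB struct{ delay time.Duration }

func (s stubDB) PingContext(ctx context.Context) error {
	select {
	case <-time.After(s.delay):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// A responsive database: the ping succeeds.
	fmt.Println(pingWithTimeout(stubDB{delay: 0}, 50*time.Millisecond))

	// A database that takes too long: the ping is abandoned at the deadline.
	err := pingWithTimeout(stubDB{delay: time.Second}, 50*time.Millisecond)
	fmt.Println(err, errors.Is(err, context.DeadlineExceeded))
}
```

In main.go, a call such as `pingWithTimeout(masterDB, 5*time.Second)` right after `InitDB` would surface a wrong DSN or a stopped container at startup rather than on the first request.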

Now, let's look at cron.go. It contains a periodic job that keeps checking whether replicadb is down. To be honest, we don't strictly need this job in our example, but in production you would want some form of metrics or alerts so that an outage is reported and fixed quickly.

package util

import (
	"data-replication/database"
	"database/sql"
	"log"
	"time"
)

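// CheckReplicaAvailability pings the replica every 20 seconds and logs
// when it is unreachable. It never returns, so run it in its own goroutine.
func CheckReplicaAvailability(replicaDB *sql.DB) {
	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		if err := database.PingDB(replicaDB); err != nil {
			log.Println("Replica is down:", err)
		}
	}
}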
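
The checker above runs until the process exits. If you want to stop it cleanly, for example during a graceful shutdown, one possible variant accepts a `context.Context`. This is a sketch under assumptions: `watchReplica`, its callback parameters, and `runDemo` are hypothetical and not part of the project code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// watchReplica calls ping on every tick and reports failures through
// onDown until ctx is cancelled.
func watchReplica(ctx context.Context, interval time.Duration, ping func() error, onDown func(error)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// select picks randomly when both channels are ready,
			// so re-check cancellation before doing any work.
			if ctx.Err() != nil {
				return
			}
			if err := ping(); err != nil {
				onDown(err)
			}
		}
	}
}

// runDemo simulates a replica that is always down and cancels the
// watcher after maxChecks pings.
func runDemo(maxChecks int) (checks, alerts int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ping := func() error {
		checks++
		if checks == maxChecks {
			cancel()
		}
		return errors.New("replica unreachable")
	}
	watchReplica(ctx, time.Millisecond, ping, func(error) { alerts++ })
	return checks, alerts
}

func main() {
	checks, alerts := runDemo(3)
	fmt.Println("checks:", checks, "alerts:", alerts)
}
```

In the real application, `ping` could wrap `database.PingDB(replicaDB)` and `onDown` could log the error or raise an alert.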

Now, let's look at data-replication.go. This is the core of the application: it takes care of reading data from and writing data to the databases.

package service

import (
	"data-replication/model"
	"database/sql"
	"github.com/gin-gonic/gin"
	"log"
	"net/http"
)

type DataReplication struct {
	MasterDB  *sql.DB
	ReplicaDB *sql.DB
}

func InitDataReplicationService(masterDB *sql.DB, replicaDB *sql.DB) *DataReplication {
	return &DataReplication{
		MasterDB:  masterDB,
		ReplicaDB: replicaDB,
	}
}

func (dataReplicationService *DataReplication) CreateEmployee(c *gin.Context) {
	// Validate the request body before opening any transaction.
	// ShouldBindJSON leaves the error response to us.
	var employee model.Employee
	if err := c.ShouldBindJSON(&employee); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	// Begin a transaction on the master database
	txMaster, err := dataReplicationService.MasterDB.Begin()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	// Roll back on every early return; after a successful Commit this
	// only returns sql.ErrTxDone and changes nothing
	defer txMaster.Rollback()

	// Begin a transaction on the replica database
	txReplica, err := dataReplicationService.ReplicaDB.Begin()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	defer txReplica.Rollback()

	// Write to master database within the transaction
	if _, err := txMaster.Exec("INSERT INTO emp (id, name, salary) VALUES ($1, $2, $3)", employee.ID, employee.Name, employee.Salary); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// Write to replica database within the transaction
	if _, err := txReplica.Exec("INSERT INTO emp (id, name, salary) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET name = $2, salary = $3", employee.ID, employee.Name, employee.Salary); err != nil {
		log.Println("Failed to sync data to replica:", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// Commit only after both writes have succeeded
	if err := txMaster.Commit(); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// Caveat: if this commit fails, the master has already committed and
	// the databases diverge until the row is written again
	if err := txReplica.Commit(); err != nil {
		log.Println("Replica commit failed after master commit:", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// Return the inserted ID in the response
	c.JSON(http.StatusOK, gin.H{"id": employee.ID, "message": "Employee created successfully"})
}

func (dataReplicationService *DataReplication) GetEmployees(c *gin.Context) {
	const query = "SELECT id, name, salary FROM emp"

	rows, err := dataReplicationService.ReplicaDB.Query(query)
	if err != nil {
		// If the replica is not available, switch to the master database
		log.Println("Replica is not available. Reading from master:", err)
		rows, err = dataReplicationService.MasterDB.Query(query)
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
	}
	// Close the result set whichever database served the query
	defer rows.Close()

	// Start with an empty slice so an empty table is returned as [] rather than null
	employees := []model.Employee{}
	for rows.Next() {
		var employee model.Employee
		if err := rows.Scan(&employee.ID, &employee.Name, &employee.Salary); err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
		employees = append(employees, employee)
	}
	// Report any error that ended the iteration early
	if err := rows.Err(); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, employees)
}

Let's finally write main.go.

package main

import (
	"data-replication/database"
	"data-replication/service"
	"data-replication/util"
	"database/sql"
	"github.com/gin-gonic/gin"
	_ "github.com/lib/pq"
	"log"
)

const (
	masterDSN  = "user=postgres password=Master123 host=localhost port=5445 dbname=employee sslmode=disable"
	replicaDSN = "user=postgres password=Replica123 host=localhost port=5446 dbname=employee sslmode=disable"
)

var (
	masterDB  *sql.DB
	replicaDB *sql.DB
)

func main() {
	var err error

	// Initialize master and replica databases
	masterDB, err = database.InitDB(masterDSN)
	if err != nil {
		log.Fatal(err)
	}
	defer masterDB.Close()

	replicaDB, err = database.InitDB(replicaDSN)
	if err != nil {
		log.Fatal(err)
	}
	defer replicaDB.Close()

	dataReplicationService := service.InitDataReplicationService(masterDB, replicaDB)
	// Start a goroutine to periodically check replica availability
	go util.CheckReplicaAvailability(replicaDB)

	// Create Gin router
	router := gin.Default()

	// Define API routes
	router.GET("/employees", dataReplicationService.GetEmployees)
	router.POST("/employees", dataReplicationService.CreateEmployee)

	// Run the server
	if err := router.Run(":8080"); err != nil {
		log.Fatal(err)
	}
}

You can try stopping the replicadb Docker container using Docker Desktop or the Docker CLI, and then make GET and POST API calls to see the fallback and rollback behavior.

You can find the entire code on GitHub. Connect with the Author on Discord in case you are stuck or have any questions while implementing it.

We hope you have learned something great today.