Implement Data Replication in Go - Part II
In this chapter, we will implement a Single Leader Data Replication technique in Golang.
Summary
In the last chapter, we set up two Docker containers running the postgres Docker image on different ports of your local machine.
In this chapter, we will implement the Data Replication technique in Golang with the one masterdb and one replicadb that we have already set up.
Prerequisites
- Go - You should have Go installed on your machine.
- Code Editor (VS Code or Goland)
- Postman
We will have the below directory structure to implement the whole scenario.
data-replication-go
├── database
│   └── database.go
├── model
│   └── employee.go
├── service
│   └── data-replication.go
├── util
│   └── cron.go
├── go.mod
├── go.sum
├── main.go
└── .gitignore
An overview of what we are building
The idea is to connect our Go application to both databases, i.e. the masterdb and replicadb that we created in the previous chapter.
We will have two APIs.
- GET: this will take care of reading data from the replica database.
- POST: this will take care of writing data to the master database.
GET Request
URL: http://localhost:8080/employees
GET Response
[
  {
    "id": 101,
    "name": "John",
    "salary": 285000
  },
  {
    "id": 102,
    "name": "Sam",
    "salary": 255000
  }
]
POST Request
URL: http://localhost:8080/employees
Request Body:
{
  "id": 103,
  "name": "John",
  "salary": 135000
}
POST Response
{
  "id": 103,
  "message": "Employee created successfully"
}
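If you prefer to exercise the two endpoints from code rather than Postman, a small Go client along these lines could work. This is only a sketch: the helper names `createEmployee` and `listEmployees` and the `createResponse` type are made up for illustration, and it assumes the server from this chapter is listening on localhost:8080.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Employee mirrors the JSON shape used in the request and response bodies above.
type Employee struct {
	ID     int    `json:"id"`
	Name   string `json:"name"`
	Salary int    `json:"salary"`
}

// createResponse is a hypothetical type matching the POST response shown above.
type createResponse struct {
	ID      int    `json:"id"`
	Message string `json:"message"`
}

// createEmployee sends a POST request to baseURL/employees and decodes the reply.
func createEmployee(baseURL string, e Employee) (createResponse, error) {
	var out createResponse
	body, err := json.Marshal(e)
	if err != nil {
		return out, err
	}
	resp, err := http.Post(baseURL+"/employees", "application/json", bytes.NewReader(body))
	if err != nil {
		return out, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return out, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	err = json.NewDecoder(resp.Body).Decode(&out)
	return out, err
}

// listEmployees sends a GET request to baseURL/employees and decodes the list.
func listEmployees(baseURL string) ([]Employee, error) {
	resp, err := http.Get(baseURL + "/employees")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	var employees []Employee
	if err := json.NewDecoder(resp.Body).Decode(&employees); err != nil {
		return nil, err
	}
	return employees, nil
}

func main() {
	baseURL := "http://localhost:8080" // assumption: the chapter's server is running locally

	created, err := createEmployee(baseURL, Employee{ID: 103, Name: "John", Salary: 135000})
	if err != nil {
		fmt.Println("POST failed:", err)
		return
	}
	fmt.Println("POST response:", created.ID, created.Message)

	employees, err := listEmployees(baseURL)
	if err != nil {
		fmt.Println("GET failed:", err)
		return
	}
	fmt.Println("GET returned", len(employees), "employees")
}
```

Both helpers return an error instead of exiting, so the same functions can be reused in a quick integration test once the containers are up.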
All write requests will be served by masterdb. At the same time, the same data will be written to replicadb synchronously. This is done to make sure that both databases are consistent in terms of data.
All read requests will be served by replicadb.
What if replicadb is down?
When replicadb is down, we will make sure that read requests are served through masterdb.
Note: We could also update the replica asynchronously, but for simplicity we are doing it synchronously.
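To give a feel for the asynchronous alternative mentioned in the note, here is a minimal sketch that is not part of this chapter's project. It uses a hypothetical in-memory `memoryDB` in place of Postgres and a buffered channel as the replication queue: the write returns as soon as the master has the row, and a background goroutine applies it to the replica later.

```go
package main

import (
	"fmt"
	"sync"
)

type Employee struct {
	ID     int
	Name   string
	Salary int
}

// memoryDB is a hypothetical in-memory stand-in for a real database.
type memoryDB struct {
	mu   sync.Mutex
	rows map[int]Employee
}

func newMemoryDB() *memoryDB { return &memoryDB{rows: make(map[int]Employee)} }

func (db *memoryDB) upsert(e Employee) {
	db.mu.Lock()
	defer db.mu.Unlock()
	db.rows[e.ID] = e
}

func (db *memoryDB) get(id int) (Employee, bool) {
	db.mu.Lock()
	defer db.mu.Unlock()
	e, ok := db.rows[id]
	return e, ok
}

// asyncReplicator writes to the master immediately and queues the same
// row for the replica, which a background goroutine applies later.
type asyncReplicator struct {
	master *memoryDB
	queue  chan Employee
	done   chan struct{}
}

func newAsyncReplicator(master, replica *memoryDB, buffer int) *asyncReplicator {
	r := &asyncReplicator{
		master: master,
		queue:  make(chan Employee, buffer),
		done:   make(chan struct{}),
	}
	go func() {
		defer close(r.done)
		for e := range r.queue {
			replica.upsert(e) // the replica catches up here
		}
	}()
	return r
}

// write returns once the master has the row; it blocks only if the queue is full.
func (r *asyncReplicator) write(e Employee) {
	r.master.upsert(e)
	r.queue <- e
}

// close stops accepting writes and waits until the replica has drained the queue.
func (r *asyncReplicator) close() {
	close(r.queue)
	<-r.done
}

func main() {
	master, replica := newMemoryDB(), newMemoryDB()
	r := newAsyncReplicator(master, replica, 16)
	r.write(Employee{ID: 103, Name: "John", Salary: 135000})
	r.close()

	e, ok := replica.get(103)
	fmt.Println(e.Name, ok) // prints "John true"
}
```

The trade-off is that a read served by the replica between `write` and the background apply can return stale data, which is exactly what the synchronous approach in this chapter avoids.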
We hope you now have an idea of what we are going to build in this chapter.
Code Walkthrough
Let's first write employee.go. It defines the Employee model that we will use for our database operations. For simplicity, we will only have GET and POST APIs.
package model

type Employee struct {
	ID     int    `json:"id"`
	Name   string `json:"name"`
	Salary int    `json:"salary"`
}
Now, let's look at database.go, which takes care of initializing the database.
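package database

import (
	"database/sql"
	"fmt"
	"time"
)

// InitDB creates a connection pool for the given DSN.
// Note that sql.Open may only validate its arguments without connecting;
// use PingDB to verify that the database is actually reachable.
func InitDB(dsn string) (*sql.DB, error) {
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}
	// Set connection pool settings
	db.SetMaxOpenConns(10)
	db.SetMaxIdleConns(5)
	db.SetConnMaxLifetime(30 * time.Minute)
	return db, nil
}

// PingDB reports whether the database can be reached.
func PingDB(db *sql.DB) error {
	if err := db.Ping(); err != nil {
		// %w keeps the original error available to errors.Is and errors.As
		return fmt.Errorf("ping failed: %w", err)
	}
	return nil
}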
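Because `sql.Open` may return without opening a connection, it can be useful to wait at startup until the database actually answers a ping. The following is a hedged sketch, not part of the chapter's code: `waitForDB` and `errNotReady` are hypothetical names, and the ping is passed in as a plain function so the example runs without a database. In the real project you could presumably pass `db.Ping`, which has the same `func() error` shape.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotReady is a hypothetical error used to simulate an unreachable database.
var errNotReady = errors.New("connection refused")

// waitForDB calls ping until it succeeds or the attempts run out,
// sleeping for delay between consecutive attempts.
func waitForDB(ping func() error, attempts int, delay time.Duration) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 1; i <= attempts; i++ {
		if err = ping(); err == nil {
			return nil
		}
		if i < attempts {
			time.Sleep(delay)
		}
	}
	return fmt.Errorf("database not reachable after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// flakyPing simulates a database that becomes reachable on the third attempt.
	flakyPing := func() error {
		calls++
		if calls < 3 {
			return errNotReady
		}
		return nil
	}

	err := waitForDB(flakyPing, 5, 10*time.Millisecond)
	fmt.Println(err, calls) // prints "<nil> 3"
}
```

A fixed delay keeps the sketch short; a growing delay between attempts is a common refinement when containers take a while to start.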
Now, let's look at cron.go. It runs a periodic job that keeps checking whether replicadb is down. Strictly speaking, we don't need this job in this example, but in production we would set up some sort of metrics and alerts so that the issue is reported and can be fixed immediately.
package util

import (
	"database/sql"
	"log"
	"time"

	"data-replication/database"
)

// CheckReplicaAvailability pings the replica every 20 seconds
// and logs a message whenever the ping fails.
func CheckReplicaAvailability(replicaDB *sql.DB) {
	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if err := database.PingDB(replicaDB); err != nil {
			log.Println("Replica is down.")
		}
	}
}
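The job above only logs. As one possible step towards the metrics and alerts mentioned earlier, the sketch below records the latest ping result in a flag that other code could read, and stops cleanly when its context is cancelled. The names `replicaHealth`, `monitor`, and `errReplicaDown` are hypothetical, the ping is a plain function rather than a real database call, and `atomic.Bool` assumes Go 1.19 or newer.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errReplicaDown is a hypothetical error used to simulate a failed ping.
var errReplicaDown = errors.New("replica is down")

// replicaHealth remembers the outcome of the most recent ping.
type replicaHealth struct {
	up atomic.Bool
}

// IsUp reports whether the last ping succeeded.
func (h *replicaHealth) IsUp() bool { return h.up.Load() }

// check runs a single ping and stores the result.
func (h *replicaHealth) check(ping func() error) {
	h.up.Store(ping() == nil)
}

// monitor pings once immediately and then on every tick until ctx is cancelled.
// interval must be positive, because time.NewTicker panics otherwise.
func (h *replicaHealth) monitor(ctx context.Context, ping func() error, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	h.check(ping)
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			h.check(ping)
		}
	}
}

func main() {
	// Stop monitoring after 100ms so that the example terminates.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	h := &replicaHealth{}
	healthyPing := func() error { return nil } // simulates a reachable replica
	h.monitor(ctx, healthyPing, 20*time.Millisecond)
	fmt.Println("replica up:", h.IsUp()) // prints "replica up: true"

	h.check(func() error { return errReplicaDown })
	fmt.Println("replica up:", h.IsUp()) // prints "replica up: false"
}
```

In a setup like this chapter's, `monitor` would run in its own goroutine, and a read handler could consult `IsUp` to skip the replica without waiting for a query to fail.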
Now, let's look at our data-replication.go. This is one of the important files. It takes care of reading and writing data to the databases.
package service

import (
	"database/sql"
	"log"
	"net/http"

	"data-replication/model"

	"github.com/gin-gonic/gin"
)

type DataReplication struct {
	MasterDB  *sql.DB
	ReplicaDB *sql.DB
}

func InitDataReplicationService(masterDB *sql.DB, replicaDB *sql.DB) *DataReplication {
	return &DataReplication{
		MasterDB:  masterDB,
		ReplicaDB: replicaDB,
	}
}

func (dataReplicationService *DataReplication) CreateEmployee(c *gin.Context) {
	// Validate the request body before opening any transaction.
	// ShouldBindJSON leaves the error response to us, unlike BindJSON,
	// which already aborts the request with a 400 on failure.
	var employee model.Employee
	if err := c.ShouldBindJSON(&employee); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	// Begin a transaction on the master database
	txMaster, err := dataReplicationService.MasterDB.Begin()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	// Rollback does nothing once the transaction is committed, so deferring
	// it covers every early return below as well as a panic.
	defer txMaster.Rollback()

	// Begin a transaction on the replica database
	txReplica, err := dataReplicationService.ReplicaDB.Begin()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	defer txReplica.Rollback()

	// Write to master database within the transaction
	if _, err := txMaster.Exec("INSERT INTO emp (id, name, salary) VALUES ($1, $2, $3)", employee.ID, employee.Name, employee.Salary); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// Write to replica database within the transaction
	if _, err := txReplica.Exec("INSERT INTO emp (id, name, salary) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET name = $2, salary = $3", employee.ID, employee.Name, employee.Salary); err != nil {
		log.Println("Failed to sync data to replica:", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// Commit the master first, then the replica
	if err := txMaster.Commit(); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	if err := txReplica.Commit(); err != nil {
		// The master commit cannot be undone at this point, so the two
		// databases may differ until the row is written again.
		log.Println("Master committed but replica commit failed:", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// Return the inserted ID in the response
	c.JSON(http.StatusOK, gin.H{"id": employee.ID, "message": "Employee created successfully"})
}

func (dataReplicationService *DataReplication) GetEmployees(c *gin.Context) {
	rows, err := dataReplicationService.ReplicaDB.Query("SELECT id, name, salary FROM emp")
	if err != nil {
		// If the replica is not available, switch to the master database
		log.Println("Replica is not available. Reading from master.")
		rows, err = dataReplicationService.MasterDB.Query("SELECT id, name, salary FROM emp")
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
	}
	// Close the result set no matter which database served the query
	defer rows.Close()

	// Start with an empty slice so an empty table is returned as [] instead of null
	employees := []model.Employee{}
	for rows.Next() {
		var employee model.Employee
		if err := rows.Scan(&employee.ID, &employee.Name, &employee.Salary); err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
		employees = append(employees, employee)
	}
	// Report any error that interrupted the iteration
	if err := rows.Err(); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, employees)
}
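The replica-first, master-second read rule can also be expressed without Gin or a database, which makes it easy to unit-test. The sketch below is an illustration under that assumption: `queryFunc`, `readWithFallback`, and `errReplicaDown` are hypothetical names, and the two queries are plain functions standing in for the calls to replicadb and masterdb.

```go
package main

import (
	"errors"
	"fmt"
)

type Employee struct {
	ID     int
	Name   string
	Salary int
}

// queryFunc is a hypothetical abstraction over "run the SELECT on one database".
type queryFunc func() ([]Employee, error)

// errReplicaDown is a hypothetical error used to simulate an unavailable replica.
var errReplicaDown = errors.New("replica is down")

// readWithFallback tries the replica first and queries the master only if
// the replica fails. It also reports which database served the read.
func readWithFallback(replica, master queryFunc) ([]Employee, string, error) {
	employees, err := replica()
	if err == nil {
		return employees, "replica", nil
	}
	employees, err = master()
	if err != nil {
		return nil, "", fmt.Errorf("replica and master both failed: %w", err)
	}
	return employees, "master", nil
}

func main() {
	// Simulate a stopped replica and a healthy master.
	replica := func() ([]Employee, error) { return nil, errReplicaDown }
	master := func() ([]Employee, error) {
		return []Employee{{ID: 101, Name: "John", Salary: 285000}}, nil
	}

	employees, source, err := readWithFallback(replica, master)
	fmt.Println(len(employees), source, err) // prints "1 master <nil>"
}
```

Returning the source alongside the rows is a design choice for the sketch; it makes it simple to log or count how often reads fall back to the master.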
Let's finally write main.go.
package main

import (
	"database/sql"
	"log"

	"data-replication/database"
	"data-replication/service"
	"data-replication/util"

	"github.com/gin-gonic/gin"
	_ "github.com/lib/pq"
)

const (
	masterDSN  = "user=postgres password=Master123 host=localhost port=5445 dbname=employee sslmode=disable"
	replicaDSN = "user=postgres password=Replica123 host=localhost port=5446 dbname=employee sslmode=disable"
)

var (
	masterDB  *sql.DB
	replicaDB *sql.DB
)

func main() {
	var err error

	// Initialize master and replica databases
	masterDB, err = database.InitDB(masterDSN)
	if err != nil {
		log.Fatal(err)
	}
	defer masterDB.Close()

	replicaDB, err = database.InitDB(replicaDSN)
	if err != nil {
		log.Fatal(err)
	}
	defer replicaDB.Close()

	dataReplicationService := service.InitDataReplicationService(masterDB, replicaDB)

	// Start a goroutine to periodically check replica availability
	go util.CheckReplicaAvailability(replicaDB)

	// Create Gin router
	router := gin.Default()

	// Define API routes
	router.GET("/employees", dataReplicationService.GetEmployees)
	router.POST("/employees", dataReplicationService.CreateEmployee)

	// Run the server
	if err := router.Run(":8080"); err != nil {
		log.Fatal(err)
	}
}
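You can try stopping the replicadb Docker container using Docker Desktop or the Docker CLI and then make GET and POST API calls. With the replica stopped, GET requests should fall back to masterdb, while POST requests are expected to fail, because the handler cannot open a transaction on the replica.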
You can find the entire code on GitHub. Connect with the Author on Discord in case you are stuck or have any questions while implementing it.
We hope you have learned something great today.