加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
arbitrator.go 7.34 KB
一键复制 编辑 原始数据 按行查看 历史
svaroqui 提交于 2019-04-14 12:57 . Refactoring code
// +build arbitrator
// replication-manager - Replication Manager Monitoring and CLI for MariaDB and MySQL
// Copyright 2017 Signal 18 SARL
// Author: Stephane Varoqui <svaroqui@gmail.com>
// License: GNU General Public License, version 3. Redistribution/Reuse of this code is permitted under the GNU v3 license, as an additional term ALL code must carry the original Author(s) credit in comment form.
// See LICENSE in this directory for the integral text.
package main
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"github.com/gorilla/mux"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"github.com/signal18/replication-manager/cluster"
"github.com/signal18/replication-manager/server"
"github.com/signal18/replication-manager/utils/dbhelper"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
type route struct {
Name string
Method string
Pattern string
HandlerFunc http.HandlerFunc
}
type routes []route
func newRouter() *mux.Router {
router := mux.NewRouter().StrictSlash(true)
for _, r := range rs {
router.
Methods(r.Method).
Path(r.Pattern).
Name(r.Name).
Handler(r.HandlerFunc)
}
return router
}
var rs = routes{
route{
"Heartbeat",
"POST",
"/heartbeat",
handlerHeartbeat,
},
route{
"Arbitrator",
"POST",
"/arbitrator",
handlerArbitrator,
},
route{
"Forget",
"PST",
"/forget/",
handlerForget,
},
}
type response struct {
Arbitration string `json:"arbitration"`
ElectedMaster string `json:"master"`
}
var (
arbitratorCluster *cluster.Cluster
)
func init() {
cobra.OnInitialize()
rootCmd.AddCommand(arbitratorCmd)
arbitratorCmd.Flags().StringVar(&conf.ArbitratorAddress, "arbitrator-bind-address", "0.0.0.0:10001", "Arbitrator API port")
arbitratorCmd.Flags().StringVar(&conf.ArbitratorDriver, "arbitrator-driver", "sqlite", "sqlite|mysql, use a local sqllite or use a mysql backend")
}
var arbitratorCmd = &cobra.Command{
Use: "arbitrator",
Short: "Arbitrator environment",
Long: `The arbitrator is used for false positive detection`,
Run: func(cmd *cobra.Command, args []string) {
RepMan = new(server.ReplicationManager)
RepMan.InitConfig(conf)
if _, ok := RepMan.Confs["arbitrator"]; !ok {
log.Fatal("Could not find arbitrator configuration section")
}
if RepMan.Confs["arbitrator"].ArbitratorDriver == "mysql" {
arbitratorCluster = new(cluster.Cluster)
arbitratorCluster.InitAgent(RepMan.Confs["arbitrator"])
arbitratorCluster.SetLogStdout()
}
db, err := getArbitratorBackendStorageConnection()
if err != nil {
log.Fatal("Error opening arbitrator database: ", err)
}
err = db.Ping()
if err != nil {
log.Fatal(err)
}
err = dbhelper.SetHeartbeatTable(db)
if err != nil {
log.WithError(err).Error("Error creating tables")
}
router := newRouter()
log.Infof("Arbitrator listening on %s", RepMan.Confs["arbitrator"].ArbitratorAddress)
log.Fatal(http.ListenAndServe(RepMan.Confs["arbitrator"].ArbitratorAddress, router))
},
}
func getArbitratorBackendStorageConnection() (*sqlx.DB, error) {
var err error
var db *sqlx.DB
if RepMan.Confs["arbitrator"].ArbitratorDriver == "sqlite" {
db, err = dbhelper.SQLiteConnect(conf.WorkingDir)
}
if RepMan.Confs["arbitrator"].ArbitratorDriver == "mysql" {
db, err = dbhelper.MySQLConnect(arbitratorCluster.GetServers()[0].User, arbitratorCluster.GetServers()[0].Pass, arbitratorCluster.GetServers()[0].Host+":"+arbitratorCluster.GetServers()[0].Port, fmt.Sprintf("?timeout=%ds", RepMan.Confs["arbitrator"].Timeout))
}
return db, err
}
func handlerArbitrator(w http.ResponseWriter, r *http.Request) {
var h server.Heartbeat
body, err := ioutil.ReadAll(io.LimitReader(r.Body, 1048576))
if err != nil {
log.Errorln(err)
w.WriteHeader(500)
return
}
if err := r.Body.Close(); err != nil {
log.Errorln(err)
w.WriteHeader(500)
return
}
log.Info("Arbitration request received: ", string(body))
if err := json.Unmarshal(body, &h); err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(422) // unprocessable entity
if err = json.NewEncoder(w).Encode(err); err != nil {
log.Errorln(err)
w.WriteHeader(500)
}
}
var send response
db, err := getArbitratorBackendStorageConnection()
if err != nil {
arbitratorCluster.LogPrintf("ERROR", "Error opening arbitrator database: %s", err)
return
}
defer db.Close()
res := dbhelper.RequestArbitration(db, h.UUID, h.Secret, h.Cluster, h.Master, h.UID, h.Hosts, h.Failed)
electedmaster := dbhelper.GetArbitrationMaster(db, h.Secret, h.Cluster)
if res {
send.Arbitration = "winner"
send.ElectedMaster = electedmaster
} else {
send.Arbitration = "looser"
send.ElectedMaster = electedmaster
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusCreated)
if err := json.NewEncoder(w).Encode(send); err != nil {
panic(err)
}
}
func handlerHeartbeat(w http.ResponseWriter, r *http.Request) {
var h server.Heartbeat
body, err := ioutil.ReadAll(io.LimitReader(r.Body, 1048576))
if err != nil {
panic(err)
}
//log.Printf("INFO: Hearbeat receive:%s", string(body))
if err = r.Body.Close(); err != nil {
w.WriteHeader(500)
log.Errorln(err)
return
}
if err = json.Unmarshal(body, &h); err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(422) // unprocessable entity
if err = json.NewEncoder(w).Encode(err); err != nil {
w.WriteHeader(500)
log.Errorln(err)
return
}
return
}
var send string
db, err := getArbitratorBackendStorageConnection()
if err != nil {
arbitratorCluster.LogPrintf("ERROR", "Error opening arbitrator database: %s", err)
w.WriteHeader(500)
log.Errorln(err)
return
}
defer db.Close()
res := dbhelper.WriteHeartbeat(db, h.UUID, h.Secret, h.Cluster, h.Master, h.UID, h.Hosts, h.Failed)
if res == nil {
send = `{"heartbeat":"succed"}`
} else {
log.Error("Error writing heartbeat, reason: ", res)
send = `{"heartbeat":"failed"}`
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
if err := json.NewEncoder(w).Encode(send); err != nil {
w.WriteHeader(500)
log.Errorln(err)
return
}
}
func handlerForget(w http.ResponseWriter, r *http.Request) {
var h server.Heartbeat
body, err := ioutil.ReadAll(io.LimitReader(r.Body, 1048576))
if err != nil {
w.WriteHeader(500)
log.Errorln(err)
return
}
//log.Printf("INFO: Hearbeat receive:%s", string(body))
if err = r.Body.Close(); err != nil {
w.WriteHeader(500)
log.Errorln(err)
return
}
if err = json.Unmarshal(body, &h); err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(422) // unprocessable entity
if err = json.NewEncoder(w).Encode(err); err != nil {
w.WriteHeader(500)
log.Errorln(err)
return
}
return
}
// currentCluster := new(cluster.Cluster)
var send string
db, err := getArbitratorBackendStorageConnection()
if err != nil {
arbitratorCluster.LogPrintf("ERROR", "Error opening arbitrator database: %s", err)
w.WriteHeader(500)
log.Errorln(err)
return
}
defer db.Close()
res := dbhelper.ForgetArbitration(db, h.Secret)
if res == nil {
send = `{"heartbeat":"succed"}`
} else {
send = `{"heartbeat":"failed"}`
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
if err := json.NewEncoder(w).Encode(send); err != nil {
w.WriteHeader(500)
log.Errorln(err)
return
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化