From 844d33eeb8c16f0c5f27b4ab5336b42b55c6dddc Mon Sep 17 00:00:00 2001 From: Clement Li Date: Tue, 2 Nov 2021 12:41:44 +0800 Subject: [PATCH] update to rabbitmq --- src/server.go | 128 ++++++++++++++++++-------------------------------- 1 file changed, 45 insertions(+), 83 deletions(-) diff --git a/src/server.go b/src/server.go index 7da4640..4c32b53 100644 --- a/src/server.go +++ b/src/server.go @@ -1,99 +1,61 @@ package main import ( - "encoding/json" - "fmt" - gitee_utils "gitee.com/lizi/test-bot/src/gitee-utils" - "io/ioutil" - "net/http" + "log" "os" -) - + "time" -var token []byte -var repo []byte - -func getToken() []byte { - return token -} - -func ServeHTTP(w http.ResponseWriter, r *http.Request) { - fmt.Fprint(w, "Event.") - var ie gitee_utils.Issue - _, _, payload, ok, _ := gitee_utils.ValidateWebhook(w, r) - - if !ok { - return - } - if err := json.Unmarshal(payload, &ie); err != nil { - return - } - go eventHandler(ie) - -} + gitee_utils "gitee.com/lizi/test-bot/src/gitee-utils" +) -func eventHandler(i gitee_utils.Issue) { - var repoinfo gitee_utils.RepoInfo - err := json.Unmarshal(repo, &repoinfo) - if err != nil { - return +func doRabbitMQ() { + + RMQ_QUEUE_NAME := os.Getenv("RMQ_QUEUE_NAME") + RMQ_HOST := os.Getenv("RMQ_HOST") + RMQ_VHOST := os.Getenv("RMQ_VHOST") + RMQ_USER := os.Getenv("RMQ_USER") + RMQ_PASS := os.Getenv("RMQ_PASS") + RMQ_PORT := os.Getenv("RMQ_PORT") + RMQ_ROUTINGKEY := os.Getenv("RMQ_ROUTINGKEY") + RMQ_EXCHANGE_NAME := os.Getenv("RMQ_EXCHANGE_NAME") + RMQ_EXCHANGE_TYPE := os.Getenv("RMQ_EXCHANGE_TYPE") + + // RabbitMQ + rc := gitee_utils.RabbitConfig{ + Schema: "amqp", + Username: RMQ_USER, + Password: RMQ_PASS, + Host: RMQ_HOST, + Port: RMQ_PORT, + VHost: RMQ_VHOST, + ConnectionName: "", } - orgInfo := repoinfo.Org - repoInfo := repoinfo.Repo - issueID := i.IssueID - eventType := i.EventType - targetInfo := "请注意" - targetUser := i.TargetInfo.TargetUser - infoType := i.TargetInfo.InfoType - //targetLabel := i.TargetLabel - //targetAssigneeID := i.TargetAssigneeID - //pushTime := i.PushTime - c := gitee_utils.NewClient(getToken) - - switch eventType { - case "info" : - switch infoType { - case "issueComment" : - strInfo := targetInfo + " @"+ targetUser + " " - res := c.CreateGiteeIssueComment(orgInfo, repoInfo, issueID, strInfo) - fmt.Println(strInfo) - if res != nil { - fmt.Println(res.Error()) - return - } - } - default: - return + rbt := gitee_utils.NewRabbit(rc) + if err := rbt.Connect(); err != nil { + log.Fatalln("unable to connect to rabbit", err) } -} - -func loadFile(path, fileType string) error { - jsonFile, err := os.Open(path) - if err != nil { - fmt.Println(err) - defer jsonFile.Close() - return err + // + + // Consumer + cc := gitee_utils.ConsumerConfig{ + ExchangeName: RMQ_EXCHANGE_NAME, + ExchangeType: RMQ_EXCHANGE_TYPE, + RoutingKey: RMQ_ROUTINGKEY, + QueueName: RMQ_QUEUE_NAME, + ConsumerCount: 1, + PrefetchCount: 1, } - defer jsonFile.Close() - byteValue, _ := ioutil.ReadAll(jsonFile) - switch { - case fileType == "token" : - token = byteValue - case fileType == "repo" : - repo = byteValue - default: - fmt.Printf("no filetype\n" ) + cc.Reconnect.MaxAttempt = 60 + cc.Reconnect.Interval = 1 * time.Second + csm := gitee_utils.NewConsumer(cc, rbt) + if err := csm.Start(); err != nil { + log.Fatalln("unable to start consumer", err) } - return nil -} -func configFile() { - loadFile("src/data/token.md", "token") - loadFile("src/data/repo.json", "repo") + select {} } func main() { configFile() - http.HandleFunc("/api/Executor/execute-event/", ServeHTTP) - http.ListenAndServe(":8002", nil) + doRabbitMQ() } -- Gitee