| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- package main
- import (
- "bytes"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "encoding/pem"
- "flag"
- "fmt"
- "github.com/coreos/etcd/store"
- "github.com/coreos/etcd/web"
- "github.com/coreos/go-raft"
- "io/ioutil"
- "log"
- "net/http"
- "os"
- "strings"
- "time"
- )
- //------------------------------------------------------------------------------
- //
- // Initialization
- //
- //------------------------------------------------------------------------------
// Command-line configuration. Every variable except cluster is bound to a
// flag by init below and takes its final value once flag.Parse has run.
var (
	verbose bool // -v

	machines string   // -C
	cluster  []string // no flag is bound to this one

	address    string // -a
	clientPort int    // -c
	serverPort int    // -s
	webPort    int    // -w

	serverCertFile string // -serverCert
	serverKeyFile  string // -serverKey
	serverCAFile   string // -serverCAFile

	clientCertFile string // -clientCert
	clientKeyFile  string // -clientKey
	clientCAFile   string // -clientCAFile

	dirPath string // -d
	ignore  bool   // -i
	maxSize int    // -m
)

// init registers every command-line flag on the default flag set. The flags
// are described in per-type tables; each row carries the destination
// variable, the flag name, its default and its help text.
func init() {
	// Both boolean flags default to false.
	for _, f := range []struct {
		target *bool
		name   string
		usage  string
	}{
		{&verbose, "v", "verbose logging"},
		{&ignore, "i", "ignore the old configuration, create a new node"},
	} {
		flag.BoolVar(f.target, f.name, false, f.usage)
	}

	for _, f := range []struct {
		target *int
		name   string
		value  int
		usage  string
	}{
		{&clientPort, "c", 4001, "the port to communicate with clients"},
		{&serverPort, "s", 7001, "the port to communicate with servers"},
		{&webPort, "w", -1, "the port of web interface"},
		{&maxSize, "m", 1024, "the max size of result buffer"},
	} {
		flag.IntVar(f.target, f.name, f.value, f.usage)
	}

	for _, f := range []struct {
		target *string
		name   string
		value  string
		usage  string
	}{
		{&machines, "C", "", "the ip address and port of a existing machines in cluster, sepearate by comma"},
		{&address, "a", "0.0.0.0", "the ip address of the local machine"},
		{&serverCAFile, "serverCAFile", "", "the path of the CAFile"},
		{&serverCertFile, "serverCert", "", "the cert file of the server"},
		{&serverKeyFile, "serverKey", "", "the key file of the server"},
		{&clientCAFile, "clientCAFile", "", "the path of the client CAFile"},
		{&clientCertFile, "clientCert", "", "the cert file of the client"},
		{&clientKeyFile, "clientKey", "", "the key file of the client"},
		{&dirPath, "d", "/tmp/", "the directory to store log and snapshot"},
	} {
		flag.StringVar(f.target, f.name, f.value, f.usage)
	}
}
// CONSTANTS

// Transport security levels, as returned by securityType.
const (
	// HTTP means no key, cert or CA file was configured.
	HTTP = iota

	// HTTPS means a key and cert were configured, but no CA file.
	HTTPS

	// HTTPSANDVERIFY means key, cert and CA file were all configured.
	HTTPSANDVERIFY
)

// Selectors passed to securityType to choose which set of key/cert/CA
// settings is inspected.
const (
	// SERVER selects the Server* settings.
	SERVER = iota

	// CLIENT selects the Client* settings.
	CLIENT
)

// Raft timing, applied to the raft server in startRaft.
const (
	ELECTIONTIMTOUT  time.Duration = 200 * time.Millisecond
	HEARTBEATTIMEOUT time.Duration = 50 * time.Millisecond
)
- //------------------------------------------------------------------------------
- //
- // Typedefs
- //
- //------------------------------------------------------------------------------
// Info is the node configuration that getInfo persists as JSON in the
// "info" file of the data directory and reads back on later starts.
//
// Field order and JSON tags define the on-disk format; keep both stable.
type Info struct {
	// Address is the address of the local machine.
	Address string `json:"address"`

	// Ports for raft peers, clients and the web interface.
	ServerPort int `json:"serverPort"`
	ClientPort int `json:"clientPort"`
	WebPort    int `json:"webPort"`

	// Cert, key and CA file paths for the server side.
	ServerCertFile string `json:"serverCertFile"`
	ServerKeyFile  string `json:"serverKeyFile"`
	ServerCAFile   string `json:"serverCAFile"`

	// Cert, key and CA file paths for the client side.
	ClientCertFile string `json:"clientCertFile"`
	ClientKeyFile  string `json:"clientKeyFile"`
	ClientCAFile   string `json:"clientCAFile"`
}
- //------------------------------------------------------------------------------
- //
- // Variables
- //
- //------------------------------------------------------------------------------
- var raftServer *raft.Server
- var raftTransporter transporter
- var etcdStore *store.Store
- var info *Info
- //------------------------------------------------------------------------------
- //
- // Functions
- //
- //------------------------------------------------------------------------------
- //--------------------------------------
- // Main
- //--------------------------------------
- func main() {
- flag.Parse()
- cluster = strings.Split(machines, ",")
- // Setup commands.
- registerCommands()
- // Read server info from file or grab it from user.
- if err := os.MkdirAll(dirPath, 0744); err != nil {
- fatal("Unable to create path: %v", err)
- }
- info = getInfo(dirPath)
- // secrity type
- st := securityType(SERVER)
- clientSt := securityType(CLIENT)
- if st == -1 || clientSt == -1 {
- fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
- }
- // Create etcd key-value store
- etcdStore = store.CreateStore(maxSize)
- startRaft(st)
- if webPort != -1 {
- // start web
- etcdStore.SetMessager(&storeMsg)
- go webHelper()
- go web.Start(raftServer, webPort)
- }
- startClientTransport(info.ClientPort, clientSt)
- }
- // Start the raft server
- func startRaft(securityType int) {
- var err error
- raftName := fmt.Sprintf("%s:%d", info.Address, info.ServerPort)
- // Create transporter for raft
- raftTransporter = createTransporter(securityType)
- // Create raft server
- raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- // LoadSnapshot
- // err = raftServer.LoadSnapshot()
- // if err == nil {
- // debug("%s finished load snapshot", raftServer.Name())
- // } else {
- // debug(err)
- // }
- raftServer.Initialize()
- raftServer.SetElectionTimeout(ELECTIONTIMTOUT)
- raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
- if raftServer.IsLogEmpty() {
- // start as a leader in a new cluster
- if len(cluster) == 0 {
- raftServer.StartLeader()
- time.Sleep(time.Millisecond * 20)
- // leader need to join self as a peer
- for {
- command := &JoinCommand{}
- command.Name = raftServer.Name()
- _, err := raftServer.Do(command)
- if err == nil {
- break
- }
- }
- debug("%s start as a leader", raftServer.Name())
- // start as a follower in a existing cluster
- } else {
- raftServer.StartFollower()
- for _, machine := range cluster {
- err := joinCluster(raftServer, machine)
- if err != nil {
- debug("cannot join to cluster via machine %s", machine)
- } else {
- break
- }
- }
- if err != nil {
- fatal("cannot join to cluster via all given machines!")
- }
- debug("%s success join to the cluster", raftServer.Name())
- }
- } else {
- // rejoin the previous cluster
- raftServer.StartFollower()
- debug("%s restart as a follower", raftServer.Name())
- }
- // open the snapshot
- // go server.Snapshot()
- // start to response to raft requests
- go startRaftTransport(info.ServerPort, securityType)
- }
- // Create transporter using by raft server
- // Create http or https transporter based on
- // wether the user give the server cert and key
- func createTransporter(st int) transporter {
- t := transporter{}
- switch st {
- case HTTP:
- t.client = nil
- return t
- case HTTPS:
- fallthrough
- case HTTPSANDVERIFY:
- tlsCert, err := tls.LoadX509KeyPair(serverCertFile, serverKeyFile)
- if err != nil {
- fatal(fmt.Sprintln(err))
- }
- tr := &http.Transport{
- TLSClientConfig: &tls.Config{
- Certificates: []tls.Certificate{tlsCert},
- InsecureSkipVerify: true,
- },
- DisableCompression: true,
- }
- t.client = &http.Client{Transport: tr}
- return t
- }
- // for complier
- return transporter{}
- }
- // Start to listen and response raft command
- func startRaftTransport(port int, st int) {
- // internal commands
- http.HandleFunc("/join", JoinHttpHandler)
- http.HandleFunc("/vote", VoteHttpHandler)
- http.HandleFunc("/log", GetLogHttpHandler)
- http.HandleFunc("/log/append", AppendEntriesHttpHandler)
- http.HandleFunc("/snapshot", SnapshotHttpHandler)
- http.HandleFunc("/client", ClientHttpHandler)
- switch st {
- case HTTP:
- fmt.Printf("raft server [%s] listen on http port %v\n", address, port)
- log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
- case HTTPS:
- fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
- log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
- case HTTPSANDVERIFY:
- server := &http.Server{
- TLSConfig: &tls.Config{
- ClientAuth: tls.RequireAndVerifyClientCert,
- ClientCAs: createCertPool(serverCAFile),
- },
- Addr: fmt.Sprintf(":%d", port),
- }
- fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
- err := server.ListenAndServeTLS(serverCertFile, serverKeyFile)
- if err != nil {
- log.Fatal(err)
- }
- }
- }
- // Start to listen and response client command
- func startClientTransport(port int, st int) {
- // external commands
- http.HandleFunc("/"+version+"/keys/", Multiplexer)
- http.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
- http.HandleFunc("/"+version+"/list/", ListHttpHandler)
- http.HandleFunc("/"+version+"/testAndSet/", TestAndSetHttpHandler)
- http.HandleFunc("/leader", LeaderHttpHandler)
- switch st {
- case HTTP:
- fmt.Printf("etcd [%s] listen on http port %v\n", address, clientPort)
- log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
- case HTTPS:
- fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
- http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil)
- case HTTPSANDVERIFY:
- server := &http.Server{
- TLSConfig: &tls.Config{
- ClientAuth: tls.RequireAndVerifyClientCert,
- ClientCAs: createCertPool(clientCAFile),
- },
- Addr: fmt.Sprintf(":%d", port),
- }
- fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
- err := server.ListenAndServeTLS(clientCertFile, clientKeyFile)
- if err != nil {
- log.Fatal(err)
- os.Exit(1)
- }
- }
- }
- //--------------------------------------
- // Config
- //--------------------------------------
- // Get the security type
- func securityType(source int) int {
- var keyFile, certFile, CAFile string
- switch source {
- case SERVER:
- keyFile = info.ServerKeyFile
- certFile = info.ServerCertFile
- CAFile = info.ServerCAFile
- case CLIENT:
- keyFile = info.ClientKeyFile
- certFile = info.ClientCertFile
- CAFile = info.ClientCAFile
- }
- // If the user do not specify key file, cert file and
- // CA file, the type will be HTTP
- if keyFile == "" && certFile == "" && CAFile == "" {
- return HTTP
- }
- if keyFile != "" && certFile != "" {
- if CAFile != "" {
- // If the user specify all the three file, the type
- // will be HTTPS with client cert auth
- return HTTPSANDVERIFY
- }
- // If the user specify key file and cert file but not
- // CA file, the type will be HTTPS without client cert
- // auth
- return HTTPS
- }
- // bad specification
- return -1
- }
- // Get the server info from previous conf file
- // or from the user
- func getInfo(path string) *Info {
- info := &Info{}
- // Read in the server info if available.
- infoPath := fmt.Sprintf("%s/info", path)
- // Delete the old configuration if exist
- if ignore {
- logPath := fmt.Sprintf("%s/log", path)
- snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
- os.Remove(infoPath)
- os.Remove(logPath)
- os.RemoveAll(snapshotPath)
- }
- if file, err := os.Open(infoPath); err == nil {
- if content, err := ioutil.ReadAll(file); err != nil {
- fatal("Unable to read info: %v", err)
- } else {
- if err = json.Unmarshal(content, &info); err != nil {
- fatal("Unable to parse info: %v", err)
- }
- }
- file.Close()
- } else {
- // Otherwise ask user for info and write it to file.
- if address == "" {
- fatal("Please give the address of the local machine")
- }
- info.Address = address
- info.Address = strings.TrimSpace(info.Address)
- fmt.Println("address ", info.Address)
- info.ServerPort = serverPort
- info.ClientPort = clientPort
- info.WebPort = webPort
- info.ClientCAFile = clientCAFile
- info.ClientCertFile = clientCertFile
- info.ClientKeyFile = clientKeyFile
- info.ServerCAFile = serverCAFile
- info.ServerKeyFile = serverKeyFile
- info.ServerCertFile = serverCertFile
- // Write to file.
- content, _ := json.Marshal(info)
- content = []byte(string(content) + "\n")
- if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
- fatal("Unable to write info to file: %v", err)
- }
- }
- return info
- }
// createCertPool builds the certificate pool used to verify TLS client
// certificates from the PEM file CAFile.
//
// Every CERTIFICATE block in the file is added, so CA bundles work; blocks of
// other types are skipped. If the file cannot be read, a certificate cannot
// be parsed, or the file holds no certificate at all, the error is printed
// and the process exits with status 1.
func createCertPool(CAFile string) *x509.CertPool {
	rest, err := ioutil.ReadFile(CAFile)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	certPool := x509.NewCertPool()
	added := 0

	for {
		var block *pem.Block
		// pem.Decode returns a nil block once no further PEM data is found.
		block, rest = pem.Decode(rest)
		if block == nil {
			break
		}
		if block.Type != "CERTIFICATE" {
			continue
		}

		cert, err := x509.ParseCertificate(block.Bytes)
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		certPool.AddCert(cert)
		added++
	}

	if added == 0 {
		fmt.Printf("no certificate found in %s\n", CAFile)
		os.Exit(1)
	}

	return certPool
}
- // Send join requests to the leader.
- func joinCluster(s *raft.Server, serverName string) error {
- var b bytes.Buffer
- command := &JoinCommand{}
- command.Name = s.Name()
- json.NewEncoder(&b).Encode(command)
- // t must be ok
- t, ok := raftServer.Transporter().(transporter)
- if !ok {
- panic("wrong type")
- }
- debug("Send Join Request to %s", serverName)
- resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)
- debug("Finish Join Request to %s", serverName)
- for {
- fmt.Println(err, resp)
- if err != nil {
- return fmt.Errorf("Unable to join: %v", err)
- }
- if resp != nil {
- defer resp.Body.Close()
- if resp.StatusCode == http.StatusOK {
- return nil
- }
- if resp.StatusCode == http.StatusTemporaryRedirect {
- fmt.Println("redirect")
- address = resp.Header.Get("Location")
- debug("Leader is %s", address)
- debug("Send Join Request to %s", address)
- json.NewEncoder(&b).Encode(command)
- resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
- }
- }
- }
- return fmt.Errorf("Unable to join: %v", err)
- }
- // Register commands to raft server
- func registerCommands() {
- raft.RegisterCommand(&JoinCommand{})
- raft.RegisterCommand(&SetCommand{})
- raft.RegisterCommand(&GetCommand{})
- raft.RegisterCommand(&DeleteCommand{})
- raft.RegisterCommand(&WatchCommand{})
- raft.RegisterCommand(&ListCommand{})
- raft.RegisterCommand(&TestAndSetCommand{})
- }
|