| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639 |
- package main
- import (
- "bytes"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "encoding/pem"
- "flag"
- "fmt"
- "github.com/coreos/etcd/store"
- "github.com/coreos/etcd/web"
- "github.com/coreos/go-raft"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- "os"
- "os/signal"
- "runtime/pprof"
- "strings"
- "time"
- )
- //------------------------------------------------------------------------------
- //
- // Initialization
- //
- //------------------------------------------------------------------------------
- var verbose bool
- var veryVerbose bool
- var machines string
- var machinesFile string
- var cluster []string
- var argInfo Info
- var dirPath string
- var force bool
- var maxSize int
- var snapshot bool
- var retryTimes int
- var maxClusterSize int
- var cpuprofile string
- func init() {
- flag.BoolVar(&verbose, "v", false, "verbose logging")
- flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
- flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
- flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
- flag.StringVar(&argInfo.Name, "n", "default-name", "the node name (required)")
- flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the hostname:port for etcd client communication")
- flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the hostname:port for raft server communication")
- flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface")
- flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile")
- flag.StringVar(&argInfo.RaftTLS.CertFile, "serverCert", "", "the cert file of the server")
- flag.StringVar(&argInfo.RaftTLS.KeyFile, "serverKey", "", "the key file of the server")
- flag.StringVar(&argInfo.EtcdTLS.CAFile, "clientCAFile", "", "the path of the client CAFile")
- flag.StringVar(&argInfo.EtcdTLS.CertFile, "clientCert", "", "the cert file of the client")
- flag.StringVar(&argInfo.EtcdTLS.KeyFile, "clientKey", "", "the key file of the client")
- flag.StringVar(&dirPath, "d", ".", "the directory to store log and snapshot")
- flag.BoolVar(&force, "f", false, "force new node configuration if existing is found (WARNING: data loss!)")
- flag.BoolVar(&snapshot, "snapshot", false, "open or close snapshot")
- flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
- flag.IntVar(&retryTimes, "r", 3, "the max retry attempts when trying to join a cluster")
- flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster")
- flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
- }
// Timing parameters for the raft server and its HTTP transport.
const (
	// ELECTIONTIMEOUT is handed to the raft server as its election timeout.
	ELECTIONTIMEOUT = 200 * time.Millisecond

	// HEARTBEATTIMEOUT is handed to the raft server as its heartbeat timeout.
	HEARTBEATTIMEOUT = 50 * time.Millisecond

	// HTTPTIMEOUT is the timeout for internal raft http connections.
	// The original timeout for http is 45 seconds, which is too long
	// for our usage.
	HTTPTIMEOUT = 10 * time.Second

	// RETRYINTERVAL is the pause, in seconds, between two rounds of
	// attempts to join a cluster.
	RETRYINTERVAL = 10
)
- //------------------------------------------------------------------------------
- //
- // Typedefs
- //
- //------------------------------------------------------------------------------
// TLSInfo names the files used to set up TLS for one endpoint. All
// three fields are file paths; an all-empty TLSInfo selects plain HTTP.
type TLSInfo struct {
	// CertFile is the path of the certificate file.
	CertFile string `json:"CertFile"`
	// KeyFile is the path of the key file matching CertFile.
	KeyFile string `json:"KeyFile"`
	// CAFile is the path of the CA certificate file (optional).
	CAFile string `json:"CAFile"`
}
- type Info struct {
- Name string `json:"name"`
- RaftURL string `json:"raftURL"`
- EtcdURL string `json:"etcdURL"`
- WebURL string `json:"webURL"`
- RaftTLS TLSInfo `json:"raftTLS"`
- EtcdTLS TLSInfo `json:"etcdTLS"`
- }
- //------------------------------------------------------------------------------
- //
- // Variables
- //
- //------------------------------------------------------------------------------
- var raftServer *raft.Server
- var raftTransporter transporter
- var etcdStore *store.Store
- var info *Info
- //------------------------------------------------------------------------------
- //
- // Functions
- //
- //------------------------------------------------------------------------------
- // sanitizeURL will cleanup a host string in the format hostname:port and
- // attach a schema.
- func sanitizeURL(host string, defaultScheme string) string {
- // Blank URLs are fine input, just return it
- if len(host) == 0 {
- return host
- }
- p, err := url.Parse(host)
- if err != nil {
- fatal(err)
- }
- // Make sure the host is in Host:Port format
- _, _, err = net.SplitHostPort(host)
- if err != nil {
- fatal(err)
- }
- p = &url.URL{Host: host, Scheme: defaultScheme}
- return p.String()
- }
- //--------------------------------------
- // Main
- //--------------------------------------
- func main() {
- flag.Parse()
- if cpuprofile != "" {
- f, err := os.Create(cpuprofile)
- if err != nil {
- fatal(err)
- }
- pprof.StartCPUProfile(f)
- defer pprof.StopCPUProfile()
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt)
- go func() {
- for sig := range c {
- fmt.Printf("captured %v, stopping profiler and exiting..", sig)
- pprof.StopCPUProfile()
- os.Exit(1)
- }
- }()
- }
- if veryVerbose {
- verbose = true
- raft.SetLogLevel(raft.Debug)
- }
- if machines != "" {
- cluster = strings.Split(machines, ",")
- } else if machinesFile != "" {
- b, err := ioutil.ReadFile(machinesFile)
- if err != nil {
- fatalf("Unable to read the given machines file: %s", err)
- }
- cluster = strings.Split(string(b), ",")
- }
- raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS)
- if !ok {
- fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
- }
- etcdTLSConfig, ok := tlsConfigFromInfo(argInfo.EtcdTLS)
- if !ok {
- fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
- }
- argInfo.Name = strings.TrimSpace(argInfo.Name)
- if argInfo.Name == "" {
- fatal("ERROR: server name required. e.g. '-n=server_name'")
- }
- argInfo.RaftURL = sanitizeURL(argInfo.RaftURL, raftTLSConfig.Scheme)
- argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme)
- argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
- // Setup commands.
- registerCommands()
- // Read server info from file or grab it from user.
- if err := os.MkdirAll(dirPath, 0744); err != nil {
- fatalf("Unable to create path: %s", err)
- }
- info = getInfo(dirPath)
- // Create etcd key-value store
- etcdStore = store.CreateStore(maxSize)
- startRaft(raftTLSConfig)
- if argInfo.WebURL != "" {
- // start web
- argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
- go webHelper()
- go web.Start(raftServer, argInfo.WebURL)
- }
- startEtcdTransport(*info, etcdTLSConfig.Scheme, etcdTLSConfig.Server)
- }
- // Start the raft server
- func startRaft(tlsConfig TLSConfig) {
- var err error
- raftName := info.Name
- // Create transporter for raft
- raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
- // Create raft server
- raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
- if err != nil {
- fatal(err)
- }
- // LoadSnapshot
- if snapshot {
- err = raftServer.LoadSnapshot()
- if err == nil {
- debugf("%s finished load snapshot", raftServer.Name())
- } else {
- debug(err)
- }
- }
- raftServer.SetElectionTimeout(ELECTIONTIMEOUT)
- raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
- raftServer.Start()
- if raftServer.IsLogEmpty() {
- // start as a leader in a new cluster
- if len(cluster) == 0 {
- time.Sleep(time.Millisecond * 20)
- // leader need to join self as a peer
- for {
- command := &JoinCommand{
- Name: raftServer.Name(),
- RaftURL: argInfo.RaftURL,
- EtcdURL: argInfo.EtcdURL,
- }
- _, err := raftServer.Do(command)
- if err == nil {
- break
- }
- }
- debugf("%s start as a leader", raftServer.Name())
- // start as a follower in a existing cluster
- } else {
- time.Sleep(time.Millisecond * 20)
- for i := 0; i < retryTimes; i++ {
- success := false
- for _, machine := range cluster {
- if len(machine) == 0 {
- continue
- }
- err = joinCluster(raftServer, machine)
- if err != nil {
- if err.Error() == errors[103] {
- fmt.Println(err)
- os.Exit(1)
- }
- debugf("cannot join to cluster via machine %s %s", machine, err)
- } else {
- success = true
- break
- }
- }
- if success {
- break
- }
- warnf("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL)
- time.Sleep(time.Second * RETRYINTERVAL)
- }
- if err != nil {
- fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
- }
- debugf("%s success join to the cluster", raftServer.Name())
- }
- } else {
- // rejoin the previous cluster
- debugf("%s restart as a follower", raftServer.Name())
- }
- // open the snapshot
- if snapshot {
- go raftServer.Snapshot()
- }
- // start to response to raft requests
- go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
- }
- // Create transporter using by raft server
- // Create http or https transporter based on
- // whether the user give the server cert and key
- func newTransporter(scheme string, tlsConf tls.Config) transporter {
- t := transporter{}
- t.scheme = scheme
- tr := &http.Transport{
- Dial: dialTimeout,
- }
- if scheme == "https" {
- tr.TLSClientConfig = &tlsConf
- tr.DisableCompression = true
- }
- t.client = &http.Client{Transport: tr}
- return t
- }
- // Dial with timeout
- func dialTimeout(network, addr string) (net.Conn, error) {
- return net.DialTimeout(network, addr, HTTPTIMEOUT)
- }
- // Start to listen and response raft command
- func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
- u, _ := url.Parse(info.RaftURL)
- fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
- raftMux := http.NewServeMux()
- server := &http.Server{
- Handler: raftMux,
- TLSConfig: &tlsConf,
- Addr: u.Host,
- }
- // internal commands
- raftMux.HandleFunc("/name", NameHttpHandler)
- raftMux.HandleFunc("/join", JoinHttpHandler)
- raftMux.HandleFunc("/vote", VoteHttpHandler)
- raftMux.HandleFunc("/log", GetLogHttpHandler)
- raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
- raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
- raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
- raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
- if scheme == "http" {
- fatal(server.ListenAndServe())
- } else {
- fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
- }
- }
- // Start to listen and response client command
- func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
- u, _ := url.Parse(info.EtcdURL)
- fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u)
- etcdMux := http.NewServeMux()
- server := &http.Server{
- Handler: etcdMux,
- TLSConfig: &tlsConf,
- Addr: u.Host,
- }
- // external commands
- etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer)
- etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
- etcdMux.HandleFunc("/leader", LeaderHttpHandler)
- etcdMux.HandleFunc("/machines", MachinesHttpHandler)
- etcdMux.HandleFunc("/", VersionHttpHandler)
- etcdMux.HandleFunc("/stats", StatsHttpHandler)
- etcdMux.HandleFunc("/test/", TestHttpHandler)
- if scheme == "http" {
- fatal(server.ListenAndServe())
- } else {
- fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
- }
- }
- //--------------------------------------
- // Config
- //--------------------------------------
// TLSConfig is the outcome of evaluating a TLSInfo: the URL scheme to
// use ("http" or "https") together with the TLS settings for the
// listening side and for outgoing connections.
type TLSConfig struct {
	// Scheme is "http" or "https".
	Scheme string
	// Server is the TLS configuration handed to the HTTP servers.
	Server tls.Config
	// Client is the TLS configuration handed to the raft transporter.
	Client tls.Config
}
- func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
- var keyFile, certFile, CAFile string
- var tlsCert tls.Certificate
- var err error
- t.Scheme = "http"
- keyFile = info.KeyFile
- certFile = info.CertFile
- CAFile = info.CAFile
- // If the user do not specify key file, cert file and
- // CA file, the type will be HTTP
- if keyFile == "" && certFile == "" && CAFile == "" {
- return t, true
- }
- // both the key and cert must be present
- if keyFile == "" || certFile == "" {
- return t, false
- }
- tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
- if err != nil {
- fatal(err)
- }
- t.Scheme = "https"
- t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile)
- // The client should trust the RootCA that the Server uses since
- // everyone is a peer in the network.
- t.Client.Certificates = []tls.Certificate{tlsCert}
- t.Client.RootCAs = t.Server.ClientCAs
- return t, true
- }
- func parseInfo(path string) *Info {
- file, err := os.Open(path)
- if err != nil {
- return nil
- }
- info := &Info{}
- defer file.Close()
- content, err := ioutil.ReadAll(file)
- if err != nil {
- fatalf("Unable to read info: %v", err)
- return nil
- }
- if err = json.Unmarshal(content, &info); err != nil {
- fatalf("Unable to parse info: %v", err)
- return nil
- }
- return info
- }
- // Get the server info from previous conf file
- // or from the user
- func getInfo(path string) *Info {
- // Read in the server info if available.
- infoPath := fmt.Sprintf("%s/info", path)
- // Delete the old configuration if exist
- if force {
- logPath := fmt.Sprintf("%s/log", path)
- confPath := fmt.Sprintf("%s/conf", path)
- snapshotPath := fmt.Sprintf("%s/snapshot", path)
- os.Remove(infoPath)
- os.Remove(logPath)
- os.Remove(confPath)
- os.RemoveAll(snapshotPath)
- }
- info := parseInfo(infoPath)
- if info != nil {
- fmt.Printf("Found node configuration in '%s'. Ignoring flags.\n", infoPath)
- return info
- }
- info = &argInfo
- // Write to file.
- content, _ := json.MarshalIndent(info, "", " ")
- content = []byte(string(content) + "\n")
- if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
- fatalf("Unable to write info to file: %v", err)
- }
- fmt.Printf("Wrote node configuration to '%s'.\n", infoPath)
- return info
- }
- // Create client auth certpool
- func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
- if CAFile == "" {
- return tls.NoClientCert, nil
- }
- pemByte, _ := ioutil.ReadFile(CAFile)
- block, pemByte := pem.Decode(pemByte)
- cert, err := x509.ParseCertificate(block.Bytes)
- if err != nil {
- fatal(err)
- }
- certPool := x509.NewCertPool()
- certPool.AddCert(cert)
- return tls.RequireAndVerifyClientCert, certPool
- }
- // Send join requests to the leader.
- func joinCluster(s *raft.Server, raftURL string) error {
- var b bytes.Buffer
- command := &JoinCommand{
- Name: s.Name(),
- RaftURL: info.RaftURL,
- EtcdURL: info.EtcdURL,
- }
- json.NewEncoder(&b).Encode(command)
- // t must be ok
- t, ok := raftServer.Transporter().(transporter)
- if !ok {
- panic("wrong type")
- }
- joinURL := url.URL{Host: raftURL, Scheme: raftTransporter.scheme, Path: "/join"}
- debugf("Send Join Request to %s", raftURL)
-
- resp, err := t.Post(joinURL.String(), &b)
- for {
- if err != nil {
- return fmt.Errorf("Unable to join: %v", err)
- }
- if resp != nil {
- defer resp.Body.Close()
- if resp.StatusCode == http.StatusOK {
- return nil
- }
- if resp.StatusCode == http.StatusTemporaryRedirect {
- address := resp.Header.Get("Location")
- debugf("Send Join Request to %s", address)
- json.NewEncoder(&b).Encode(command)
- resp, err = t.Post(address, &b)
- } else if resp.StatusCode == http.StatusBadRequest {
- debug("Reach max number machines in the cluster")
- return fmt.Errorf(errors[103])
- } else {
- return fmt.Errorf("Unable to join")
- }
- }
- }
- return fmt.Errorf("Unable to join: %v", err)
- }
- // Register commands to raft server
- func registerCommands() {
- raft.RegisterCommand(&JoinCommand{})
- raft.RegisterCommand(&SetCommand{})
- raft.RegisterCommand(&GetCommand{})
- raft.RegisterCommand(&DeleteCommand{})
- raft.RegisterCommand(&WatchCommand{})
- raft.RegisterCommand(&TestAndSetCommand{})
- }
|