123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- /*
- Copyright 2013 CoreOS Inc.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package etcd
- import (
- "net/http"
- "os"
- "path/filepath"
- "runtime"
- "strings"
- "sync"
- "time"
- goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
- golog "github.com/coreos/etcd/third_party/github.com/coreos/go-log/log"
- "github.com/coreos/etcd/third_party/github.com/goraft/raft"
- httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient"
- "github.com/coreos/etcd/config"
- ehttp "github.com/coreos/etcd/http"
- "github.com/coreos/etcd/log"
- "github.com/coreos/etcd/metrics"
- "github.com/coreos/etcd/server"
- "github.com/coreos/etcd/store"
- )
// extraTimeout is slack added on top of the peer client's connect,
// response-header and request timeouts.
//
// TODO(yichengq): this constant is a hack. There is currently a big lag
// between executing the join command and the join succeeding. Remove it once
// a proper method is found and enough tests exist; the timeouts are expected
// to be derived from heartbeatInterval and electionTimeout only.
const extraTimeout = 1000 * time.Millisecond
- type Etcd struct {
- Config *config.Config // etcd config
- Store store.Store // data store
- Registry *server.Registry // stores URL information for nodes
- Server *server.Server // http server, runs on 4001 by default
- PeerServer *server.PeerServer // peer server, runs on 7001 by default
- StandbyServer *server.StandbyServer
- server *http.Server
- peerServer *http.Server
- mode Mode
- modeMutex sync.Mutex
- closeChan chan bool
- readyNotify chan bool // To signal when server is ready to accept connections
- onceReady sync.Once
- stopNotify chan bool // To signal when server is stopped totally
- }
- // New returns a new Etcd instance.
- func New(c *config.Config) *Etcd {
- if c == nil {
- c = config.New()
- }
- return &Etcd{
- Config: c,
- closeChan: make(chan bool),
- readyNotify: make(chan bool),
- stopNotify: make(chan bool),
- }
- }
- // Run the etcd instance.
- func (e *Etcd) Run() {
- // Sanitize all the input fields.
- if err := e.Config.Sanitize(); err != nil {
- log.Fatalf("failed sanitizing configuration: %v", err)
- }
- // Force remove server configuration if specified.
- if e.Config.Force {
- e.Config.Reset()
- }
- // Enable options.
- if e.Config.VeryVeryVerbose {
- log.Verbose = true
- raft.SetLogLevel(raft.Trace)
- goetcd.SetLogger(
- golog.New(
- "go-etcd",
- false,
- golog.CombinedSink(
- os.Stdout,
- "[%s] %s %-9s | %s\n",
- []string{"prefix", "time", "priority", "message"},
- ),
- ),
- )
- } else if e.Config.VeryVerbose {
- log.Verbose = true
- raft.SetLogLevel(raft.Debug)
- } else if e.Config.Verbose {
- log.Verbose = true
- }
- if e.Config.CPUProfileFile != "" {
- profile(e.Config.CPUProfileFile)
- }
- if e.Config.DataDir == "" {
- log.Fatal("The data dir was not set and could not be guessed from machine name")
- }
- // Create data directory if it doesn't already exist.
- if err := os.MkdirAll(e.Config.DataDir, 0744); err != nil {
- log.Fatalf("Unable to create path: %s", err)
- }
- // Warn people if they have an info file
- info := filepath.Join(e.Config.DataDir, "info")
- if _, err := os.Stat(info); err == nil {
- log.Warnf("All cached configuration is now ignored. The file %s can be removed.", info)
- }
- var mbName string
- if e.Config.Trace() {
- mbName = e.Config.MetricsBucketName()
- runtime.SetBlockProfileRate(1)
- }
- mb := metrics.NewBucket(mbName)
- if e.Config.GraphiteHost != "" {
- err := mb.Publish(e.Config.GraphiteHost)
- if err != nil {
- panic(err)
- }
- }
- // Retrieve CORS configuration
- corsInfo, err := ehttp.NewCORSInfo(e.Config.CorsOrigins)
- if err != nil {
- log.Fatal("CORS:", err)
- }
- // Create etcd key-value store and registry.
- e.Store = store.New()
- e.Registry = server.NewRegistry(e.Store)
- // Create stats objects
- followersStats := server.NewRaftFollowersStats(e.Config.Name)
- serverStats := server.NewRaftServerStats(e.Config.Name)
- // Calculate all of our timeouts
- heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond
- electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond
- dialTimeout := (3 * heartbeatInterval) + electionTimeout
- responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout
- clientTransporter := &httpclient.Transport{
- ResponseHeaderTimeout: responseHeaderTimeout + extraTimeout,
- // This is a workaround for Transport.CancelRequest doesn't work on
- // HTTPS connections blocked. The patch for it is in progress,
- // and would be available in Go1.3
- // More: https://codereview.appspot.com/69280043/
- ConnectTimeout: dialTimeout + extraTimeout,
- RequestTimeout: responseHeaderTimeout + dialTimeout + 2*extraTimeout,
- }
- if e.Config.PeerTLSInfo().Scheme() == "https" {
- clientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig()
- if err != nil {
- log.Fatal("client TLS error: ", err)
- }
- clientTransporter.TLSClientConfig = clientTLSConfig
- clientTransporter.DisableCompression = true
- }
- client := server.NewClient(clientTransporter)
- // Create peer server
- psConfig := server.PeerServerConfig{
- Name: e.Config.Name,
- Scheme: e.Config.PeerTLSInfo().Scheme(),
- URL: e.Config.Peer.Addr,
- SnapshotCount: e.Config.SnapshotCount,
- RetryTimes: e.Config.MaxRetryAttempts,
- RetryInterval: e.Config.RetryInterval,
- }
- e.PeerServer = server.NewPeerServer(psConfig, client, e.Registry, e.Store, &mb, followersStats, serverStats)
- // Create raft transporter and server
- raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout)
- if e.Config.PeerTLSInfo().Scheme() == "https" {
- raftClientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig()
- if err != nil {
- log.Fatal("raft client TLS error: ", err)
- }
- raftTransporter.SetTLSConfig(*raftClientTLSConfig)
- }
- raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, raftTransporter, e.Store, e.PeerServer, "")
- if err != nil {
- log.Fatal(err)
- }
- raftServer.SetElectionTimeout(electionTimeout)
- raftServer.SetHeartbeatInterval(heartbeatInterval)
- e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
- // Create etcd server
- e.Server = server.New(e.Config.Name, e.Config.Addr, e.PeerServer, e.Registry, e.Store, &mb)
- if e.Config.Trace() {
- e.Server.EnableTracing()
- }
- e.PeerServer.SetServer(e.Server)
- // Create standby server
- ssConfig := server.StandbyServerConfig{
- Name: e.Config.Name,
- PeerScheme: e.Config.PeerTLSInfo().Scheme(),
- PeerURL: e.Config.Peer.Addr,
- ClientURL: e.Config.Addr,
- DataDir: e.Config.DataDir,
- }
- e.StandbyServer = server.NewStandbyServer(ssConfig, client)
- e.StandbyServer.SetRaftServer(raftServer)
- // Generating config could be slow.
- // Put it here to make listen happen immediately after peer-server starting.
- peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo())
- etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo())
- if !e.StandbyServer.IsRunning() {
- startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers)
- if err != nil {
- log.Fatal(err)
- }
- if startPeerServer {
- e.setMode(PeerMode)
- } else {
- e.StandbyServer.SyncCluster(possiblePeers)
- e.setMode(StandbyMode)
- }
- } else {
- e.setMode(StandbyMode)
- }
- serverHTTPHandler := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo}
- peerServerHTTPHandler := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo}
- standbyServerHTTPHandler := &ehttp.CORSHandler{e.StandbyServer.ClientHTTPHandler(), corsInfo}
- log.Infof("etcd server [name %s, listen on %s, advertised url %s]", e.Server.Name, e.Config.BindAddr, e.Server.URL())
- listener := server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, etcdTLSConfig)
- e.server = &http.Server{Handler: &ModeHandler{e, serverHTTPHandler, standbyServerHTTPHandler},
- ReadTimeout: time.Duration(e.Config.HTTPReadTimeout) * time.Second,
- WriteTimeout: time.Duration(e.Config.HTTPWriteTimeout) * time.Second,
- }
- log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL)
- peerListener := server.NewListener(e.Config.PeerTLSInfo().Scheme(), e.Config.Peer.BindAddr, peerTLSConfig)
- e.peerServer = &http.Server{Handler: &ModeHandler{e, peerServerHTTPHandler, http.NotFoundHandler()},
- ReadTimeout: time.Duration(server.DefaultReadTimeout) * time.Second,
- WriteTimeout: time.Duration(server.DefaultWriteTimeout) * time.Second,
- }
- wg := sync.WaitGroup{}
- wg.Add(2)
- go func() {
- <-e.readyNotify
- defer wg.Done()
- if err := e.server.Serve(listener); err != nil {
- if !isListenerClosing(err) {
- log.Fatal(err)
- }
- }
- }()
- go func() {
- <-e.readyNotify
- defer wg.Done()
- if err := e.peerServer.Serve(peerListener); err != nil {
- if !isListenerClosing(err) {
- log.Fatal(err)
- }
- }
- }()
- e.runServer()
- listener.Close()
- peerListener.Close()
- wg.Wait()
- log.Infof("etcd instance is stopped [name %s]", e.Config.Name)
- close(e.stopNotify)
- }
- func (e *Etcd) runServer() {
- var removeNotify <-chan bool
- for {
- if e.mode == PeerMode {
- log.Infof("%v starting in peer mode", e.Config.Name)
- // Starting peer server should be followed close by listening on its port
- // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster.
- // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster,
- // the cluster could be out of work as long as the two nodes cannot transfer messages.
- e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig())
- removeNotify = e.PeerServer.RemoveNotify()
- } else {
- log.Infof("%v starting in standby mode", e.Config.Name)
- e.StandbyServer.Start()
- removeNotify = e.StandbyServer.RemoveNotify()
- }
- // etcd server is ready to accept connections, notify waiters.
- e.onceReady.Do(func() { close(e.readyNotify) })
- select {
- case <-e.closeChan:
- e.PeerServer.Stop()
- e.StandbyServer.Stop()
- return
- case <-removeNotify:
- }
- if e.mode == PeerMode {
- peerURLs := e.Registry.PeerURLs(e.PeerServer.RaftServer().Leader(), e.Config.Name)
- e.StandbyServer.SyncCluster(peerURLs)
- e.setMode(StandbyMode)
- } else {
- // Create etcd key-value store and registry.
- e.Store = store.New()
- e.Registry = server.NewRegistry(e.Store)
- e.PeerServer.SetStore(e.Store)
- e.PeerServer.SetRegistry(e.Registry)
- e.Server.SetStore(e.Store)
- e.Server.SetRegistry(e.Registry)
- // Generate new peer server here.
- // TODO(yichengq): raft server cannot be started after stopped.
- // It should be removed when raft restart is implemented.
- heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond
- electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond
- raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, e.PeerServer.RaftServer().Transporter(), e.Store, e.PeerServer, "")
- if err != nil {
- log.Fatal(err)
- }
- raftServer.SetElectionTimeout(electionTimeout)
- raftServer.SetHeartbeatInterval(heartbeatInterval)
- e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
- e.StandbyServer.SetRaftServer(raftServer)
- e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex())
- e.setMode(PeerMode)
- }
- }
- }
- // Stop the etcd instance.
- func (e *Etcd) Stop() {
- close(e.closeChan)
- <-e.stopNotify
- }
- // ReadyNotify returns a channel that is going to be closed
- // when the etcd instance is ready to accept connections.
- func (e *Etcd) ReadyNotify() <-chan bool {
- return e.readyNotify
- }
- func (e *Etcd) Mode() Mode {
- e.modeMutex.Lock()
- defer e.modeMutex.Unlock()
- return e.mode
- }
- func (e *Etcd) setMode(m Mode) {
- e.modeMutex.Lock()
- defer e.modeMutex.Unlock()
- e.mode = m
- }
// isListenerClosing reports whether err looks like the error produced when a
// network operation hits an already-closed connection, as happens to
// http.Serve during server shutdown.
//
// The match is done on the message text because the corresponding error value
// (net.errClosing) is not exported by the "net" package. A nil err is never a
// closing error and yields false instead of panicking.
func isListenerClosing(err error) bool {
	// An error string equivalent to net.errClosing.
	const errClosing = "use of closed network connection"

	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), errClosing)
}
// ModeGetter is implemented by anything that can report its current Mode.
type ModeGetter interface {
	Mode() Mode
}

// ModeHandler is an http.Handler that forwards each request to one of two
// handlers, chosen by the mode the embedded ModeGetter reports at the time
// the request is served.
type ModeHandler struct {
	ModeGetter
	PeerModeHandler    http.Handler // serves requests while in PeerMode
	StandbyModeHandler http.Handler // serves requests while in StandbyMode
}

// ServeHTTP dispatches the request to the handler matching the current mode.
// A mode that is neither PeerMode nor StandbyMode is answered with
// 500 Internal Server Error rather than being left as a silent empty reply.
func (h *ModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch h.Mode() {
	case PeerMode:
		h.PeerModeHandler.ServeHTTP(w, r)
	case StandbyMode:
		h.StandbyModeHandler.ServeHTTP(w, r)
	default:
		http.Error(w, "unknown server mode", http.StatusInternalServerError)
	}
}

// Mode enumerates the modes a ModeGetter can report.
type Mode int

const (
	// PeerMode is the zero value of Mode.
	PeerMode Mode = iota
	// StandbyMode is the alternative to PeerMode.
	StandbyMode
)
|