// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver
import (
    "encoding/json"
    "expvar"
    "sort"
    "sync"
    "sync/atomic"
    "time"

    pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
    "github.com/coreos/etcd/etcdserver/membership"
    "github.com/coreos/etcd/pkg/contention"
    "github.com/coreos/etcd/pkg/pbutil"
    "github.com/coreos/etcd/pkg/types"
    "github.com/coreos/etcd/raft"
    "github.com/coreos/etcd/raft/raftpb"
    "github.com/coreos/etcd/rafthttp"
    "github.com/coreos/etcd/wal"
    "github.com/coreos/etcd/wal/walpb"

    "github.com/coreos/pkg/capnslog"
)
const (
    // Number of entries for a slow follower to catch up on after the raft
    // storage entries have been compacted.
    // We expect a follower to have millisecond-level latency with the leader.
    // The max throughput is around 10K entries/s, so keeping 5K entries is
    // enough to help a follower catch up.
    numberOfCatchUpEntries = 5000

    // The max throughput of etcd will not exceed 100MB/s (100K ops/s * 1KB values).
    // Assuming the RTT is around 10ms, a 1MB max message size is large enough.
    maxSizePerMsg = 1 * 1024 * 1024

    // Never overflow the rafthttp buffer, which is 4096.
    // TODO: a better const?
    maxInflightMsgs = 4096 / 8
)
var (
    // raftStatusMu protects raftStatus.
    raftStatusMu sync.Mutex
    // Indirection for the expvar func interface: expvar panics when
    // publishing a duplicate name and does not support removing a
    // registered name, so register a single func that calls raftStatus
    // and swap out raftStatus as needed.
    raftStatus func() raft.Status
)
func init() {
    raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft"))
    expvar.Publish("raft.status", expvar.Func(func() interface{} {
        raftStatusMu.Lock()
        defer raftStatusMu.Unlock()
        return raftStatus()
    }))
}
type RaftTimer interface {
    Index() uint64
    Term() uint64
}
// apply contains entries and a snapshot to be applied. Once an apply is
// consumed, the entries will be persisted to raft storage concurrently; the
// application must read notifyc before assuming the raft messages are stable.
type apply struct {
    entries  []raftpb.Entry
    snapshot raftpb.Snapshot
    // notifyc synchronizes etcd server applies with the raft node
    notifyc chan struct{}
}
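
// The sketch below is illustrative only (exampleApplyConsumer is not part of
// etcd); it shows how a consumer of raftNode.apply() is assumed to honor the
// notifyc contract: committed entries may be applied right away, but the
// consumer must wait on notifyc before taking any action (such as triggering
// a snapshot or compaction) that requires the raft log to be durable on disk.
func exampleApplyConsumer(r *raftNode, applyEntries func([]raftpb.Entry)) {
    for ap := range r.apply() {
        applyEntries(ap.entries)
        // Block until the raft goroutine signals that the entries behind
        // this apply have been written to stable storage.
        <-ap.notifyc
        // It is now safe to compact the log or trigger a snapshot past
        // these entries.
    }
}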
type raftNode struct {
    // Cache of the latest raft index and raft term the server has seen.
    // These three uint64 fields must be the first elements to keep 64-bit
    // alignment for atomic access to the fields.
    index uint64
    term  uint64
    lead  uint64

    tickMu *sync.Mutex
    raftNodeConfig

    // a chan to send/receive snapshot
    msgSnapC chan raftpb.Message

    // a chan to send out apply
    applyc chan apply

    // a chan to send out readState
    readStateC chan raft.ReadState

    // utility
    ticker *time.Ticker
    // contention detector for raft heartbeat messages
    td *contention.TimeoutDetector

    stopped chan struct{}
    done    chan struct{}
}
type raftNodeConfig struct {
    // isIDRemoved reports whether the message receiver has been removed
    // from the cluster.
    isIDRemoved func(id uint64) bool
    raft.Node
    raftStorage *raft.MemoryStorage
    storage     Storage
    heartbeat   time.Duration // for logging
    // transport specifies the transport to send and receive msgs to members.
    // Sending messages MUST NOT block. It is okay to drop messages, since
    // clients should time out and reissue their messages.
    // If transport is nil, the server will panic.
    transport rafthttp.Transporter
}
func newRaftNode(cfg raftNodeConfig) *raftNode {
    r := &raftNode{
        tickMu:         new(sync.Mutex),
        raftNodeConfig: cfg,
        // set up contention detector for raft heartbeat messages.
        // expect to send a heartbeat within 2 heartbeat intervals.
        td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
        readStateC: make(chan raft.ReadState, 1),
        msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
        applyc:     make(chan apply),
        stopped:    make(chan struct{}),
        done:       make(chan struct{}),
    }
    if r.heartbeat == 0 {
        r.ticker = &time.Ticker{}
    } else {
        r.ticker = time.NewTicker(r.heartbeat)
    }
    return r
}
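
// Hypothetical wiring sketch (not etcd's actual startup path): everything
// passed in below -- the raft.Node, transport, WAL-backed Storage, and
// MemoryStorage -- is assumed to have been created elsewhere. It only
// illustrates how the raftNodeConfig fields fit together.
func exampleNewRaftNode(n raft.Node, tr rafthttp.Transporter, st Storage, ms *raft.MemoryStorage) *raftNode {
    return newRaftNode(raftNodeConfig{
        isIDRemoved: func(id uint64) bool { return false }, // assumption: no membership tracking
        Node:        n,
        heartbeat:   100 * time.Millisecond, // also sizes the contention detector (2x)
        raftStorage: ms,
        storage:     st,
        transport:   tr,
    })
}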
// tick serializes calls to Tick, since raft.Node does no locking of its own
// in the raft package.
func (r *raftNode) tick() {
    r.tickMu.Lock()
    r.Tick()
    r.tickMu.Unlock()
}
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second

    go func() {
        defer r.onStop()
        islead := false

        for {
            select {
            case <-r.ticker.C:
                r.tick()
            case rd := <-r.Ready():
                if rd.SoftState != nil {
                    newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
                    if newLeader {
                        leaderChanges.Inc()
                    }

                    if rd.SoftState.Lead == raft.None {
                        hasLeader.Set(0)
                    } else {
                        hasLeader.Set(1)
                    }

                    atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
                    islead = rd.RaftState == raft.StateLeader
                    if islead {
                        isLeader.Set(1)
                    } else {
                        isLeader.Set(0)
                    }
                    rh.updateLeadership(newLeader)
                    r.td.Reset()
                }

                if len(rd.ReadStates) != 0 {
                    select {
                    case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
                    case <-time.After(internalTimeout):
                        plog.Warningf("timed out sending read state")
                    case <-r.stopped:
                        return
                    }
                }

                notifyc := make(chan struct{}, 1)
                ap := apply{
                    entries:  rd.CommittedEntries,
                    snapshot: rd.Snapshot,
                    notifyc:  notifyc,
                }

                updateCommittedIndex(&ap, rh)

                select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

                // The leader can write to its disk in parallel with replicating
                // to the followers and the followers writing to their disks.
                // For more details, see section 10.2.1 of the Raft thesis.
                if islead {
                    // gofail: var raftBeforeLeaderSend struct{}
                    r.transport.Send(r.processMessages(rd.Messages))
                }

                // gofail: var raftBeforeSave struct{}
                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    plog.Fatalf("raft save state and entries error: %v", err)
                }
                if !raft.IsEmptyHardState(rd.HardState) {
                    proposalsCommitted.Set(float64(rd.HardState.Commit))
                }
                // gofail: var raftAfterSave struct{}

                if !raft.IsEmptySnap(rd.Snapshot) {
                    // gofail: var raftBeforeSaveSnap struct{}
                    if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                        plog.Fatalf("raft save snapshot error: %v", err)
                    }
                    // etcdserver now claims the snapshot has been persisted to disk
                    notifyc <- struct{}{}

                    // gofail: var raftAfterSaveSnap struct{}
                    r.raftStorage.ApplySnapshot(rd.Snapshot)
                    plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
                    // gofail: var raftAfterApplySnap struct{}
                }

                r.raftStorage.Append(rd.Entries)

                if !islead {
                    // finish processing incoming messages before we signal notifyc
                    msgs := r.processMessages(rd.Messages)

                    // now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
                    notifyc <- struct{}{}

                    // A candidate or follower needs to wait for all pending
                    // configuration changes to be applied before sending messages.
                    // Otherwise we might incorrectly count votes (e.g. votes from removed members).
                    // Also, a slow follower's raft layer could proceed to become the
                    // leader on its own single-node cluster before the apply layer
                    // applies the config change.
                    // We simply wait for ALL pending entries to be applied for now.
                    // We might improve this later on if it causes unnecessarily long blocking.
                    waitApply := false
                    for _, ent := range rd.CommittedEntries {
                        if ent.Type == raftpb.EntryConfChange {
                            waitApply = true
                            break
                        }
                    }
                    if waitApply {
                        // blocks until 'applyAll' calls 'applyWait.Trigger'
                        // to be in sync with scheduled config-change job
                        // (assume notifyc has cap of 1)
                        select {
                        case notifyc <- struct{}{}:
                        case <-r.stopped:
                            return
                        }
                    }

                    // gofail: var raftBeforeFollowerSend struct{}
                    r.transport.Send(msgs)
                } else {
                    // leader already processed 'MsgSnap' and signaled
                    notifyc <- struct{}{}
                }

                r.Advance()
            case <-r.stopped:
                return
            }
        }
    }()
}
func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
    var ci uint64
    if len(ap.entries) != 0 {
        ci = ap.entries[len(ap.entries)-1].Index
    }
    if ap.snapshot.Metadata.Index > ci {
        ci = ap.snapshot.Metadata.Index
    }
    if ci != 0 {
        rh.updateCommittedIndex(ci)
    }
}
func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
    sentAppResp := false
    for i := len(ms) - 1; i >= 0; i-- {
        if r.isIDRemoved(ms[i].To) {
            ms[i].To = 0
        }

        if ms[i].Type == raftpb.MsgAppResp {
            if sentAppResp {
                ms[i].To = 0
            } else {
                sentAppResp = true
            }
        }

        if ms[i].Type == raftpb.MsgSnap {
            // There are two separate data stores: the store for v2 and the KV for v3.
            // The msgSnap only contains the most recent snapshot of the store, without KV.
            // So we need to redirect the msgSnap to the etcd server main loop for merging
            // in the current store snapshot and KV snapshot.
            select {
            case r.msgSnapC <- ms[i]:
            default:
                // drop msgSnap if the inflight chan is full.
            }
            ms[i].To = 0
        }
        if ms[i].Type == raftpb.MsgHeartbeat {
            ok, exceed := r.td.Observe(ms[i].To)
            if !ok {
                // TODO: limit request rate.
                plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v, to %x)", r.heartbeat, exceed, ms[i].To)
                plog.Warningf("server is likely overloaded")
                heartbeatSendFailures.Inc()
            }
        }
    }
    return ms
}
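
// Illustrative sketch only (exampleMsgSnapReceiver is not etcd's server loop,
// and mergeSnap is a hypothetical callback): it shows the receiving end of the
// msgSnapC redirection above, where the server is expected to merge the v2
// store snapshot with the v3 KV snapshot before handing the message back to
// the transport.
func exampleMsgSnapReceiver(r *raftNode, mergeSnap func(raftpb.Message) raftpb.Message) {
    for m := range r.msgSnapC {
        // Merge the current store snapshot and KV snapshot into the message,
        // then send the completed snapshot to the slow follower.
        r.transport.Send([]raftpb.Message{mergeSnap(m)})
    }
}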
func (r *raftNode) apply() chan apply {
    return r.applyc
}

func (r *raftNode) stop() {
    r.stopped <- struct{}{}
    <-r.done
}

func (r *raftNode) onStop() {
    r.Stop()
    r.ticker.Stop()
    r.transport.Stop()
    if err := r.storage.Close(); err != nil {
        plog.Panicf("raft close storage error: %v", err)
    }
    close(r.done)
}
// for testing
func (r *raftNode) pauseSending() {
    p := r.transport.(rafthttp.Pausable)
    p.Pause()
}

func (r *raftNode) resumeSending() {
    p := r.transport.(rafthttp.Pausable)
    p.Resume()
}
// advanceTicks advances the ticks of the Raft node.
// This can be used for fast-forwarding election
// ticks in multi-datacenter deployments, thus
// speeding up the election process.
func (r *raftNode) advanceTicks(ticks int) {
    for i := 0; i < ticks; i++ {
        r.tick()
    }
}
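
// Hypothetical usage sketch (not part of etcd): fast-forward a freshly started
// node to one tick short of an election timeout, so it can campaign quickly if
// no leader is heard from. electionTicks here is assumed to correspond to
// ServerConfig.ElectionTicks.
func exampleFastForwardElection(r *raftNode, electionTicks int) {
    if electionTicks > 1 {
        r.advanceTicks(electionTicks - 1)
    }
}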
func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
    var err error
    member := cl.MemberByName(cfg.Name)
    metadata := pbutil.MustMarshal(
        &pb.Metadata{
            NodeID:    uint64(member.ID),
            ClusterID: uint64(cl.ID()),
        },
    )
    if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
        plog.Panicf("create wal error: %v", err)
    }
    peers := make([]raft.Peer, len(ids))
    for i, id := range ids {
        ctx, err := json.Marshal((*cl).Member(id))
        if err != nil {
            plog.Panicf("marshal member should never fail: %v", err)
        }
        peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
    }
    id = member.ID
    plog.Infof("starting member %s in cluster %s", id, cl.ID())
    s = raft.NewMemoryStorage()
    c := &raft.Config{
        ID:              uint64(id),
        ElectionTick:    cfg.ElectionTicks,
        HeartbeatTick:   1,
        Storage:         s,
        MaxSizePerMsg:   maxSizePerMsg,
        MaxInflightMsgs: maxInflightMsgs,
        CheckQuorum:     true,
    }

    n = raft.StartNode(c, peers)
    raftStatusMu.Lock()
    raftStatus = n.Status
    raftStatusMu.Unlock()
    return id, n, s, w
}
func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
    var walsnap walpb.Snapshot
    if snapshot != nil {
        walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
    }
    w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

    plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
    cl := membership.NewCluster("")
    cl.SetID(cid)
    s := raft.NewMemoryStorage()
    if snapshot != nil {
        s.ApplySnapshot(*snapshot)
    }
    s.SetHardState(st)
    s.Append(ents)
    c := &raft.Config{
        ID:              uint64(id),
        ElectionTick:    cfg.ElectionTicks,
        HeartbeatTick:   1,
        Storage:         s,
        MaxSizePerMsg:   maxSizePerMsg,
        MaxInflightMsgs: maxInflightMsgs,
        CheckQuorum:     true,
    }

    n := raft.RestartNode(c)
    raftStatusMu.Lock()
    raftStatus = n.Status
    raftStatusMu.Unlock()
    return id, cl, n, s, w
}
func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
    var walsnap walpb.Snapshot
    if snapshot != nil {
        walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
    }
    w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

    // discard the previously uncommitted entries
    for i, ent := range ents {
        if ent.Index > st.Commit {
            plog.Infof("discarding %d uncommitted WAL entries", len(ents)-i)
            ents = ents[:i]
            break
        }
    }

    // force append the configuration change entries
    toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
    ents = append(ents, toAppEnts...)

    // force commit newly appended entries
    err := w.Save(raftpb.HardState{}, toAppEnts)
    if err != nil {
        plog.Fatalf("%v", err)
    }
    if len(ents) != 0 {
        st.Commit = ents[len(ents)-1].Index
    }

    plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
    cl := membership.NewCluster("")
    cl.SetID(cid)
    s := raft.NewMemoryStorage()
    if snapshot != nil {
        s.ApplySnapshot(*snapshot)
    }
    s.SetHardState(st)
    s.Append(ents)
    c := &raft.Config{
        ID:              uint64(id),
        ElectionTick:    cfg.ElectionTicks,
        HeartbeatTick:   1,
        Storage:         s,
        MaxSizePerMsg:   maxSizePerMsg,
        MaxInflightMsgs: maxInflightMsgs,
        CheckQuorum:     true,
    }

    n := raft.RestartNode(c)
    raftStatusMu.Lock()
    raftStatus = n.Status
    raftStatusMu.Unlock()
    return id, cl, n, s, w
}
// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain two kinds of
// ID-related entries:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
    ids := make(map[uint64]bool)
    if snap != nil {
        for _, id := range snap.Metadata.ConfState.Nodes {
            ids[id] = true
        }
    }
    for _, e := range ents {
        if e.Type != raftpb.EntryConfChange {
            continue
        }
        var cc raftpb.ConfChange
        pbutil.MustUnmarshal(&cc, e.Data)
        switch cc.Type {
        case raftpb.ConfChangeAddNode:
            ids[cc.NodeID] = true
        case raftpb.ConfChangeRemoveNode:
            delete(ids, cc.NodeID)
        case raftpb.ConfChangeUpdateNode:
            // do nothing
        default:
            plog.Panicf("ConfChange Type should be ConfChangeAddNode, ConfChangeRemoveNode or ConfChangeUpdateNode!")
        }
    }
    sids := make(types.Uint64Slice, 0, len(ids))
    for id := range ids {
        sids = append(sids, id)
    }
    sort.Sort(sids)
    return []uint64(sids)
}
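
// Illustrative check of getIDs' behavior (exampleGetIDs is not part of etcd):
// a snapshot with nodes {3, 1}, followed by an entry removing 3 and an entry
// adding 2, yields the sorted set [1, 2].
func exampleGetIDs() []uint64 {
    snap := &raftpb.Snapshot{}
    snap.Metadata.ConfState.Nodes = []uint64{3, 1}
    ents := []raftpb.Entry{
        {Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(&raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 3})},
        {Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(&raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2})},
    }
    return getIDs(snap, ents) // returns [1, 2]
}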
// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
    ents := make([]raftpb.Entry, 0)
    next := index + 1
    found := false
    for _, id := range ids {
        if id == self {
            found = true
            continue
        }
        cc := &raftpb.ConfChange{
            Type:   raftpb.ConfChangeRemoveNode,
            NodeID: id,
        }
        e := raftpb.Entry{
            Type:  raftpb.EntryConfChange,
            Data:  pbutil.MustMarshal(cc),
            Term:  term,
            Index: next,
        }
        ents = append(ents, e)
        next++
    }
    if !found {
        m := membership.Member{
            ID:             types.ID(self),
            RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
        }
        ctx, err := json.Marshal(m)
        if err != nil {
            plog.Panicf("marshal member should never fail: %v", err)
        }
        cc := &raftpb.ConfChange{
            Type:    raftpb.ConfChangeAddNode,
            NodeID:  self,
            Context: ctx,
        }
        e := raftpb.Entry{
            Type:  raftpb.EntryConfChange,
            Data:  pbutil.MustMarshal(cc),
            Term:  term,
            Index: next,
        }
        ents = append(ents, e)
    }
    return ents
}
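
// Illustrative use of createConfigChangeEnts (exampleForceRemovePeers is not
// part of etcd): for the member set [1, 2, 3] with self=1 at term 5 and commit
// index 10, it returns ConfChangeRemoveNode entries for members 2 and 3 at
// indices 11 and 12; because self is already in the set, no ConfChangeAddNode
// entry is appended. restartAsStandaloneNode above uses exactly this to strip
// away all peers.
func exampleForceRemovePeers() []raftpb.Entry {
    return createConfigChangeEnts([]uint64{1, 2, 3}, 1, 5, 10)
}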