123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package rafthttp
- import (
- "context"
- "sync"
- "time"
- "go.etcd.io/etcd/etcdserver/api/snap"
- stats "go.etcd.io/etcd/etcdserver/api/v2stats"
- "go.etcd.io/etcd/pkg/types"
- "go.etcd.io/etcd/raft"
- "go.etcd.io/etcd/raft/raftpb"
- "go.uber.org/zap"
- "golang.org/x/time/rate"
- )
- const (
- // ConnReadTimeout and ConnWriteTimeout are the i/o timeout set on each connection rafthttp pkg creates.
- // A 5 seconds timeout is good enough for recycling bad connections. Or we have to wait for
- // tcp keepalive failing to detect a bad connection, which is at minutes level.
- // For long term streaming connections, rafthttp pkg sends application level linkHeartbeatMessage
- // to keep the connection alive.
- // For short term pipeline connections, the connection MUST be killed to avoid it being
- // put back to http pkg connection pool.
- ConnReadTimeout = 5 * time.Second
- ConnWriteTimeout = 5 * time.Second
- recvBufSize = 4096
- // maxPendingProposals holds the proposals during one leader election process.
- // Generally one leader election takes at most 1 sec. It should have
- // 0-2 election conflicts, and each one takes 0.5 sec.
- // We assume the number of concurrent proposers is smaller than 4096.
- // One client blocks on its proposal for at least 1 sec, so 4096 is enough
- // to hold all proposals.
- maxPendingProposals = 4096
- streamAppV2 = "streamMsgAppV2"
- streamMsg = "streamMsg"
- pipelineMsg = "pipeline"
- sendSnap = "sendMsgSnap"
- )
- type Peer interface {
- // send sends the message to the remote peer. The function is non-blocking
- // and has no promise that the message will be received by the remote.
- // When it fails to send message out, it will report the status to underlying
- // raft.
- send(m raftpb.Message)
- // sendSnap sends the merged snapshot message to the remote peer. Its behavior
- // is similar to send.
- sendSnap(m snap.Message)
- // update updates the urls of remote peer.
- update(urls types.URLs)
- // attachOutgoingConn attaches the outgoing connection to the peer for
- // stream usage. After the call, the ownership of the outgoing
- // connection hands over to the peer. The peer will close the connection
- // when it is no longer used.
- attachOutgoingConn(conn *outgoingConn)
- // activeSince returns the time that the connection with the
- // peer becomes active.
- activeSince() time.Time
- // stop performs any necessary finalization and terminates the peer
- // elegantly.
- stop()
- }
- // peer is the representative of a remote raft node. Local raft node sends
- // messages to the remote through peer.
- // Each peer has two underlying mechanisms to send out a message: stream and
- // pipeline.
- // A stream is a receiver initialized long-polling connection, which
- // is always open to transfer messages. Besides general stream, peer also has
- // a optimized stream for sending msgApp since msgApp accounts for large part
- // of all messages. Only raft leader uses the optimized stream to send msgApp
- // to the remote follower node.
- // A pipeline is a series of http clients that send http requests to the remote.
- // It is only used when the stream has not been established.
- type peer struct {
- lg *zap.Logger
- localID types.ID
- // id of the remote raft peer node
- id types.ID
- r Raft
- status *peerStatus
- picker *urlPicker
- msgAppV2Writer *streamWriter
- writer *streamWriter
- pipeline *pipeline
- snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
- msgAppV2Reader *streamReader
- msgAppReader *streamReader
- recvc chan raftpb.Message
- propc chan raftpb.Message
- mu sync.Mutex
- paused bool
- cancel context.CancelFunc // cancel pending works in go routine created by peer.
- stopc chan struct{}
- }
- func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
- if t.Logger != nil {
- t.Logger.Info("starting remote peer", zap.String("remote-peer-id", peerID.String()))
- } else {
- plog.Infof("starting peer %s...", peerID)
- }
- defer func() {
- if t.Logger != nil {
- t.Logger.Info("started remote peer", zap.String("remote-peer-id", peerID.String()))
- } else {
- plog.Infof("started peer %s", peerID)
- }
- }()
- status := newPeerStatus(t.Logger, t.ID, peerID)
- picker := newURLPicker(urls)
- errorc := t.ErrorC
- r := t.Raft
- pipeline := &pipeline{
- peerID: peerID,
- tr: t,
- picker: picker,
- status: status,
- followerStats: fs,
- raft: r,
- errorc: errorc,
- }
- pipeline.start()
- p := &peer{
- lg: t.Logger,
- localID: t.ID,
- id: peerID,
- r: r,
- status: status,
- picker: picker,
- msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
- writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
- pipeline: pipeline,
- snapSender: newSnapshotSender(t, picker, peerID, status),
- recvc: make(chan raftpb.Message, recvBufSize),
- propc: make(chan raftpb.Message, maxPendingProposals),
- stopc: make(chan struct{}),
- }
- ctx, cancel := context.WithCancel(context.Background())
- p.cancel = cancel
- go func() {
- for {
- select {
- case mm := <-p.recvc:
- if err := r.Process(ctx, mm); err != nil {
- if t.Logger != nil {
- t.Logger.Warn("failed to process Raft message", zap.Error(err))
- } else {
- plog.Warningf("failed to process raft message (%v)", err)
- }
- }
- case <-p.stopc:
- return
- }
- }
- }()
- // r.Process might block for processing proposal when there is no leader.
- // Thus propc must be put into a separate routine with recvc to avoid blocking
- // processing other raft messages.
- go func() {
- for {
- select {
- case mm := <-p.propc:
- if err := r.Process(ctx, mm); err != nil {
- plog.Warningf("failed to process raft message (%v)", err)
- }
- case <-p.stopc:
- return
- }
- }
- }()
- p.msgAppV2Reader = &streamReader{
- lg: t.Logger,
- peerID: peerID,
- typ: streamTypeMsgAppV2,
- tr: t,
- picker: picker,
- status: status,
- recvc: p.recvc,
- propc: p.propc,
- rl: rate.NewLimiter(t.DialRetryFrequency, 1),
- }
- p.msgAppReader = &streamReader{
- lg: t.Logger,
- peerID: peerID,
- typ: streamTypeMessage,
- tr: t,
- picker: picker,
- status: status,
- recvc: p.recvc,
- propc: p.propc,
- rl: rate.NewLimiter(t.DialRetryFrequency, 1),
- }
- p.msgAppV2Reader.start()
- p.msgAppReader.start()
- return p
- }
- func (p *peer) send(m raftpb.Message) {
- p.mu.Lock()
- paused := p.paused
- p.mu.Unlock()
- if paused {
- return
- }
- writec, name := p.pick(m)
- select {
- case writec <- m:
- default:
- p.r.ReportUnreachable(m.To)
- if isMsgSnap(m) {
- p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
- }
- if p.status.isActive() {
- if p.lg != nil {
- p.lg.Warn(
- "dropped internal Raft message since sending buffer is full (overloaded network)",
- zap.String("message-type", m.Type.String()),
- zap.String("local-member-id", p.localID.String()),
- zap.String("from", types.ID(m.From).String()),
- zap.String("remote-peer-id", p.id.String()),
- zap.Bool("remote-peer-active", p.status.isActive()),
- )
- } else {
- plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
- }
- } else {
- if p.lg != nil {
- p.lg.Warn(
- "dropped internal Raft message since sending buffer is full (overloaded network)",
- zap.String("message-type", m.Type.String()),
- zap.String("local-member-id", p.localID.String()),
- zap.String("from", types.ID(m.From).String()),
- zap.String("remote-peer-id", p.id.String()),
- zap.Bool("remote-peer-active", p.status.isActive()),
- )
- } else {
- plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
- }
- }
- sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
- }
- }
- func (p *peer) sendSnap(m snap.Message) {
- go p.snapSender.send(m)
- }
- func (p *peer) update(urls types.URLs) {
- p.picker.update(urls)
- }
- func (p *peer) attachOutgoingConn(conn *outgoingConn) {
- var ok bool
- switch conn.t {
- case streamTypeMsgAppV2:
- ok = p.msgAppV2Writer.attach(conn)
- case streamTypeMessage:
- ok = p.writer.attach(conn)
- default:
- if p.lg != nil {
- p.lg.Panic("unknown stream type", zap.String("type", conn.t.String()))
- } else {
- plog.Panicf("unhandled stream type %s", conn.t)
- }
- }
- if !ok {
- conn.Close()
- }
- }
- func (p *peer) activeSince() time.Time { return p.status.activeSince() }
- // Pause pauses the peer. The peer will simply drops all incoming
- // messages without returning an error.
- func (p *peer) Pause() {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.paused = true
- p.msgAppReader.pause()
- p.msgAppV2Reader.pause()
- }
- // Resume resumes a paused peer.
- func (p *peer) Resume() {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.paused = false
- p.msgAppReader.resume()
- p.msgAppV2Reader.resume()
- }
- func (p *peer) stop() {
- if p.lg != nil {
- p.lg.Info("stopping remote peer", zap.String("remote-peer-id", p.id.String()))
- } else {
- plog.Infof("stopping peer %s...", p.id)
- }
- defer func() {
- if p.lg != nil {
- p.lg.Info("stopped remote peer", zap.String("remote-peer-id", p.id.String()))
- } else {
- plog.Infof("stopped peer %s", p.id)
- }
- }()
- close(p.stopc)
- p.cancel()
- p.msgAppV2Writer.stop()
- p.writer.stop()
- p.pipeline.stop()
- p.snapSender.stop()
- p.msgAppV2Reader.stop()
- p.msgAppReader.stop()
- }
- // pick picks a chan for sending the given message. The picked chan and the picked chan
- // string name are returned.
- func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
- var ok bool
- // Considering MsgSnap may have a big size, e.g., 1G, and will block
- // stream for a long time, only use one of the N pipelines to send MsgSnap.
- if isMsgSnap(m) {
- return p.pipeline.msgc, pipelineMsg
- } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
- return writec, streamAppV2
- } else if writec, ok = p.writer.writec(); ok {
- return writec, streamMsg
- }
- return p.pipeline.msgc, pipelineMsg
- }
- func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }
- func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }
|