| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- /*
- Copyright 2014 CoreOS, Inc.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package rafthttp
- import (
- "bytes"
- "fmt"
- "log"
- "net/http"
- "sync"
- "time"
- "github.com/coreos/etcd/etcdserver/stats"
- "github.com/coreos/etcd/pkg/pbutil"
- "github.com/coreos/etcd/pkg/types"
- "github.com/coreos/etcd/raft/raftpb"
- )
// Sizing and batching parameters for a sender.
const (
	// connPerSender is the number of handle goroutines NewSender starts
	// for every sender.
	connPerSender = 4
	// senderBufSize is the capacity of a sender's outgoing message queue.
	senderBufSize = 4 * connPerSender
	// appRespBatchMs is the interval, in milliseconds, handed to the
	// Batcher that NewSender creates.
	appRespBatchMs = 50
)

// Connection timeouts. They are not referenced in the visible part of this
// file; presumably they are applied by other code in the package.
const (
	ConnReadTimeout  = 5 * time.Second
	ConnWriteTimeout = 5 * time.Second
)
- type Sender interface {
- // StartStreaming enables streaming in the sender using the given writer,
- // which provides a fast and effecient way to send appendEntry messages.
- StartStreaming(w WriteFlusher, to types.ID, term uint64) (done <-chan struct{}, err error)
- Update(u string)
- // Send sends the data to the remote node. It is always non-blocking.
- // It may be fail to send data if it returns nil error.
- Send(m raftpb.Message) error
- // Stop performs any necessary finalization and terminates the Sender
- // elegantly.
- Stop()
- }
- func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
- s := &sender{
- tr: tr,
- u: u,
- cid: cid,
- p: p,
- fs: fs,
- shouldstop: shouldstop,
- batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
- q: make(chan []byte, senderBufSize),
- }
- s.wg.Add(connPerSender)
- for i := 0; i < connPerSender; i++ {
- go s.handle()
- }
- return s
- }
- type sender struct {
- tr http.RoundTripper
- u string
- cid types.ID
- p Processor
- fs *stats.FollowerStats
- shouldstop chan struct{}
- strmCln *streamClient
- batcher *Batcher
- strmSrv *streamServer
- strmSrvMu sync.Mutex
- q chan []byte
- mu sync.RWMutex
- wg sync.WaitGroup
- }
- func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
- s.strmSrvMu.Lock()
- defer s.strmSrvMu.Unlock()
- if s.strmSrv != nil {
- // ignore lower-term streaming request
- if term < s.strmSrv.term {
- return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, s.strmSrv.term)
- }
- // stop the existing one
- s.strmSrv.stop()
- }
- s.strmSrv = startStreamServer(w, to, term, s.fs)
- return s.strmSrv.stopNotify(), nil
- }
- func (s *sender) Update(u string) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.u = u
- }
- // TODO (xiangli): reasonable retry logic
- func (s *sender) Send(m raftpb.Message) error {
- s.maybeStopStream(m.Term)
- if shouldInitStream(m) && !s.hasStreamClient() {
- s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
- s.batcher.Reset(time.Now())
- }
- if canBatch(m) && s.hasStreamClient() {
- if s.batcher.ShouldBatch(time.Now()) {
- return nil
- }
- }
- if canUseStream(m) {
- if ok := s.tryStream(m); ok {
- return nil
- }
- }
- // TODO: don't block. we should be able to have 1000s
- // of messages out at a time.
- data := pbutil.MustMarshal(&m)
- select {
- case s.q <- data:
- return nil
- default:
- log.Printf("sender: reach the maximal serving to %s", s.u)
- return fmt.Errorf("reach maximal serving")
- }
- }
- func (s *sender) Stop() {
- close(s.q)
- s.wg.Wait()
- s.strmSrvMu.Lock()
- if s.strmSrv != nil {
- s.strmSrv.stop()
- }
- s.strmSrvMu.Unlock()
- if s.strmCln != nil {
- s.strmCln.stop()
- }
- }
- func (s *sender) maybeStopStream(term uint64) {
- if s.strmCln != nil && term > s.strmCln.term {
- s.strmCln.stop()
- s.strmCln = nil
- }
- s.strmSrvMu.Lock()
- defer s.strmSrvMu.Unlock()
- if s.strmSrv != nil && term > s.strmSrv.term {
- s.strmSrv.stop()
- s.strmSrv = nil
- }
- }
- func (s *sender) hasStreamClient() bool {
- return s.strmCln != nil && !s.strmCln.isStopped()
- }
- func (s *sender) initStream(from, to types.ID, term uint64) {
- strmCln := newStreamClient(from, to, term, s.p)
- s.mu.Lock()
- u := s.u
- s.mu.Unlock()
- if err := strmCln.start(s.tr, u, s.cid); err != nil {
- log.Printf("rafthttp: start stream client error: %v", err)
- return
- }
- s.strmCln = strmCln
- }
- func (s *sender) tryStream(m raftpb.Message) bool {
- s.strmSrvMu.Lock()
- defer s.strmSrvMu.Unlock()
- if s.strmSrv == nil || m.Term != s.strmSrv.term {
- return false
- }
- if err := s.strmSrv.send(m.Entries); err != nil {
- log.Printf("rafthttp: send stream message error: %v", err)
- s.strmSrv.stop()
- s.strmSrv = nil
- return false
- }
- return true
- }
- func (s *sender) handle() {
- defer s.wg.Done()
- for d := range s.q {
- start := time.Now()
- err := s.post(d)
- end := time.Now()
- if err != nil {
- s.fs.Fail()
- log.Printf("sender: %v", err)
- continue
- }
- s.fs.Succ(end.Sub(start))
- }
- }
- // post POSTs a data payload to a url. Returns nil if the POST succeeds,
- // error on any failure.
- func (s *sender) post(data []byte) error {
- s.mu.RLock()
- req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
- s.mu.RUnlock()
- if err != nil {
- return fmt.Errorf("new request to %s error: %v", s.u, err)
- }
- req.Header.Set("Content-Type", "application/protobuf")
- req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
- resp, err := s.tr.RoundTrip(req)
- if err != nil {
- return fmt.Errorf("error posting to %q: %v", req.URL.String(), err)
- }
- resp.Body.Close()
- switch resp.StatusCode {
- case http.StatusPreconditionFailed:
- select {
- case s.shouldstop <- struct{}{}:
- default:
- }
- log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
- return nil
- case http.StatusForbidden:
- select {
- case s.shouldstop <- struct{}{}:
- default:
- }
- log.Println("etcdserver: this member has been permanently removed from the cluster")
- log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
- return nil
- case http.StatusNoContent:
- return nil
- default:
- return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
- }
- }
|