Browse Source

Merge remote-tracking branch 'coreos/master' into merge

* coreos/master:
  scripts: build-docker tag and use ENTRYPOINT
  scripts: build-release add etcd-migrate
  create .godir
  raft: optimistically increase the next if the follower is already matched
  raft: add handleHeartbeat. handleHeartbeat commits to the commit index carried in the message; it never decreases the commit index of the raft state machine.
  rafthttp: send takes raft message instead of bytes
  *: add rafthttp pkg into test list
  raft: include commitIndex in heartbeat
  rafthttp: move server stats in raftHandler to etcdserver
  *: etcdhttp.raftHandler -> rafthttp.RaftHandler
  etcdserver: rename sender.go -> sendhub.go
  *: etcdserver.sender -> rafthttp.Sender

Conflicts:
	raft/log.go
	raft/raft_paper_test.go
Ben Darnell 11 years ago
parent
commit
b29240baf0

+ 1 - 0
.godir

@@ -0,0 +1 @@
+github.com/coreos/etcd

+ 2 - 59
etcdserver/etcdhttp/peer.go

@@ -18,14 +18,11 @@ package etcdhttp
 
 
 import (
 import (
 	"encoding/json"
 	"encoding/json"
-	"io/ioutil"
 	"log"
 	"log"
 	"net/http"
 	"net/http"
 
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/pkg/types"
-	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 )
 )
 
 
 const (
 const (
@@ -35,12 +32,7 @@ const (
 
 
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
 func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
-	rh := &raftHandler{
-		stats:       server,
-		server:      server,
-		clusterInfo: server.Cluster,
-	}
-
+	rh := rafthttp.NewHandler(server, server.Cluster.ID())
 	mh := &peerMembersHandler{
 	mh := &peerMembersHandler{
 		clusterInfo: server.Cluster,
 		clusterInfo: server.Cluster,
 	}
 	}
@@ -52,55 +44,6 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 	return mux
 	return mux
 }
 }
 
 
-type raftHandler struct {
-	stats       etcdserver.Stats
-	server      etcdserver.Server
-	clusterInfo etcdserver.ClusterInfo
-}
-
-func (h *raftHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	if !allowMethod(w, r.Method, "POST") {
-		return
-	}
-
-	wcid := h.clusterInfo.ID().String()
-	w.Header().Set("X-Etcd-Cluster-ID", wcid)
-
-	gcid := r.Header.Get("X-Etcd-Cluster-ID")
-	if gcid != wcid {
-		log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
-		http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
-		return
-	}
-
-	b, err := ioutil.ReadAll(r.Body)
-	if err != nil {
-		log.Println("etcdhttp: error reading raft message:", err)
-		http.Error(w, "error reading raft message", http.StatusBadRequest)
-		return
-	}
-	var m raftpb.Message
-	if err := m.Unmarshal(b); err != nil {
-		log.Println("etcdhttp: error unmarshaling raft message:", err)
-		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
-		return
-	}
-	if err := h.server.Process(context.TODO(), m); err != nil {
-		switch err {
-		case etcdserver.ErrRemoved:
-			log.Printf("etcdhttp: reject message from removed member %s", types.ID(m.From).String())
-			http.Error(w, "cannot process message from removed member", http.StatusForbidden)
-		default:
-			writeError(w, err)
-		}
-		return
-	}
-	if m.Type == raftpb.MsgApp {
-		h.stats.UpdateRecvApp(types.ID(m.From), r.ContentLength)
-	}
-	w.WriteHeader(http.StatusNoContent)
-}
-
 type peerMembersHandler struct {
 type peerMembersHandler struct {
 	clusterInfo etcdserver.ClusterInfo
 	clusterInfo etcdserver.ClusterInfo
 }
 }

+ 0 - 150
etcdserver/etcdhttp/peer_test.go

@@ -17,165 +17,15 @@
 package etcdhttp
 package etcdhttp
 
 
 import (
 import (
-	"bytes"
 	"encoding/json"
 	"encoding/json"
-	"errors"
-	"io"
 	"net/http"
 	"net/http"
 	"net/http/httptest"
 	"net/http/httptest"
 	"path"
 	"path"
-	"strings"
 	"testing"
 	"testing"
 
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/raft/raftpb"
 )
 )
 
 
-func mustMarshalMsg(t *testing.T, m raftpb.Message) []byte {
-	json, err := m.Marshal()
-	if err != nil {
-		t.Fatalf("error marshalling raft Message: %#v", err)
-	}
-	return json
-}
-
-// errReader implements io.Reader to facilitate a broken request.
-type errReader struct{}
-
-func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") }
-
-func TestServeRaft(t *testing.T) {
-	testCases := []struct {
-		method    string
-		body      io.Reader
-		serverErr error
-		clusterID string
-
-		wcode int
-	}{
-		{
-			// bad method
-			"GET",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"0",
-			http.StatusMethodNotAllowed,
-		},
-		{
-			// bad method
-			"PUT",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"0",
-			http.StatusMethodNotAllowed,
-		},
-		{
-			// bad method
-			"DELETE",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"0",
-			http.StatusMethodNotAllowed,
-		},
-		{
-			// bad request body
-			"POST",
-			&errReader{},
-			nil,
-			"0",
-			http.StatusBadRequest,
-		},
-		{
-			// bad request protobuf
-			"POST",
-			strings.NewReader("malformed garbage"),
-			nil,
-			"0",
-			http.StatusBadRequest,
-		},
-		{
-			// good request, etcdserver.Server internal error
-			"POST",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			errors.New("some error"),
-			"0",
-			http.StatusInternalServerError,
-		},
-		{
-			// good request from removed member
-			"POST",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			etcdserver.ErrRemoved,
-			"0",
-			http.StatusForbidden,
-		},
-		{
-			// good request
-			"POST",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"1",
-			http.StatusPreconditionFailed,
-		},
-		{
-			// good request
-			"POST",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"0",
-			http.StatusNoContent,
-		},
-	}
-	for i, tt := range testCases {
-		req, err := http.NewRequest(tt.method, "foo", tt.body)
-		if err != nil {
-			t.Fatalf("#%d: could not create request: %#v", i, err)
-		}
-		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
-		rw := httptest.NewRecorder()
-		h := &raftHandler{stats: nil, server: &errServer{tt.serverErr}, clusterInfo: &fakeCluster{id: 0}}
-		h.ServeHTTP(rw, req)
-		if rw.Code != tt.wcode {
-			t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
-		}
-	}
-}
-
 func TestServeMembersFails(t *testing.T) {
 func TestServeMembersFails(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		method string
 		method string

+ 124 - 0
etcdserver/sendhub.go

@@ -0,0 +1,124 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package etcdserver
+
+import (
+	"log"
+	"net/http"
+	"net/url"
+	"path"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
+)
+
+const (
+	raftPrefix = "/raft"
+)
+
+type sendHub struct {
+	tr         http.RoundTripper
+	cl         ClusterInfo
+	ss         *stats.ServerStats
+	ls         *stats.LeaderStats
+	senders    map[types.ID]rafthttp.Sender
+	shouldstop chan struct{}
+}
+
+// newSendHub creates the default send hub used to transport raft messages
+// to other members. The returned sendHub will update the given ServerStats and
+// LeaderStats appropriately.
+func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
+	h := &sendHub{
+		tr:         t,
+		cl:         cl,
+		ss:         ss,
+		ls:         ls,
+		senders:    make(map[types.ID]rafthttp.Sender),
+		shouldstop: make(chan struct{}, 1),
+	}
+	for _, m := range cl.Members() {
+		h.Add(m)
+	}
+	return h
+}
+
+func (h *sendHub) Send(msgs []raftpb.Message) {
+	for _, m := range msgs {
+		to := types.ID(m.To)
+		s, ok := h.senders[to]
+		if !ok {
+			if !h.cl.IsIDRemoved(to) {
+				log.Printf("etcdserver: send message to unknown receiver %s", to)
+			}
+			continue
+		}
+
+		if m.Type == raftpb.MsgApp {
+			h.ss.SendAppendReq(m.Size())
+		}
+
+		s.Send(m)
+	}
+}
+
+func (h *sendHub) Stop() {
+	for _, s := range h.senders {
+		s.Stop()
+	}
+}
+
+func (h *sendHub) ShouldStopNotify() <-chan struct{} {
+	return h.shouldstop
+}
+
+func (h *sendHub) Add(m *Member) {
+	if _, ok := h.senders[m.ID]; ok {
+		return
+	}
+	// TODO: consider how to switch between all available peer URLs
+	peerURL := m.PickPeerURL()
+	u, err := url.Parse(peerURL)
+	if err != nil {
+		log.Panicf("unexpect peer url %s", peerURL)
+	}
+	u.Path = path.Join(u.Path, raftPrefix)
+	fs := h.ls.Follower(m.ID.String())
+	s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), fs, h.shouldstop)
+	h.senders[m.ID] = s
+}
+
+func (h *sendHub) Remove(id types.ID) {
+	h.senders[id].Stop()
+	delete(h.senders, id)
+}
+
+func (h *sendHub) Update(m *Member) {
+	// TODO: return error or just panic?
+	if _, ok := h.senders[m.ID]; !ok {
+		return
+	}
+	peerURL := m.PickPeerURL()
+	u, err := url.Parse(peerURL)
+	if err != nil {
+		log.Panicf("unexpect peer url %s", peerURL)
+	}
+	u.Path = path.Join(u.Path, raftPrefix)
+	h.senders[m.ID].Update(u.String())
+}

+ 127 - 0
etcdserver/sendhub_test.go

@@ -0,0 +1,127 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package etcdserver
+
+import (
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+func TestSendHubInitSenders(t *testing.T) {
+	membs := []*Member{
+		newTestMember(1, []string{"http://a"}, "", nil),
+		newTestMember(2, []string{"http://b"}, "", nil),
+		newTestMember(3, []string{"http://c"}, "", nil),
+	}
+	cl := newTestCluster(membs)
+	ls := stats.NewLeaderStats("")
+	h := newSendHub(nil, cl, nil, ls)
+
+	ids := cl.MemberIDs()
+	if len(h.senders) != len(ids) {
+		t.Errorf("len(ids) = %d, want %d", len(h.senders), len(ids))
+	}
+	for _, id := range ids {
+		if _, ok := h.senders[id]; !ok {
+			t.Errorf("senders[%s] is nil, want exists", id)
+		}
+	}
+}
+
+func TestSendHubAdd(t *testing.T) {
+	cl := newTestCluster(nil)
+	ls := stats.NewLeaderStats("")
+	h := newSendHub(nil, cl, nil, ls)
+	m := newTestMember(1, []string{"http://a"}, "", nil)
+	h.Add(m)
+
+	if _, ok := ls.Followers["1"]; !ok {
+		t.Errorf("FollowerStats[1] is nil, want exists")
+	}
+	s, ok := h.senders[types.ID(1)]
+	if !ok {
+		t.Fatalf("senders[1] is nil, want exists")
+	}
+
+	h.Add(m)
+	ns := h.senders[types.ID(1)]
+	if s != ns {
+		t.Errorf("sender = %p, want %p", ns, s)
+	}
+}
+
+func TestSendHubRemove(t *testing.T) {
+	membs := []*Member{
+		newTestMember(1, []string{"http://a"}, "", nil),
+	}
+	cl := newTestCluster(membs)
+	ls := stats.NewLeaderStats("")
+	h := newSendHub(nil, cl, nil, ls)
+	h.Remove(types.ID(1))
+
+	if _, ok := h.senders[types.ID(1)]; ok {
+		t.Fatalf("senders[1] exists, want removed")
+	}
+}
+
+func TestSendHubShouldStop(t *testing.T) {
+	membs := []*Member{
+		newTestMember(1, []string{"http://a"}, "", nil),
+	}
+	tr := newRespRoundTripper(http.StatusForbidden, nil)
+	cl := newTestCluster(membs)
+	ls := stats.NewLeaderStats("")
+	h := newSendHub(tr, cl, nil, ls)
+
+	shouldstop := h.ShouldStopNotify()
+	select {
+	case <-shouldstop:
+		t.Fatalf("received unexpected shouldstop notification")
+	case <-time.After(10 * time.Millisecond):
+	}
+	h.senders[1].Send(raftpb.Message{})
+
+	testutil.ForceGosched()
+	select {
+	case <-shouldstop:
+	default:
+		t.Fatalf("cannot receive stop notification")
+	}
+}
+
+type respRoundTripper struct {
+	code int
+	err  error
+}
+
+func newRespRoundTripper(code int, err error) *respRoundTripper {
+	return &respRoundTripper{code: code, err: err}
+}
+func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
+}
+
+type nopReadCloser struct{}
+
+func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil }
+func (n *nopReadCloser) Close() error               { return nil }

+ 6 - 8
etcdserver/server.go

@@ -33,6 +33,7 @@ import (
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/discovery"
+	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/pbutil"
@@ -61,7 +62,6 @@ const (
 var (
 var (
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrStopped       = errors.New("etcdserver: server stopped")
 	ErrStopped       = errors.New("etcdserver: server stopped")
-	ErrRemoved       = errors.New("etcdserver: server removed")
 	ErrIDRemoved     = errors.New("etcdserver: ID removed")
 	ErrIDRemoved     = errors.New("etcdserver: ID removed")
 	ErrIDExists      = errors.New("etcdserver: ID exists")
 	ErrIDExists      = errors.New("etcdserver: ID exists")
 	ErrIDNotFound    = errors.New("etcdserver: ID not found")
 	ErrIDNotFound    = errors.New("etcdserver: ID not found")
@@ -145,8 +145,6 @@ type Stats interface {
 	LeaderStats() []byte
 	LeaderStats() []byte
 	// StoreStats returns statistics of the store backing this EtcdServer
 	// StoreStats returns statistics of the store backing this EtcdServer
 	StoreStats() []byte
 	StoreStats() []byte
-	// UpdateRecvApp updates the underlying statistics in response to a receiving an Append request
-	UpdateRecvApp(from types.ID, length int64)
 }
 }
 
 
 type RaftTimer interface {
 type RaftTimer interface {
@@ -323,7 +321,11 @@ func (s *EtcdServer) ID() types.ID { return s.id }
 
 
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.Cluster.IsIDRemoved(types.ID(m.From)) {
 	if s.Cluster.IsIDRemoved(types.ID(m.From)) {
-		return ErrRemoved
+		log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
+		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
+	}
+	if m.Type == raftpb.MsgApp {
+		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
 	}
 	}
 	return s.node.Step(ctx, m)
 	return s.node.Step(ctx, m)
 }
 }
@@ -493,10 +495,6 @@ func (s *EtcdServer) LeaderStats() []byte {
 
 
 func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
 func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
 
 
-func (s *EtcdServer) UpdateRecvApp(from types.ID, length int64) {
-	s.stats.RecvAppendReq(from.String(), int(length))
-}
-
 func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
 func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
 	// TODO: move Member to protobuf type
 	// TODO: move Member to protobuf type
 	b, err := json.Marshal(memb)
 	b, err := json.Marshal(memb)

+ 12 - 6
raft/log.go

@@ -86,11 +86,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
 		default:
 		default:
 			l.append(ci-1, ents[ci-from:]...)
 			l.append(ci-1, ents[ci-from:]...)
 		}
 		}
-		tocommit := min(committed, lastnewi)
-		// if toCommit > commitIndex, set commitIndex = toCommit
-		if l.committed < tocommit {
-			l.committed = tocommit
-		}
+		l.commitTo(min(committed, lastnewi))
 		return lastnewi, true
 		return lastnewi, true
 	}
 	}
 	return 0, false
 	return 0, false
@@ -171,6 +167,16 @@ func (l *raftLog) lastIndex() uint64 {
 	return index
 	return index
 }
 }
 
 
+func (l *raftLog) commitTo(tocommit uint64) {
+	// never decrease commit
+	if l.committed < tocommit {
+		if l.lastIndex() < tocommit {
+			panic("committed out of range")
+		}
+		l.committed = tocommit
+	}
+}
+
 func (l *raftLog) appliedTo(i uint64) {
 func (l *raftLog) appliedTo(i uint64) {
 	if i == 0 {
 	if i == 0 {
 		return
 		return
@@ -235,7 +241,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
 
 
 func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
 func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
 	if maxIndex > l.committed && l.term(maxIndex) == term {
 	if maxIndex > l.committed && l.term(maxIndex) == term {
-		l.committed = maxIndex
+		l.commitTo(maxIndex)
 		return true
 		return true
 	}
 	}
 	return false
 	return false

+ 32 - 0
raft/log_test.go

@@ -400,6 +400,38 @@ func TestUnstableEnts(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestCommitTo(t *testing.T) {
+	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}}
+	commit := uint64(2)
+	tests := []struct {
+		commit  uint64
+		wcommit uint64
+		wpanic  bool
+	}{
+		{3, 3, false},
+		{1, 2, false}, // never decrease
+		{4, 0, true},  // commit out of range -> panic
+	}
+	for i, tt := range tests {
+		func() {
+			defer func() {
+				if r := recover(); r != nil {
+					if tt.wpanic != true {
+						t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
+					}
+				}
+			}()
+			raftLog := newLog(NewMemoryStorage())
+			raftLog.append(0, previousEnts...)
+			raftLog.committed = commit
+			raftLog.commitTo(tt.commit)
+			if raftLog.committed != tt.wcommit {
+				t.Errorf("#%d: committed = %d, want %d", i, raftLog.committed, tt.wcommit)
+			}
+		}()
+	}
+}
+
 func TestStableTo(t *testing.T) {
 func TestStableTo(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		stable    uint64
 		stable    uint64

+ 41 - 6
raft/raft.go

@@ -63,12 +63,26 @@ func (pr *progress) update(n uint64) {
 	pr.next = n + 1
 	pr.next = n + 1
 }
 }
 
 
+func (pr *progress) optimisticUpdate(n uint64) {
+	pr.next = n + 1
+}
+
 // maybeDecrTo returns false if the given to index comes from an out of order message.
 // maybeDecrTo returns false if the given to index comes from an out of order message.
 // Otherwise it decreases the progress next index and returns true.
 // Otherwise it decreases the progress next index and returns true.
 func (pr *progress) maybeDecrTo(to uint64) bool {
 func (pr *progress) maybeDecrTo(to uint64) bool {
-	// the rejection must be stale if the progress has matched with
-	// follower or "to" does not match next - 1
-	if pr.match != 0 || pr.next-1 != to {
+	if pr.match != 0 {
+		// the rejection must be stale if the progress has matched and "to"
+		// is not greater than "match".
+		if to <= pr.match {
+			return false
+		}
+		// directly decrease next to match + 1
+		pr.next = pr.match + 1
+		return true
+	}
+
+	// the rejection must be stale if "to" does not match next - 1
+	if pr.next-1 != to {
 		return false
 		return false
 	}
 	}
 
 
@@ -214,15 +228,28 @@ func (r *raft) sendAppend(to uint64) {
 		m.LogTerm = r.raftLog.term(pr.next - 1)
 		m.LogTerm = r.raftLog.term(pr.next - 1)
 		m.Entries = r.raftLog.entries(pr.next)
 		m.Entries = r.raftLog.entries(pr.next)
 		m.Commit = r.raftLog.committed
 		m.Commit = r.raftLog.committed
+		// optimistically increase the next if the follower
+		// has been matched.
+		if n := len(m.Entries); pr.match != 0 && n != 0 {
+			pr.optimisticUpdate(m.Entries[n-1].Index)
+		}
 	}
 	}
 	r.send(m)
 	r.send(m)
 }
 }
 
 
 // sendHeartbeat sends an empty MsgApp
 // sendHeartbeat sends an empty MsgApp
 func (r *raft) sendHeartbeat(to uint64) {
 func (r *raft) sendHeartbeat(to uint64) {
+	// Attach the commit as min(r.prs[to].match, r.raftLog.committed).
+	// When the leader sends out heartbeat message,
+	// the receiver(follower) might not be matched with the leader
+	// or it might not have all the committed entries.
+	// The leader MUST NOT forward the follower's commit to
+	// an unmatched index.
+	commit := min(r.prs[to].match, r.raftLog.committed)
 	m := pb.Message{
 	m := pb.Message{
-		To:   to,
-		Type: pb.MsgApp,
+		To:     to,
+		Type:   pb.MsgApp,
+		Commit: commit,
 	}
 	}
 	r.send(m)
 	r.send(m)
 }
 }
@@ -397,6 +424,10 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 	}
 	}
 }
 }
 
 
+func (r *raft) handleHeartbeat(m pb.Message) {
+	r.raftLog.commitTo(m.Commit)
+}
+
 func (r *raft) handleSnapshot(m pb.Message) {
 func (r *raft) handleSnapshot(m pb.Message) {
 	if r.restore(m.Snapshot) {
 	if r.restore(m.Snapshot) {
 		r.snapshot = &m.Snapshot
 		r.snapshot = &m.Snapshot
@@ -493,7 +524,11 @@ func stepFollower(r *raft, m pb.Message) {
 	case pb.MsgApp:
 	case pb.MsgApp:
 		r.elapsed = 0
 		r.elapsed = 0
 		r.lead = m.From
 		r.lead = m.From
-		r.handleAppendEntries(m)
+		if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 {
+			r.handleHeartbeat(m)
+		} else {
+			r.handleAppendEntries(m)
+		}
 	case pb.MsgSnap:
 	case pb.MsgSnap:
 		r.elapsed = 0
 		r.elapsed = 0
 		r.handleSnapshot(m)
 		r.handleSnapshot(m)

+ 1 - 2
raft/raft_paper_test.go

@@ -597,11 +597,10 @@ func TestFollowerCheckMsgApp(t *testing.T) {
 		index   uint64
 		index   uint64
 		wreject bool
 		wreject bool
 	}{
 	}{
-		{0, 0, false},
 		{ents[0].Term, ents[0].Index, false},
 		{ents[0].Term, ents[0].Index, false},
-		{ents[1].Term, ents[1].Index, false},
 		{ents[0].Term, ents[0].Index + 1, true},
 		{ents[0].Term, ents[0].Index + 1, true},
 		{ents[0].Term + 1, ents[0].Index, true},
 		{ents[0].Term + 1, ents[0].Index, true},
+		{ents[1].Term, ents[1].Index, false},
 		{3, 3, true},
 		{3, 3, true},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {

+ 89 - 7
raft/raft_test.go

@@ -64,8 +64,18 @@ func TestProgressMaybeDecr(t *testing.T) {
 			1, 0, 0, false, 0,
 			1, 0, 0, false, 0,
 		},
 		},
 		{
 		{
-			// match != 0 is always false
-			5, 10, 9, false, 10,
+			// match != 0 and to is not greater than match
+			// the rejection is stale; next is left unchanged
+			5, 10, 5, false, 10,
+		},
+		{
+			// match != 0 and to is not greater than match
+			// the rejection is stale; next is left unchanged
+			5, 10, 4, false, 10,
+		},
+		{
+			// match != 0 and to is greater than match
+			// directly decrease next to match+1
+			5, 10, 9, true, 6,
 		},
 		},
 		{
 		{
 			// next-1 != to is always false
 			// next-1 != to is always false
@@ -664,6 +674,37 @@ func TestHandleMsgApp(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestHandleHeartbeat ensures that the follower commits to the commit in the message.
+func TestHandleHeartbeat(t *testing.T) {
+	commit := uint64(2)
+	tests := []struct {
+		m       pb.Message
+		wCommit uint64
+	}{
+		{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
+		{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
+	}
+
+	for i, tt := range tests {
+		storage := NewMemoryStorage()
+		storage.Append([]pb.Entry{{Term: 1}, {Term: 2}, {Term: 3}})
+		sm := &raft{
+			state:     StateFollower,
+			HardState: pb.HardState{Term: 2},
+			raftLog:   newLog(storage),
+		}
+		sm.raftLog.commitTo(commit)
+		sm.handleHeartbeat(tt.m)
+		if sm.raftLog.committed != tt.wCommit {
+			t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
+		}
+		m := sm.readMessages()
+		if len(m) != 0 {
+			t.Fatalf("#%d: msg = nil, want 0", i)
+		}
+	}
+}
+
 func TestRecvMsgVote(t *testing.T) {
 func TestRecvMsgVote(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		state   StateType
 		state   StateType
@@ -836,7 +877,7 @@ func TestAllServerStepdown(t *testing.T) {
 }
 }
 
 
 func TestLeaderAppResp(t *testing.T) {
 func TestLeaderAppResp(t *testing.T) {
-	// initial progress: match = 0; netx = 3
+	// initial progress: match = 0; next = 3
 	tests := []struct {
 	tests := []struct {
 		index  uint64
 		index  uint64
 		reject bool
 		reject bool
@@ -850,7 +891,7 @@ func TestLeaderAppResp(t *testing.T) {
 	}{
 	}{
 		{3, true, 0, 3, 0, 0, 0},  // stale resp; no replies
 		{3, true, 0, 3, 0, 0, 0},  // stale resp; no replies
 		{2, true, 0, 2, 1, 1, 0},  // denied resp; leader does not commit; decrese next and send probing msg
 		{2, true, 0, 2, 1, 1, 0},  // denied resp; leader does not commit; decrese next and send probing msg
-		{2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
+		{2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
 		{0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
 		{0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
 	}
 	}
 
 
@@ -913,13 +954,20 @@ func TestBcastBeat(t *testing.T) {
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		sm.appendEntry(pb.Entry{})
 		sm.appendEntry(pb.Entry{})
 	}
 	}
+	// slow follower
+	sm.prs[2].match, sm.prs[2].next = 5, 6
+	// normal follower
+	sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
 
 
 	sm.Step(pb.Message{Type: pb.MsgBeat})
 	sm.Step(pb.Message{Type: pb.MsgBeat})
 	msgs := sm.readMessages()
 	msgs := sm.readMessages()
 	if len(msgs) != 2 {
 	if len(msgs) != 2 {
 		t.Fatalf("len(msgs) = %v, want 2", len(msgs))
 		t.Fatalf("len(msgs) = %v, want 2", len(msgs))
 	}
 	}
-	tomap := map[uint64]bool{2: true, 3: true}
+	wantCommitMap := map[uint64]uint64{
+		2: min(sm.raftLog.committed, sm.prs[2].match),
+		3: min(sm.raftLog.committed, sm.prs[3].match),
+	}
 	for i, m := range msgs {
 	for i, m := range msgs {
 		if m.Type != pb.MsgApp {
 		if m.Type != pb.MsgApp {
 			t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
 			t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
@@ -930,10 +978,13 @@ func TestBcastBeat(t *testing.T) {
 		if m.LogTerm != 0 {
 		if m.LogTerm != 0 {
 			t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
 			t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
 		}
 		}
-		if !tomap[m.To] {
+		if wantCommitMap[m.To] == 0 {
 			t.Fatalf("#%d: unexpected to %d", i, m.To)
 			t.Fatalf("#%d: unexpected to %d", i, m.To)
 		} else {
 		} else {
-			delete(tomap, m.To)
+			if m.Commit != wantCommitMap[m.To] {
+				t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
+			}
+			delete(wantCommitMap, m.To)
 		}
 		}
 		if len(m.Entries) != 0 {
 		if len(m.Entries) != 0 {
 			t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
 			t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
@@ -980,6 +1031,37 @@ func TestRecvMsgBeat(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestLeaderIncreaseNext(t *testing.T) {
+	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
+	tests := []struct {
+		// progress
+		match uint64
+		next  uint64
+
+		wnext uint64
+	}{
+		// match is not zero, optimistically increase next
+		// previous entries + noop entry + propose + 1
+		{1, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
+		// match is zero, not optimistically increase next
+		{0, 2, 2},
+	}
+
+	for i, tt := range tests {
+		sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
+		sm.raftLog.append(0, previousEnts...)
+		sm.becomeCandidate()
+		sm.becomeLeader()
+		sm.prs[2].match, sm.prs[2].next = tt.match, tt.next
+		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+
+		p := sm.prs[2]
+		if p.next != tt.wnext {
+			t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
+		}
+	}
+}
+
 func TestRestore(t *testing.T) {
 func TestRestore(t *testing.T) {
 	s := pb.Snapshot{
 	s := pb.Snapshot{
 		Metadata: pb.SnapshotMetadata{
 		Metadata: pb.SnapshotMetadata{

+ 90 - 0
rafthttp/http.go

@@ -0,0 +1,90 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"io/ioutil"
+	"log"
+	"net/http"
+
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+type Processor interface {
+	Process(ctx context.Context, m raftpb.Message) error
+}
+
+func NewHandler(p Processor, cid types.ID) http.Handler {
+	return &handler{
+		p:   p,
+		cid: cid,
+	}
+}
+
+type handler struct {
+	p   Processor
+	cid types.ID
+}
+
+func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "POST" {
+		w.Header().Set("Allow", "POST")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	wcid := h.cid.String()
+	w.Header().Set("X-Etcd-Cluster-ID", wcid)
+
+	gcid := r.Header.Get("X-Etcd-Cluster-ID")
+	if gcid != wcid {
+		log.Printf("rafthttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
+		http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
+		return
+	}
+
+	b, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		log.Println("rafthttp: error reading raft message:", err)
+		http.Error(w, "error reading raft message", http.StatusBadRequest)
+		return
+	}
+	var m raftpb.Message
+	if err := m.Unmarshal(b); err != nil {
+		log.Println("rafthttp: error unmarshaling raft message:", err)
+		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
+		return
+	}
+	if err := h.p.Process(context.TODO(), m); err != nil {
+		switch v := err.(type) {
+		case writerToResponse:
+			v.WriteTo(w)
+		default:
+			log.Printf("rafthttp: error processing raft message: %v", err)
+			http.Error(w, "error processing raft message", http.StatusInternalServerError)
+		}
+		return
+	}
+	w.WriteHeader(http.StatusNoContent)
+}
+
+type writerToResponse interface {
+	WriteTo(w http.ResponseWriter)
+}

+ 180 - 0
rafthttp/http_test.go

@@ -0,0 +1,180 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+// TestServeRaft is a table-driven test of the handler's status codes. Each
+// case supplies the request method, body, Processor and X-Etcd-Cluster-ID
+// header value, and the status code the handler is expected to record.
+func TestServeRaft(t *testing.T) {
+	testCases := []struct {
+		method    string
+		body      io.Reader
+		p         Processor
+		clusterID string
+
+		wcode int
+	}{
+		{
+			// bad method: GET
+			"GET",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"0",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// bad method: PUT
+			"PUT",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"0",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// bad method: DELETE
+			"DELETE",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"0",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// bad request body: the reader always returns an error
+			"POST",
+			&errReader{},
+			&nopProcessor{},
+			"0",
+			http.StatusBadRequest,
+		},
+		{
+			// bad request protobuf: body is not a valid raftpb.Message
+			"POST",
+			strings.NewReader("malformed garbage"),
+			&nopProcessor{},
+			"0",
+			http.StatusBadRequest,
+		},
+		{
+			// good request, wrong cluster ID
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"1",
+			http.StatusPreconditionFailed,
+		},
+		{
+			// good request, Processor fails with an error that writes its
+			// own response (403)
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&errProcessor{
+				err: &resWriterToError{code: http.StatusForbidden},
+			},
+			"0",
+			http.StatusForbidden,
+		},
+		{
+			// good request, Processor fails with an error that writes its
+			// own response (500)
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&errProcessor{
+				err: &resWriterToError{code: http.StatusInternalServerError},
+			},
+			"0",
+			http.StatusInternalServerError,
+		},
+		{
+			// good request, Processor fails with a plain error, which the
+			// handler maps to 500
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&errProcessor{err: errors.New("blah")},
+			"0",
+			http.StatusInternalServerError,
+		},
+		{
+			// good request
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"0",
+			http.StatusNoContent,
+		},
+	}
+	for i, tt := range testCases {
+		req, err := http.NewRequest(tt.method, "foo", tt.body)
+		if err != nil {
+			t.Fatalf("#%d: could not create request: %#v", i, err)
+		}
+		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
+		rw := httptest.NewRecorder()
+		// Every case runs against a handler with cluster ID 0; the cases
+		// sending "0" are expected to pass the cluster ID check.
+		h := NewHandler(tt.p, types.ID(0))
+		h.ServeHTTP(rw, req)
+		if rw.Code != tt.wcode {
+			t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
+		}
+	}
+}
+
+// errReader is an io.Reader whose Read always fails. It is used as a request
+// body to simulate a body that cannot be read.
+type errReader struct{}
+
+// Read reads nothing and always returns a non-nil error.
+func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") }
+
+// nopProcessor is a Processor that ignores every message and reports success.
+type nopProcessor struct{}
+
+// Process discards m and returns nil.
+func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil }
+
+// errProcessor is a Processor that returns a preconfigured error for every
+// message.
+type errProcessor struct {
+	// err is the value returned by Process.
+	err error
+}
+
+// Process discards m and returns p.err.
+func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err }
+
+// resWriterToError is an error that also has a WriteTo(http.ResponseWriter)
+// method, so it matches the handler's writerToResponse case and decides the
+// response status itself.
+type resWriterToError struct {
+	// code is the HTTP status written by WriteTo.
+	code int
+}
+
+// Error returns an empty message; the tests only look at the status code.
+func (e *resWriterToError) Error() string                 { return "" }
+
+// WriteTo writes e.code as the response status and no body.
+func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) }

+ 32 - 111
etcdserver/sender.go → rafthttp/sender.go

@@ -14,124 +14,51 @@
    limitations under the License.
    limitations under the License.
 */
 */
 
 
-package etcdserver
+package rafthttp
 
 
 import (
 import (
 	"bytes"
 	"bytes"
 	"fmt"
 	"fmt"
 	"log"
 	"log"
 	"net/http"
 	"net/http"
-	"net/url"
-	"path"
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 )
 
 
 const (
 const (
-	raftPrefix    = "/raft"
 	connPerSender = 4
 	connPerSender = 4
 	senderBufSize = connPerSender * 4
 	senderBufSize = connPerSender * 4
 )
 )
 
 
-type sendHub struct {
-	tr         http.RoundTripper
-	cl         ClusterInfo
-	ss         *stats.ServerStats
-	ls         *stats.LeaderStats
-	senders    map[types.ID]*sender
-	shouldstop chan struct{}
-}
-
-// newSendHub creates the default send hub used to transport raft messages
-// to other members. The returned sendHub will update the given ServerStats and
-// LeaderStats appropriately.
-func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
-	h := &sendHub{
-		tr:         t,
-		cl:         cl,
-		ss:         ss,
-		ls:         ls,
-		senders:    make(map[types.ID]*sender),
-		shouldstop: make(chan struct{}, 1),
-	}
-	for _, m := range cl.Members() {
-		h.Add(m)
-	}
-	return h
-}
-
-func (h *sendHub) Send(msgs []raftpb.Message) {
-	for _, m := range msgs {
-		to := types.ID(m.To)
-		s, ok := h.senders[to]
-		if !ok {
-			if !h.cl.IsIDRemoved(to) {
-				log.Printf("etcdserver: send message to unknown receiver %s", to)
-			}
-			continue
-		}
-
-		// TODO: don't block. we should be able to have 1000s
-		// of messages out at a time.
-		data, err := m.Marshal()
-		if err != nil {
-			log.Println("sender: dropping message:", err)
-			return // drop bad message
-		}
-		if m.Type == raftpb.MsgApp {
-			h.ss.SendAppendReq(len(data))
-		}
-
-		// TODO (xiangli): reasonable retry logic
-		s.send(data)
-	}
-}
-
-func (h *sendHub) Stop() {
-	for _, s := range h.senders {
-		s.stop()
-	}
-}
-
-func (h *sendHub) ShouldStopNotify() <-chan struct{} {
-	return h.shouldstop
+type Sender interface {
+	Update(u string)
+	// Send sends the data to the remote node. It is always non-blocking.
+	// It may still fail to deliver the data even if it returns a nil error.
+	Send(m raftpb.Message) error
+	// Stop performs any necessary finalization and terminates the Sender
+	// gracefully.
+	Stop()
 }
 }
 
 
-func (h *sendHub) Add(m *Member) {
-	if _, ok := h.senders[m.ID]; ok {
-		return
-	}
-	// TODO: considering how to switch between all available peer urls
-	u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix)
-	fs := h.ls.Follower(m.ID.String())
-	s := newSender(h.tr, u, h.cl.ID(), fs, h.shouldstop)
-	h.senders[m.ID] = s
-}
-
-func (h *sendHub) Remove(id types.ID) {
-	h.senders[id].stop()
-	delete(h.senders, id)
-}
-
-func (h *sendHub) Update(m *Member) {
-	// TODO: return error or just panic?
-	if _, ok := h.senders[m.ID]; !ok {
-		return
+func NewSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
+	s := &sender{
+		tr:         tr,
+		u:          u,
+		cid:        cid,
+		fs:         fs,
+		q:          make(chan []byte, senderBufSize),
+		shouldstop: shouldstop,
 	}
 	}
-	peerURL := m.PickPeerURL()
-	u, err := url.Parse(peerURL)
-	if err != nil {
-		log.Panicf("unexpect peer url %s", peerURL)
+	s.wg.Add(connPerSender)
+	for i := 0; i < connPerSender; i++ {
+		go s.handle()
 	}
 	}
-	u.Path = path.Join(u.Path, raftPrefix)
-	s := h.senders[m.ID]
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.u = u.String()
+	return s
 }
 }
 
 
 type sender struct {
 type sender struct {
@@ -145,23 +72,17 @@ type sender struct {
 	shouldstop chan struct{}
 	shouldstop chan struct{}
 }
 }
 
 
-func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
-	s := &sender{
-		tr:         tr,
-		u:          u,
-		cid:        cid,
-		fs:         fs,
-		q:          make(chan []byte, senderBufSize),
-		shouldstop: shouldstop,
-	}
-	s.wg.Add(connPerSender)
-	for i := 0; i < connPerSender; i++ {
-		go s.handle()
-	}
-	return s
+func (s *sender) Update(u string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.u = u
 }
 }
 
 
-func (s *sender) send(data []byte) error {
+// TODO (xiangli): reasonable retry logic
+func (s *sender) Send(m raftpb.Message) error {
+	// TODO: don't block. we should be able to have 1000s
+	// of messages out at a time.
+	data := pbutil.MustMarshal(&m)
 	select {
 	select {
 	case s.q <- data:
 	case s.q <- data:
 		return nil
 		return nil
@@ -171,7 +92,7 @@ func (s *sender) send(data []byte) error {
 	}
 	}
 }
 }
 
 
-func (s *sender) stop() {
+func (s *sender) Stop() {
 	close(s.q)
 	close(s.q)
 	s.wg.Wait()
 	s.wg.Wait()
 }
 }

+ 20 - 105
etcdserver/sender_test.go → rafthttp/sender_test.go

@@ -14,7 +14,7 @@
    limitations under the License.
    limitations under the License.
 */
 */
 
 
-package etcdserver
+package rafthttp
 
 
 import (
 import (
 	"errors"
 	"errors"
@@ -22,109 +22,24 @@ import (
 	"net/http"
 	"net/http"
 	"sync"
 	"sync"
 	"testing"
 	"testing"
-	"time"
 
 
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
 )
 )
 
 
-func TestSendHubInitSenders(t *testing.T) {
-	membs := []*Member{
-		newTestMember(1, []string{"http://a"}, "", nil),
-		newTestMember(2, []string{"http://b"}, "", nil),
-		newTestMember(3, []string{"http://c"}, "", nil),
-	}
-	cl := newTestCluster(membs)
-	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
-
-	ids := cl.MemberIDs()
-	if len(h.senders) != len(ids) {
-		t.Errorf("len(ids) = %d, want %d", len(h.senders), len(ids))
-	}
-	for _, id := range ids {
-		if _, ok := h.senders[id]; !ok {
-			t.Errorf("senders[%s] is nil, want exists", id)
-		}
-	}
-}
-
-func TestSendHubAdd(t *testing.T) {
-	cl := newTestCluster(nil)
-	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
-	m := newTestMember(1, []string{"http://a"}, "", nil)
-	h.Add(m)
-
-	if _, ok := ls.Followers["1"]; !ok {
-		t.Errorf("FollowerStats[1] is nil, want exists")
-	}
-	s, ok := h.senders[types.ID(1)]
-	if !ok {
-		t.Fatalf("senders[1] is nil, want exists")
-	}
-	if s.u != "http://a/raft" {
-		t.Errorf("url = %s, want %s", s.u, "http://a/raft")
-	}
-
-	h.Add(m)
-	ns := h.senders[types.ID(1)]
-	if s != ns {
-		t.Errorf("sender = %p, want %p", ns, s)
-	}
-}
-
-func TestSendHubRemove(t *testing.T) {
-	membs := []*Member{
-		newTestMember(1, []string{"http://a"}, "", nil),
-	}
-	cl := newTestCluster(membs)
-	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
-	h.Remove(types.ID(1))
-
-	if _, ok := h.senders[types.ID(1)]; ok {
-		t.Fatalf("senders[1] exists, want removed")
-	}
-}
-
-func TestSendHubShouldStop(t *testing.T) {
-	membs := []*Member{
-		newTestMember(1, []string{"http://a"}, "", nil),
-	}
-	tr := newRespRoundTripper(http.StatusForbidden, nil)
-	cl := newTestCluster(membs)
-	ls := stats.NewLeaderStats("")
-	h := newSendHub(tr, cl, nil, ls)
-
-	shouldstop := h.ShouldStopNotify()
-	select {
-	case <-shouldstop:
-		t.Fatalf("received unexpected shouldstop notification")
-	case <-time.After(10 * time.Millisecond):
-	}
-	h.senders[1].send([]byte("somedata"))
-
-	testutil.ForceGosched()
-	select {
-	case <-shouldstop:
-	default:
-		t.Fatalf("cannot receive stop notification")
-	}
-}
-
 // TestSenderSend tests that send func could post data using roundtripper
 // TestSenderSend tests that send func could post data using roundtripper
 // and increase success count in stats.
 // and increase success count in stats.
 func TestSenderSend(t *testing.T) {
 func TestSenderSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
 
 
-	if err := s.send([]byte("some data")); err != nil {
+	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
 		t.Fatalf("unexpect send error: %v", err)
 	}
 	}
-	s.stop()
+	s.Stop()
 
 
 	if tr.Request() == nil {
 	if tr.Request() == nil {
 		t.Errorf("sender fails to post the data")
 		t.Errorf("sender fails to post the data")
@@ -139,12 +54,12 @@ func TestSenderSend(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
 
 
 	// keep the sender busy and make the buffer full
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
 	// nothing can go out as we block the sender
 	for i := 0; i < connPerSender+senderBufSize; i++ {
 	for i := 0; i < connPerSender+senderBufSize; i++ {
-		if err := s.send([]byte("some data")); err != nil {
+		if err := s.Send(raftpb.Message{}); err != nil {
 			t.Errorf("send err = %v, want nil", err)
 			t.Errorf("send err = %v, want nil", err)
 		}
 		}
 		// force the sender to grab data
 		// force the sender to grab data
@@ -152,7 +67,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	}
 	}
 
 
 	// try to send a data when we are sure the buffer is full
 	// try to send a data when we are sure the buffer is full
-	if err := s.send([]byte("some data")); err == nil {
+	if err := s.Send(raftpb.Message{}); err == nil {
 		t.Errorf("unexpect send success")
 		t.Errorf("unexpect send success")
 	}
 	}
 
 
@@ -161,22 +76,22 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	testutil.ForceGosched()
 	testutil.ForceGosched()
 
 
 	// It could send new data after previous ones succeed
 	// It could send new data after previous ones succeed
-	if err := s.send([]byte("some data")); err != nil {
+	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Errorf("send err = %v, want nil", err)
 		t.Errorf("send err = %v, want nil", err)
 	}
 	}
-	s.stop()
+	s.Stop()
 }
 }
 
 
 // TestSenderSendFailed tests that when send func meets the post error,
 // TestSenderSendFailed tests that when send func meets the post error,
 // it increases fail count in stats.
 // it increases fail count in stats.
 func TestSenderSendFailed(t *testing.T) {
 func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
 	fs := &stats.FollowerStats{}
-	s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
 
 
-	if err := s.send([]byte("some data")); err != nil {
-		t.Fatalf("unexpect send error: %v", err)
+	if err := s.Send(raftpb.Message{}); err != nil {
+		t.Fatalf("unexpect Send error: %v", err)
 	}
 	}
-	s.stop()
+	s.Stop()
 
 
 	fs.Lock()
 	fs.Lock()
 	defer fs.Unlock()
 	defer fs.Unlock()
@@ -187,11 +102,11 @@ func TestSenderSendFailed(t *testing.T) {
 
 
 func TestSenderPost(t *testing.T) {
 func TestSenderPost(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	tr := &roundTripperRecorder{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
 	if err := s.post([]byte("some data")); err != nil {
 	if err := s.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 		t.Fatalf("unexpect post error: %v", err)
 	}
 	}
-	s.stop()
+	s.Stop()
 
 
 	if g := tr.Request().Method; g != "POST" {
 	if g := tr.Request().Method; g != "POST" {
 		t.Errorf("method = %s, want %s", g, "POST")
 		t.Errorf("method = %s, want %s", g, "POST")
@@ -230,9 +145,9 @@ func TestSenderPostBad(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		shouldstop := make(chan struct{})
 		shouldstop := make(chan struct{})
-		s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
 		err := s.post([]byte("some data"))
 		err := s.post([]byte("some data"))
-		s.stop()
+		s.Stop()
 
 
 		if err == nil {
 		if err == nil {
 			t.Errorf("#%d: err = nil, want not nil", i)
 			t.Errorf("#%d: err = nil, want not nil", i)
@@ -251,9 +166,9 @@ func TestSenderPostShouldStop(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		shouldstop := make(chan struct{}, 1)
 		shouldstop := make(chan struct{}, 1)
-		s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
 		s.post([]byte("some data"))
 		s.post([]byte("some data"))
-		s.stop()
+		s.Stop()
 		select {
 		select {
 		case <-shouldstop:
 		case <-shouldstop:
 		default:
 		default:

+ 2 - 2
scripts/build-docker

@@ -6,7 +6,7 @@ FROM scratch
 ADD etcd /
 ADD etcd /
 ADD etcdctl /
 ADD etcdctl /
 EXPOSE 4001 7001 2379 2380
 EXPOSE 4001 7001 2379 2380
-CMD ["/etcd"]
+ENTRYPOINT ["/etcd"]
 DF
 DF
 
 
-docker build .
+docker build -t quay.io/coreos/etcd:${1} .

+ 1 - 1
scripts/build-release

@@ -35,7 +35,7 @@ function package {
 	if [ -d ${ccdir} ]; then
 	if [ -d ${ccdir} ]; then
 		srcdir=${ccdir}
 		srcdir=${ccdir}
 	fi
 	fi
-	for bin in etcd etcdctl; do
+	for bin in etcd etcdctl etcd-migrate; do
 		cp ${srcdir}/${bin} ${target}
 		cp ${srcdir}/${bin} ${target}
 	done
 	done
 
 

+ 1 - 1
test

@@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
 source ./build
 source ./build
 
 
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
-TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal"
+TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
 FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
 FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
 
 
 # user has not provided PKG override
 # user has not provided PKG override