Browse Source

Merge remote-tracking branch 'coreos/master' into merge

* coreos/master:
  rafthttp: fix import
  raft: should not decrease match and next when handling out of order msgAppResp
  Fix migration to allow snapshots to have the right IDs
  add snapshotted integration test
  fix test import loop
  fix import loop, add set to types, and fix comments
  etcdserver: autodetect v0.4 WALs and upgrade them to v0.5 automatically
  wal: add a bench for write entry
  rafthttp: add streaming server and client
  dep: use vendored imports in codegangsta/cli
  dep: bump golang.org/x/net/context

Conflicts:
	etcdserver/server.go
	etcdserver/server_test.go
	migrate/snapshot.go
Ben Darnell 11 years ago
parent
commit
0d680d0e6b
51 changed files with 1284 additions and 132 deletions
  1. 5 5
      Godeps/Godeps.json
  2. 1 1
      Godeps/_workspace/src/github.com/codegangsta/cli/app_test.go
  3. 1 1
      Godeps/_workspace/src/github.com/codegangsta/cli/cli_test.go
  4. 1 1
      Godeps/_workspace/src/github.com/codegangsta/cli/command_test.go
  5. 1 1
      Godeps/_workspace/src/github.com/codegangsta/cli/context_test.go
  6. 1 1
      Godeps/_workspace/src/github.com/codegangsta/cli/flag_test.go
  7. 13 12
      Godeps/_workspace/src/golang.org/x/net/context/context.go
  8. 1 1
      Godeps/_workspace/src/golang.org/x/net/context/context_test.go
  9. 1 1
      Godeps/_workspace/src/golang.org/x/net/context/withtimeout_test.go
  10. 1 1
      client/http.go
  11. 1 1
      client/http_test.go
  12. 1 1
      client/keys.go
  13. 1 1
      client/members.go
  14. 1 1
      discovery/discovery.go
  15. 1 1
      discovery/discovery_test.go
  16. 1 1
      etcdctl/command/member_commands.go
  17. 1 1
      etcdserver/etcdhttp/client.go
  18. 1 1
      etcdserver/etcdhttp/client_test.go
  19. 1 1
      etcdserver/etcdhttp/http_test.go
  20. 3 2
      etcdserver/etcdhttp/peer.go
  21. 6 2
      etcdserver/sendhub.go
  22. 4 4
      etcdserver/sendhub_test.go
  23. 47 16
      etcdserver/server.go
  24. 23 20
      etcdserver/server_test.go
  25. 22 19
      integration/cluster_test.go
  26. 34 0
      integration/migration_test.go
  27. 1 0
      integration/testdata/integration046_data/conf
  28. BIN
      integration/testdata/integration046_data/log
  29. 1 0
      integration/testdata/integration046_data/snapshot/1_90.ss
  30. 3 2
      migrate/etcd4.go
  31. 4 5
      migrate/log.go
  32. 1 3
      migrate/log_test.go
  33. 59 0
      migrate/member.go
  34. 9 7
      migrate/snapshot.go
  35. 180 0
      pkg/types/set.go
  36. 166 0
      pkg/types/set_test.go
  37. 1 1
      raft/node.go
  38. 1 1
      raft/node_bench_test.go
  39. 1 1
      raft/node_test.go
  40. 6 2
      raft/raft.go
  41. 28 0
      raft/raft_test.go
  42. 54 0
      rafthttp/entry_reader.go
  43. 63 0
      rafthttp/entry_test.go
  44. 54 0
      rafthttp/entry_writer.go
  45. 86 1
      rafthttp/http.go
  46. 1 1
      rafthttp/http_test.go
  47. 92 5
      rafthttp/sender.go
  48. 6 6
      rafthttp/sender_test.go
  49. 207 0
      rafthttp/streamer.go
  50. 33 0
      wal/util.go
  51. 53 0
      wal/wal_bench_test.go

+ 5 - 5
Godeps/Godeps.json

@@ -5,11 +5,6 @@
 		"./..."
 	],
 	"Deps": [
-		{
-			"ImportPath": "code.google.com/p/go.net/context",
-			"Comment": "null-144",
-			"Rev": "ad01a6fcc8a19d3a4478c836895ffe883bd2ceab"
-		},
 		{
 			"ImportPath": "code.google.com/p/gogoprotobuf/proto",
 			"Rev": "7fd1620f09261338b6b1ca1289ace83aee0ec946"
@@ -31,6 +26,11 @@
 		{
 			"ImportPath": "github.com/stretchr/testify/assert",
 			"Rev": "9cc77fa25329013ce07362c7742952ff887361f2"
+		},
+		{
+			"ImportPath": "golang.org/x/net/context",
+			"Comment": "null-220",
+			"Rev": "c5a46024776ec35eb562fa9226968b9d543bb13a"
 		}
 	]
 }

+ 1 - 1
Godeps/_workspace/src/github.com/codegangsta/cli/app_test.go

@@ -5,7 +5,7 @@ import (
 	"os"
 	"testing"
 
-	"github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
 )
 
 func ExampleApp() {

+ 1 - 1
Godeps/_workspace/src/github.com/codegangsta/cli/cli_test.go

@@ -3,7 +3,7 @@ package cli_test
 import (
 	"os"
 
-	"github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
 )
 
 func Example() {

+ 1 - 1
Godeps/_workspace/src/github.com/codegangsta/cli/command_test.go

@@ -4,7 +4,7 @@ import (
 	"flag"
 	"testing"
 
-	"github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
 )
 
 func TestCommandDoNotIgnoreFlags(t *testing.T) {

+ 1 - 1
Godeps/_workspace/src/github.com/codegangsta/cli/context_test.go

@@ -5,7 +5,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
 )
 
 func TestNewContext(t *testing.T) {

+ 1 - 1
Godeps/_workspace/src/github.com/codegangsta/cli/flag_test.go

@@ -7,7 +7,7 @@ import (
 	"strings"
 	"testing"
 
-	"github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
 )
 
 var boolFlagTests = []struct {

+ 13 - 12
Godeps/_workspace/src/code.google.com/p/go.net/context/context.go → Godeps/_workspace/src/golang.org/x/net/context/context.go

@@ -108,7 +108,7 @@ type Context interface {
 	// 	// Package user defines a User type that's stored in Contexts.
 	// 	package user
 	//
-	// 	import "code.google.com/p/go.net/context"
+	// 	import "golang.org/x/net/context"
 	//
 	// 	// User is the type of value stored in the Contexts.
 	// 	type User struct {...}
@@ -124,7 +124,7 @@ type Context interface {
 	//
 	// 	// NewContext returns a new Context that carries value u.
 	// 	func NewContext(ctx context.Context, u *User) context.Context {
-	// 		return context.WithValue(userKey, u)
+	// 		return context.WithValue(ctx, userKey, u)
 	// 	}
 	//
 	// 	// FromContext returns the User value stored in ctx, if any.
@@ -142,27 +142,28 @@ var Canceled = errors.New("context canceled")
 // deadline passes.
 var DeadlineExceeded = errors.New("context deadline exceeded")
 
-// An emptyCtx is never canceled, has no values, and has no deadline.
+// An emptyCtx is never canceled, has no values, and has no deadline.  It is not
+// struct{}, since vars of this type must have distinct addresses.
 type emptyCtx int
 
-func (emptyCtx) Deadline() (deadline time.Time, ok bool) {
+func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
 	return
 }
 
-func (emptyCtx) Done() <-chan struct{} {
+func (*emptyCtx) Done() <-chan struct{} {
 	return nil
 }
 
-func (emptyCtx) Err() error {
+func (*emptyCtx) Err() error {
 	return nil
 }
 
-func (emptyCtx) Value(key interface{}) interface{} {
+func (*emptyCtx) Value(key interface{}) interface{} {
 	return nil
 }
 
-func (n emptyCtx) String() string {
-	switch n {
+func (e *emptyCtx) String() string {
+	switch e {
 	case background:
 		return "context.Background"
 	case todo:
@@ -171,9 +172,9 @@ func (n emptyCtx) String() string {
 	return "unknown empty Context"
 }
 
-const (
-	background emptyCtx = 1
-	todo       emptyCtx = 2
+var (
+	background = new(emptyCtx)
+	todo       = new(emptyCtx)
 )
 
 // Background returns a non-nil, empty Context. It is never canceled, has no

+ 1 - 1
Godeps/_workspace/src/code.google.com/p/go.net/context/context_test.go → Godeps/_workspace/src/golang.org/x/net/context/context_test.go

@@ -365,7 +365,7 @@ func TestAllocs(t *testing.T) {
 				c := WithValue(bg, k1, nil)
 				c.Value(k1)
 			},
-			limit:      1,
+			limit:      3,
 			gccgoLimit: 3,
 		},
 		{

+ 1 - 1
Godeps/_workspace/src/code.google.com/p/go.net/context/withtimeout_test.go → Godeps/_workspace/src/golang.org/x/net/context/withtimeout_test.go

@@ -8,7 +8,7 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 func ExampleWithTimeout() {

+ 1 - 1
client/http.go

@@ -24,7 +24,7 @@ import (
 	"net/url"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 var (

+ 1 - 1
client/http_test.go

@@ -26,7 +26,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 type staticHTTPClient struct {

+ 1 - 1
client/keys.go

@@ -27,7 +27,7 @@ import (
 	"strings"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 var (

+ 1 - 1
client/members.go

@@ -24,7 +24,7 @@ import (
 	"net/url"
 	"path"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	"github.com/coreos/etcd/pkg/types"
 )

+ 1 - 1
discovery/discovery.go

@@ -28,8 +28,8 @@ import (
 	"strings"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/pkg/types"
 )

+ 1 - 1
discovery/discovery_test.go

@@ -26,8 +26,8 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/client"
 )
 

+ 1 - 1
etcdctl/command/member_commands.go

@@ -21,8 +21,8 @@ import (
 	"os"
 	"strings"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/client"
 )
 

+ 1 - 1
etcdserver/etcdhttp/client.go

@@ -29,8 +29,8 @@ import (
 	"strings"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"

+ 1 - 1
etcdserver/etcdhttp/client_test.go

@@ -31,8 +31,8 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"

+ 1 - 1
etcdserver/etcdhttp/http_test.go

@@ -24,7 +24,7 @@ import (
 	"sort"
 	"testing"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/etcdserverpb"

+ 3 - 2
etcdserver/etcdhttp/peer.go

@@ -26,20 +26,21 @@ import (
 )
 
 const (
-	raftPrefix        = "/raft"
 	peerMembersPrefix = "/members"
 )
 
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
 func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 	rh := rafthttp.NewHandler(server, server.Cluster.ID())
+	rsh := rafthttp.NewStreamHandler(server.SenderFinder(), server.ID(), server.Cluster.ID())
 	mh := &peerMembersHandler{
 		clusterInfo: server.Cluster,
 	}
 
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", http.NotFound)
-	mux.Handle(raftPrefix, rh)
+	mux.Handle(rafthttp.RaftPrefix, rh)
+	mux.Handle(rafthttp.RaftStreamPrefix+"/", rsh)
 	mux.Handle(peerMembersPrefix, mh)
 	return mux
 }

+ 6 - 2
etcdserver/sendhub.go

@@ -35,6 +35,7 @@ const (
 type sendHub struct {
 	tr         http.RoundTripper
 	cl         ClusterInfo
+	p          rafthttp.Processor
 	ss         *stats.ServerStats
 	ls         *stats.LeaderStats
 	senders    map[types.ID]rafthttp.Sender
@@ -44,10 +45,11 @@ type sendHub struct {
 // newSendHub creates the default send hub used to transport raft messages
 // to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
-func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
+func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
 	h := &sendHub{
 		tr:         t,
 		cl:         cl,
+		p:          p,
 		ss:         ss,
 		ls:         ls,
 		senders:    make(map[types.ID]rafthttp.Sender),
@@ -59,6 +61,8 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *
 	return h
 }
 
+func (h *sendHub) Sender(id types.ID) rafthttp.Sender { return h.senders[id] }
+
 func (h *sendHub) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 		to := types.ID(m.To)
@@ -100,7 +104,7 @@ func (h *sendHub) Add(m *Member) {
 	}
 	u.Path = path.Join(u.Path, raftPrefix)
 	fs := h.ls.Follower(m.ID.String())
-	s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), fs, h.shouldstop)
+	s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), h.p, fs, h.shouldstop)
 	h.senders[m.ID] = s
 }
 

+ 4 - 4
etcdserver/sendhub_test.go

@@ -35,7 +35,7 @@ func TestSendHubInitSenders(t *testing.T) {
 	}
 	cl := newTestCluster(membs)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
+	h := newSendHub(nil, cl, nil, nil, ls)
 
 	ids := cl.MemberIDs()
 	if len(h.senders) != len(ids) {
@@ -51,7 +51,7 @@ func TestSendHubInitSenders(t *testing.T) {
 func TestSendHubAdd(t *testing.T) {
 	cl := newTestCluster(nil)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
+	h := newSendHub(nil, cl, nil, nil, ls)
 	m := newTestMember(1, []string{"http://a"}, "", nil)
 	h.Add(m)
 
@@ -76,7 +76,7 @@ func TestSendHubRemove(t *testing.T) {
 	}
 	cl := newTestCluster(membs)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(nil, cl, nil, ls)
+	h := newSendHub(nil, cl, nil, nil, ls)
 	h.Remove(types.ID(1))
 
 	if _, ok := h.senders[types.ID(1)]; ok {
@@ -91,7 +91,7 @@ func TestSendHubShouldStop(t *testing.T) {
 	tr := newRespRoundTripper(http.StatusForbidden, nil)
 	cl := newTestCluster(membs)
 	ls := stats.NewLeaderStats("")
-	h := newSendHub(tr, cl, nil, ls)
+	h := newSendHub(tr, cl, nil, nil, ls)
 
 	shouldstop := h.ShouldStopNotify()
 	select {

+ 47 - 16
etcdserver/server.go

@@ -31,16 +31,18 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/migrate"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/wait"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wal"
@@ -85,7 +87,8 @@ type Response struct {
 	err     error
 }
 
-type Sender interface {
+type SendHub interface {
+	rafthttp.SenderFinder
 	Send(m []raftpb.Message)
 	Add(m *Member)
 	Remove(id types.ID)
@@ -173,7 +176,7 @@ type EtcdServer struct {
 	// MUST NOT block. It is okay to drop messages, since clients should
 	// timeout and reissue their messages.  If send is nil, server will
 	// panic.
-	sender Sender
+	sendhub SendHub
 
 	storage Storage
 
@@ -189,19 +192,46 @@ type EtcdServer struct {
 	raftLead uint64
 }
 
+// UpgradeWAL converts an older version of the EtcdServer data to the newest version.
+// It must ensure that, after upgrading, the most recent version is present.
+func UpgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
+	if ver == wal.WALv0_4 {
+		log.Print("Converting v0.4 log to v0.5")
+		err := migrate.Migrate4To5(cfg.DataDir, cfg.Name)
+		if err != nil {
+			log.Fatalf("Failed migrating data-dir: %v", err)
+			return err
+		}
+	}
+	return nil
+}
+
 // NewServer creates a new EtcdServer from the supplied configuration. The
 // configuration is considered static for the lifetime of the EtcdServer.
 func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
-	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
-		return nil, fmt.Errorf("cannot create snapshot directory: %v", err)
-	}
-	ss := snap.New(cfg.SnapDir())
 	st := store.New()
 	var w *wal.WAL
 	var n raft.Node
 	var s *raft.MemoryStorage
 	var id types.ID
-	haveWAL := wal.Exist(cfg.WALDir())
+	walVersion := wal.DetectVersion(cfg.DataDir)
+	if walVersion == wal.WALUnknown {
+		return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
+	}
+	haveWAL := walVersion != wal.WALNotExist
+
+	if haveWAL && walVersion != wal.WALv0_5 {
+		err := UpgradeWAL(cfg, walVersion)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
+		return nil, fmt.Errorf("cannot create snapshot directory: %v", err)
+	}
+	ss := snap.New(cfg.SnapDir())
+
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 		us := getOtherPeerURLs(cfg.Cluster, cfg.Name)
@@ -272,7 +302,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	}
 	lstats := stats.NewLeaderStats(id.String())
 
-	shub := newSendHub(cfg.Transport, cfg.Cluster, sstats, lstats)
 	srv := &EtcdServer{
 		store:       st,
 		node:        n,
@@ -286,11 +315,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		}{w, ss},
 		stats:      sstats,
 		lstats:     lstats,
-		sender:     shub,
 		Ticker:     time.Tick(100 * time.Millisecond),
 		SyncTicker: time.Tick(500 * time.Millisecond),
 		snapCount:  cfg.SnapCount,
 	}
+	srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
 	return srv, nil
 }
 
@@ -321,6 +350,8 @@ func (s *EtcdServer) start() {
 
 func (s *EtcdServer) ID() types.ID { return s.id }
 
+func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub }
+
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.Cluster.IsIDRemoved(types.ID(m.From)) {
 		log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
@@ -338,11 +369,11 @@ func (s *EtcdServer) run() {
 	var snapi, appliedi uint64
 	var nodes []uint64
 	var shouldstop bool
-	shouldstopC := s.sender.ShouldStopNotify()
+	shouldstopC := s.sendhub.ShouldStopNotify()
 
 	defer func() {
 		s.node.Stop()
-		s.sender.Stop()
+		s.sendhub.Stop()
 		close(s.done)
 	}()
 	for {
@@ -369,7 +400,7 @@ func (s *EtcdServer) run() {
 					log.Fatalf("etcdserver: create snapshot error: %v", err)
 				}
 			}
-			s.sender.Send(rd.Messages)
+			s.sendhub.Send(rd.Messages)
 
 			if !raft.IsEmptySnap(rd.Snapshot) {
 				// recover from snapshot if it is more updated than current applied
@@ -733,7 +764,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
 		if m.ID == s.id {
 			log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.sender.Add(m)
+			s.sendhub.Add(m)
 			log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
@@ -744,7 +775,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
 			log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
 			return true, nil
 		} else {
-			s.sender.Remove(id)
+			s.sendhub.Remove(id)
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeUpdateNode:
@@ -759,7 +790,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
 		if m.ID == s.id {
 			log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.sender.Update(m)
+			s.sendhub.Update(m)
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	}

+ 23 - 20
etcdserver/server_test.go

@@ -29,13 +29,14 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/store"
 )
 
@@ -501,7 +502,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 		id:      1,
 		node:    &nodeRecorder{},
 		Cluster: cl,
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 	}
 	cc := raftpb.ConfChange{
 		Type:   raftpb.ConfChangeRemoveNode,
@@ -534,6 +535,7 @@ type fakeSender struct {
 	ss []*EtcdServer
 }
 
+func (s *fakeSender) Sender(id types.ID) rafthttp.Sender { return nil }
 func (s *fakeSender) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 		s.ss[m.To-1].node.Step(context.TODO(), m)
@@ -569,7 +571,7 @@ func testServer(t *testing.T, ns uint64) {
 			node:        n,
 			raftStorage: s,
 			store:       st,
-			sender:      &fakeSender{ss},
+			sendhub:     &fakeSender{ss},
 			storage:     &storageRecorder{},
 			Ticker:      tk.C,
 			Cluster:     cl,
@@ -644,7 +646,7 @@ func TestDoProposal(t *testing.T) {
 			node:        n,
 			raftStorage: s,
 			store:       st,
-			sender:      &nopSender{},
+			sendhub:     &nopSender{},
 			storage:     &storageRecorder{},
 			Ticker:      tk,
 			Cluster:     cl,
@@ -733,7 +735,7 @@ func TestDoProposalStopped(t *testing.T) {
 		node:        n,
 		raftStorage: s,
 		store:       st,
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     &storageRecorder{},
 		Ticker:      tk,
 		Cluster:     cl,
@@ -845,7 +847,7 @@ func TestSyncTrigger(t *testing.T) {
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
 		store:       &storeRecorder{},
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     &storageRecorder{},
 		SyncTicker:  st,
 	}
@@ -952,7 +954,7 @@ func TestTriggerSnap(t *testing.T) {
 	cl.SetStore(store.New())
 	srv := &EtcdServer{
 		store:       st,
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     p,
 		node:        n,
 		raftStorage: s,
@@ -989,7 +991,7 @@ func TestRecvSnapshot(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:       st,
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     p,
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
@@ -1022,7 +1024,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:       st,
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     &storageRecorder{},
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
@@ -1055,7 +1057,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 	storage := raft.NewMemoryStorage()
 	s := &EtcdServer{
 		store:       st,
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     &storageRecorder{},
 		node:        n,
 		raftStorage: storage,
@@ -1101,7 +1103,7 @@ func TestAddMember(t *testing.T) {
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
 		store:       &storeRecorder{},
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     &storageRecorder{},
 		Cluster:     cl,
 	}
@@ -1139,7 +1141,7 @@ func TestRemoveMember(t *testing.T) {
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
 		store:       &storeRecorder{},
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     &storageRecorder{},
 		Cluster:     cl,
 	}
@@ -1176,7 +1178,7 @@ func TestUpdateMember(t *testing.T) {
 		node:        n,
 		raftStorage: raft.NewMemoryStorage(),
 		store:       &storeRecorder{},
-		sender:      &nopSender{},
+		sendhub:     &nopSender{},
 		storage:     &storageRecorder{},
 		Cluster:     cl,
 	}
@@ -1245,7 +1247,7 @@ func TestPublish(t *testing.T) {
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
 		node:    &nodeRecorder{},
-		sender:  &nopSender{},
+		sendhub: &nopSender{},
 		Cluster: &Cluster{},
 		w:       &waitRecorder{},
 		done:    make(chan struct{}),
@@ -1649,12 +1651,13 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 
 type nopSender struct{}
 
-func (s *nopSender) Send(m []raftpb.Message)           {}
-func (s *nopSender) Add(m *Member)                     {}
-func (s *nopSender) Remove(id types.ID)                {}
-func (s *nopSender) Update(m *Member)                  {}
-func (s *nopSender) Stop()                             {}
-func (s *nopSender) ShouldStopNotify() <-chan struct{} { return nil }
+func (s *nopSender) Sender(id types.ID) rafthttp.Sender { return nil }
+func (s *nopSender) Send(m []raftpb.Message)            {}
+func (s *nopSender) Add(m *Member)                      {}
+func (s *nopSender) Remove(id types.ID)                 {}
+func (s *nopSender) Update(m *Member)                   {}
+func (s *nopSender) Stop()                              {}
+func (s *nopSender) ShouldStopNotify() <-chan struct{}  { return nil }
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 	peers := make([]raft.Peer, len(ids))

+ 22 - 19
integration/cluster_test.go

@@ -36,7 +36,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	"github.com/coreos/etcd/pkg/types"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 const (
@@ -100,11 +100,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
 }
 
 func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
-func TestDecreaseClusterSizeOf5(t *testing.T) {
-	t.Skip("enable after reducing the election collision rate")
-	// election collision rate is too high when enabling --race
-	testDecreaseClusterSize(t, 5)
-}
+func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }
 
 func testDecreaseClusterSize(t *testing.T, size int) {
 	defer afterTest(t)
@@ -112,7 +108,8 @@ func testDecreaseClusterSize(t *testing.T, size int) {
 	c.Launch(t)
 	defer c.Terminate(t)
 
-	for i := 0; i < size-1; i++ {
+	// TODO: remove the last but one member
+	for i := 0; i < size-2; i++ {
 		id := c.Members[len(c.Members)-1].s.ID()
 		c.RemoveMember(t, uint64(id))
 		c.waitLeader(t)
@@ -149,16 +146,7 @@ type cluster struct {
 	Members []*member
 }
 
-// NewCluster returns an unlaunched cluster of the given size which has been
-// set to use static bootstrap.
-func NewCluster(t *testing.T, size int) *cluster {
-	c := &cluster{}
-	ms := make([]*member, size)
-	for i := 0; i < size; i++ {
-		ms[i] = mustNewMember(t, c.name(i))
-	}
-	c.Members = ms
-
+func fillClusterForMembers(ms []*member, cName string) error {
 	addrs := make([]string, 0)
 	for _, m := range ms {
 		for _, l := range m.PeerListeners {
@@ -168,11 +156,26 @@ func NewCluster(t *testing.T, size int) *cluster {
 	clusterStr := strings.Join(addrs, ",")
 	var err error
 	for _, m := range ms {
-		m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
+		m.Cluster, err = etcdserver.NewClusterFromString(cName, clusterStr)
 		if err != nil {
-			t.Fatal(err)
+			return err
 		}
 	}
+	return nil
+}
+
+// NewCluster returns an unlaunched cluster of the given size which has been
+// set to use static bootstrap.
+func NewCluster(t *testing.T, size int) *cluster {
+	c := &cluster{}
+	ms := make([]*member, size)
+	for i := 0; i < size; i++ {
+		ms[i] = mustNewMember(t, c.name(i))
+	}
+	c.Members = ms
+	if err := fillClusterForMembers(c.Members, clusterName); err != nil {
+		t.Fatal(err)
+	}
 
 	return c
 }

+ 34 - 0
integration/migration_test.go

@@ -0,0 +1,34 @@
+package integration
+
+import (
+	"github.com/coreos/etcd/pkg/types"
+	"net"
+	"os/exec"
+	"testing"
+)
+
+func TestUpgradeMember(t *testing.T) {
+	defer afterTest(t)
+	m := mustNewMember(t, "integration046")
+	newPeerListeners := make([]net.Listener, 0)
+	newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, "127.0.0.1:59892"))
+	m.PeerListeners = newPeerListeners
+	urls, err := types.NewURLs([]string{"http://127.0.0.1:59892"})
+	if err != nil {
+		t.Fatal(err)
+	}
+	m.PeerURLs = urls
+	m.NewCluster = true
+	c := &cluster{}
+	c.Members = []*member{m}
+	fillClusterForMembers(c.Members, "etcd-cluster")
+	cmd := exec.Command("cp", "-r", "testdata/integration046_data/conf", "testdata/integration046_data/log", "testdata/integration046_data/snapshot", m.DataDir)
+	err = cmd.Run()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	c.Launch(t)
+	defer c.Terminate(t)
+	clusterMustProgress(t, c)
+}

+ 1 - 0
integration/testdata/integration046_data/conf

@@ -0,0 +1 @@
+{"commitIndex":1,"peers":[]}

BIN
integration/testdata/integration046_data/log


File diff suppressed because it is too large
+ 1 - 0
integration/testdata/integration046_data/snapshot/1_90.ss


+ 3 - 2
migrate/etcd4.go

@@ -8,6 +8,7 @@ import (
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
 	raftpb "github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/wal"
@@ -125,12 +126,12 @@ func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name s
 	if name != "" {
 		log.Printf("Using suggested name %s", name)
 		if val, ok := nodes[name]; ok {
-			log.Printf("Found ID %d", val)
+			log.Printf("Found ID %s", types.ID(val))
 			return val
 		}
 		if snapNodes != nil {
 			if val, ok := snapNodes[name]; ok {
-				log.Printf("Found ID %d", val)
+				log.Printf("Found ID %s", types.ID(val))
 				return val
 			}
 		}

+ 4 - 5
migrate/log.go

@@ -10,7 +10,6 @@ import (
 	"path"
 	"time"
 
-	"github.com/coreos/etcd/etcdserver"
 	etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
 	"github.com/coreos/etcd/pkg/types"
@@ -56,7 +55,7 @@ func (l Log4) NodeIDs() map[string]uint64 {
 }
 
 func StorePath(key string) string {
-	return path.Join(etcdserver.StoreKeysPrefix, key)
+	return path.Join("/1", key)
 }
 
 func DecodeLog4FromFile(logpath string) (Log4, error) {
@@ -214,7 +213,7 @@ type JoinCommand struct {
 	Name    string `json:"name"`
 	RaftURL string `json:"raftURL"`
 	EtcdURL string `json:"etcdURL"`
-	memb    etcdserver.Member
+	memb    member
 }
 
 func (c *JoinCommand) Type5() raftpb.EntryType {
@@ -496,13 +495,13 @@ func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry,
 	return &ent5, nil
 }
 
-func generateNodeMember(name, rafturl, etcdurl string) *etcdserver.Member {
+func generateNodeMember(name, rafturl, etcdurl string) *member {
 	pURLs, err := types.NewURLs([]string{rafturl})
 	if err != nil {
 		log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl)
 	}
 
-	m := etcdserver.NewMember(name, pURLs, etcdDefaultClusterName, nil)
+	m := NewMember(name, pURLs, etcdDefaultClusterName)
 	m.ClientURLs = []string{etcdurl}
 	return m
 }

+ 1 - 3
migrate/log_test.go

@@ -6,8 +6,6 @@ import (
 	"reflect"
 	"testing"
 	"time"
-
-	"github.com/coreos/etcd/etcdserver"
 )
 
 func TestNewCommand(t *testing.T) {
@@ -21,7 +19,7 @@ func TestNewCommand(t *testing.T) {
 		t.Errorf("couldn't create time: %v", err)
 	}
 
-	m := etcdserver.NewMember("alice", []url.URL{{Scheme: "http", Host: "127.0.0.1:7001"}}, etcdDefaultClusterName, nil)
+	m := NewMember("alice", []url.URL{{Scheme: "http", Host: "127.0.0.1:7001"}}, etcdDefaultClusterName)
 	m.ClientURLs = []string{"http://127.0.0.1:4001"}
 
 	tests := []interface{}{

+ 59 - 0
migrate/member.go

@@ -0,0 +1,59 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package migrate
+
+import (
+	"crypto/sha1"
+	"encoding/binary"
+	"sort"
+
+	"github.com/coreos/etcd/pkg/types"
+)
+
+type raftAttributes struct {
+	PeerURLs []string `json:"peerURLs"`
+}
+
+type attributes struct {
+	Name       string   `json:"name,omitempty"`
+	ClientURLs []string `json:"clientURLs,omitempty"`
+}
+
+type member struct {
+	ID types.ID `json:"id"`
+	raftAttributes
+	attributes
+}
+
+func NewMember(name string, peerURLs types.URLs, clusterName string) *member {
+	m := &member{
+		raftAttributes: raftAttributes{PeerURLs: peerURLs.StringSlice()},
+		attributes:     attributes{Name: name},
+	}
+
+	var b []byte
+	sort.Strings(m.PeerURLs)
+	for _, p := range m.PeerURLs {
+		b = append(b, []byte(p)...)
+	}
+
+	b = append(b, []byte(clusterName)...)
+
+	hash := sha1.Sum(b)
+	m.ID = types.ID(binary.BigEndian.Uint64(hash[:8]))
+	return m
+}

+ 9 - 7
migrate/snapshot.go

@@ -93,11 +93,11 @@ func fixEtcd(n *node) {
 		rafturl := q.Get("raft")
 
 		m := generateNodeMember(name, rafturl, etcdurl)
-		attrBytes, err := json.Marshal(m.Attributes)
+		attrBytes, err := json.Marshal(m.attributes)
 		if err != nil {
 			log.Fatal("Couldn't marshal attributes")
 		}
-		raftBytes, err := json.Marshal(m.RaftAttributes)
+		raftBytes, err := json.Marshal(m.raftAttributes)
 		if err != nil {
 			log.Fatal("Couldn't marshal raft attributes")
 		}
@@ -171,21 +171,23 @@ func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
 		log.Fatal("Couldn't re-marshal new snapshot")
 	}
 
+	nodes := s.GetNodesFromStore()
+	nodeList := make([]uint64, 0)
+	for _, v := range nodes {
+		nodeList = append(nodeList, v)
+	}
+
 	snap5 := raftpb.Snapshot{
 		Data: newState,
 		Metadata: raftpb.SnapshotMetadata{
 			Index: s.LastIndex,
 			Term:  s.LastTerm,
 			ConfState: raftpb.ConfState{
-				Nodes: make([]uint64, len(s.Peers)),
+				Nodes: nodeList,
 			},
 		},
 	}
 
-	for i, p := range s.Peers {
-		snap5.Metadata.ConfState.Nodes[i] = hashName(p.Name)
-	}
-
 	return &snap5
 }
 

+ 180 - 0
pkg/types/set.go

@@ -0,0 +1,180 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package types
+
+import (
+	"reflect"
+	"sort"
+	"sync"
+)
+
+type Set interface {
+	Add(string)
+	Remove(string)
+	Contains(string) bool
+	Equals(Set) bool
+	Length() int
+	Values() []string
+	Copy() Set
+	Sub(Set) Set
+}
+
+func NewUnsafeSet(values ...string) *unsafeSet {
+	set := &unsafeSet{make(map[string]struct{})}
+	for _, v := range values {
+		set.Add(v)
+	}
+	return set
+}
+
+func NewThreadsafeSet(values ...string) *tsafeSet {
+	us := NewUnsafeSet(values...)
+	return &tsafeSet{us, sync.RWMutex{}}
+}
+
+type unsafeSet struct {
+	d map[string]struct{}
+}
+
+// Add adds a new value to the set (no-op if the value is already present)
+func (us *unsafeSet) Add(value string) {
+	us.d[value] = struct{}{}
+}
+
+// Remove removes the given value from the set
+func (us *unsafeSet) Remove(value string) {
+	delete(us.d, value)
+}
+
+// Contains returns whether the set contains the given value
+func (us *unsafeSet) Contains(value string) (exists bool) {
+	_, exists = us.d[value]
+	return
+}
+
+// ContainsAll returns whether the set contains all given values
+func (us *unsafeSet) ContainsAll(values []string) bool {
+	for _, s := range values {
+		if !us.Contains(s) {
+			return false
+		}
+	}
+	return true
+}
+
+// Equals returns whether the contents of two sets are identical
+func (us *unsafeSet) Equals(other Set) bool {
+	v1 := sort.StringSlice(us.Values())
+	v2 := sort.StringSlice(other.Values())
+	v1.Sort()
+	v2.Sort()
+	return reflect.DeepEqual(v1, v2)
+}
+
+// Length returns the number of elements in the set
+func (us *unsafeSet) Length() int {
+	return len(us.d)
+}
+
+// Values returns the values of the Set in an unspecified order.
+func (us *unsafeSet) Values() (values []string) {
+	values = make([]string, 0)
+	for val, _ := range us.d {
+		values = append(values, val)
+	}
+	return
+}
+
+// Copy creates a new Set containing the values of the first
+func (us *unsafeSet) Copy() Set {
+	cp := NewUnsafeSet()
+	for val, _ := range us.d {
+		cp.Add(val)
+	}
+
+	return cp
+}
+
+// Sub removes all elements in other from the set
+func (us *unsafeSet) Sub(other Set) Set {
+	oValues := other.Values()
+	result := us.Copy().(*unsafeSet)
+
+	for _, val := range oValues {
+		if _, ok := result.d[val]; !ok {
+			continue
+		}
+		delete(result.d, val)
+	}
+
+	return result
+}
+
+type tsafeSet struct {
+	us *unsafeSet
+	m  sync.RWMutex
+}
+
+func (ts *tsafeSet) Add(value string) {
+	ts.m.Lock()
+	defer ts.m.Unlock()
+	ts.us.Add(value)
+}
+
+func (ts *tsafeSet) Remove(value string) {
+	ts.m.Lock()
+	defer ts.m.Unlock()
+	ts.us.Remove(value)
+}
+
+func (ts *tsafeSet) Contains(value string) (exists bool) {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	return ts.us.Contains(value)
+}
+
+func (ts *tsafeSet) Equals(other Set) bool {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	return ts.us.Equals(other)
+}
+
+func (ts *tsafeSet) Length() int {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	return ts.us.Length()
+}
+
+func (ts *tsafeSet) Values() (values []string) {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	return ts.us.Values()
+}
+
+func (ts *tsafeSet) Copy() Set {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	usResult := ts.us.Copy().(*unsafeSet)
+	return &tsafeSet{usResult, sync.RWMutex{}}
+}
+
+func (ts *tsafeSet) Sub(other Set) Set {
+	ts.m.RLock()
+	defer ts.m.RUnlock()
+	usResult := ts.us.Sub(other).(*unsafeSet)
+	return &tsafeSet{usResult, sync.RWMutex{}}
+}

+ 166 - 0
pkg/types/set_test.go

@@ -0,0 +1,166 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package types
+
+import (
+	"reflect"
+	"sort"
+	"testing"
+)
+
+func TestUnsafeSet(t *testing.T) {
+	driveSetTests(t, NewUnsafeSet())
+}
+
+func TestThreadsafeSet(t *testing.T) {
+	driveSetTests(t, NewThreadsafeSet())
+}
+
+// Check that two slices contents are equal; order is irrelevant
+func equal(a, b []string) bool {
+	as := sort.StringSlice(a)
+	bs := sort.StringSlice(b)
+	as.Sort()
+	bs.Sort()
+	return reflect.DeepEqual(as, bs)
+}
+
+func driveSetTests(t *testing.T, s Set) {
+	// Verify operations on an empty set
+	eValues := []string{}
+	values := s.Values()
+	if !reflect.DeepEqual(values, eValues) {
+		t.Fatalf("Expect values=%v got %v", eValues, values)
+	}
+	if l := s.Length(); l != 0 {
+		t.Fatalf("Expected length=0, got %d", l)
+	}
+	for _, v := range []string{"foo", "bar", "baz"} {
+		if s.Contains(v) {
+			t.Fatalf("Expect s.Contains(%q) to be fale, got true", v)
+		}
+	}
+
+	// Add three items, ensure they show up
+	s.Add("foo")
+	s.Add("bar")
+	s.Add("baz")
+
+	eValues = []string{"foo", "bar", "baz"}
+	values = s.Values()
+	if !equal(values, eValues) {
+		t.Fatalf("Expect values=%v got %v", eValues, values)
+	}
+
+	for _, v := range eValues {
+		if !s.Contains(v) {
+			t.Fatalf("Expect s.Contains(%q) to be true, got false", v)
+		}
+	}
+
+	if l := s.Length(); l != 3 {
+		t.Fatalf("Expected length=3, got %d", l)
+	}
+
+	// Add the same item a second time, ensuring it is not duplicated
+	s.Add("foo")
+
+	values = s.Values()
+	if !equal(values, eValues) {
+		t.Fatalf("Expect values=%v got %v", eValues, values)
+	}
+	if l := s.Length(); l != 3 {
+		t.Fatalf("Expected length=3, got %d", l)
+	}
+
+	// Remove all items, ensure they are gone
+	s.Remove("foo")
+	s.Remove("bar")
+	s.Remove("baz")
+
+	eValues = []string{}
+	values = s.Values()
+	if !equal(values, eValues) {
+		t.Fatalf("Expect values=%v got %v", eValues, values)
+	}
+
+	if l := s.Length(); l != 0 {
+		t.Fatalf("Expected length=0, got %d", l)
+	}
+
+	// Create new copies of the set, and ensure they are unlinked to the
+	// original Set by making modifications
+	s.Add("foo")
+	s.Add("bar")
+	cp1 := s.Copy()
+	cp2 := s.Copy()
+	s.Remove("foo")
+	cp3 := s.Copy()
+	cp1.Add("baz")
+
+	for i, tt := range []struct {
+		want []string
+		got  []string
+	}{
+		{[]string{"bar"}, s.Values()},
+		{[]string{"foo", "bar", "baz"}, cp1.Values()},
+		{[]string{"foo", "bar"}, cp2.Values()},
+		{[]string{"bar"}, cp3.Values()},
+	} {
+		if !equal(tt.want, tt.got) {
+			t.Fatalf("case %d: expect values=%v got %v", i, tt.want, tt.got)
+		}
+	}
+
+	for i, tt := range []struct {
+		want bool
+		got  bool
+	}{
+		{true, s.Equals(cp3)},
+		{true, cp3.Equals(s)},
+		{false, s.Equals(cp2)},
+		{false, s.Equals(cp1)},
+		{false, cp1.Equals(s)},
+		{false, cp2.Equals(s)},
+		{false, cp2.Equals(cp1)},
+	} {
+		if tt.got != tt.want {
+			t.Fatalf("case %d: want %t, got %t", i, tt.want, tt.got)
+
+		}
+	}
+
+	// Subtract values from a Set, ensuring a new Set is created and
+	// the original Sets are unmodified
+	sub1 := cp1.Sub(s)
+	sub2 := cp2.Sub(cp1)
+
+	for i, tt := range []struct {
+		want []string
+		got  []string
+	}{
+		{[]string{"foo", "bar", "baz"}, cp1.Values()},
+		{[]string{"foo", "bar"}, cp2.Values()},
+		{[]string{"bar"}, s.Values()},
+		{[]string{"foo", "baz"}, sub1.Values()},
+		{[]string{}, sub2.Values()},
+	} {
+		if !equal(tt.want, tt.got) {
+			t.Fatalf("case %d: expect values=%v got %v", i, tt.want, tt.got)
+		}
+	}
+}

+ 1 - 1
raft/node.go

@@ -21,7 +21,7 @@ import (
 	"log"
 	"reflect"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	pb "github.com/coreos/etcd/raft/raftpb"
 )
 

+ 1 - 1
raft/node_bench_test.go

@@ -19,7 +19,7 @@ package raft
 import (
 	"testing"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 func BenchmarkOneNode(b *testing.B) {

+ 1 - 1
raft/node_test.go

@@ -21,7 +21,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/raft/raftpb"
 )

+ 6 - 2
raft/raft.go

@@ -59,8 +59,12 @@ type progress struct {
 }
 
 func (pr *progress) update(n uint64) {
-	pr.match = n
-	pr.next = n + 1
+	if pr.match < n {
+		pr.match = n
+	}
+	if pr.next < n+1 {
+		pr.next = n + 1
+	}
 }
 
 func (pr *progress) optimisticUpdate(n uint64) {

+ 28 - 0
raft/raft_test.go

@@ -50,6 +50,34 @@ func (r *raft) readMessages() []pb.Message {
 	return msgs
 }
 
+func TestProgressUpdate(t *testing.T) {
+	prevM, prevN := uint64(3), uint64(5)
+	tests := []struct {
+		update uint64
+
+		wm uint64
+		wn uint64
+	}{
+		{prevM - 1, prevM, prevN},         // do not decrease match, next
+		{prevM, prevM, prevN},             // do not decrease next
+		{prevM + 1, prevM + 1, prevN},     // increase match, do not decrease next
+		{prevM + 2, prevM + 2, prevN + 1}, // increase match, next
+	}
+	for i, tt := range tests {
+		p := &progress{
+			match: prevM,
+			next:  prevN,
+		}
+		p.update(tt.update)
+		if p.match != tt.wm {
+			t.Errorf("#%d: match=%d, want %d", i, p.match, tt.wm)
+		}
+		if p.next != tt.wn {
+			t.Errorf("#%d: next=%d, want %d", i, p.next, tt.wn)
+		}
+	}
+}
+
 func TestProgressMaybeDecr(t *testing.T) {
 	tests := []struct {
 		m  uint64

+ 54 - 0
rafthttp/entry_reader.go

@@ -0,0 +1,54 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"encoding/binary"
+	"io"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type entryReader struct {
+	r io.Reader
+}
+
+func (er *entryReader) readEntries() ([]raftpb.Entry, error) {
+	var l uint64
+	if err := binary.Read(er.r, binary.BigEndian, &l); err != nil {
+		return nil, err
+	}
+	ents := make([]raftpb.Entry, int(l))
+	for i := 0; i < int(l); i++ {
+		if err := er.readEntry(&ents[i]); err != nil {
+			return nil, err
+		}
+	}
+	return ents, nil
+}
+
+func (er *entryReader) readEntry(ent *raftpb.Entry) error {
+	var l uint64
+	if err := binary.Read(er.r, binary.BigEndian, &l); err != nil {
+		return err
+	}
+	buf := make([]byte, int(l))
+	if _, err := io.ReadFull(er.r, buf); err != nil {
+		return err
+	}
+	return ent.Unmarshal(buf)
+}

+ 63 - 0
rafthttp/entry_test.go

@@ -0,0 +1,63 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+func TestEntsWriteAndRead(t *testing.T) {
+	tests := [][]raftpb.Entry{
+		{
+			{},
+		},
+		{
+			{Term: 1, Index: 1},
+		},
+		{
+			{Term: 1, Index: 1},
+			{Term: 1, Index: 2},
+			{Term: 1, Index: 3},
+		},
+		{
+			{Term: 1, Index: 1, Data: []byte("some data")},
+			{Term: 1, Index: 2, Data: []byte("some data")},
+			{Term: 1, Index: 3, Data: []byte("some data")},
+		},
+	}
+	for i, tt := range tests {
+		b := &bytes.Buffer{}
+		ew := &entryWriter{w: b}
+		if err := ew.writeEntries(tt); err != nil {
+			t.Errorf("#%d: unexpected write ents error: %v", i, err)
+			continue
+		}
+		er := &entryReader{r: b}
+		ents, err := er.readEntries()
+		if err != nil {
+			t.Errorf("#%d: unexpected read ents error: %v", i, err)
+			continue
+		}
+		if !reflect.DeepEqual(ents, tt) {
+			t.Errorf("#%d: ents = %+v, want %+v", i, ents, tt)
+		}
+	}
+}

+ 54 - 0
rafthttp/entry_writer.go

@@ -0,0 +1,54 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"encoding/binary"
+	"io"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type entryWriter struct {
+	w io.Writer
+}
+
+func (ew *entryWriter) writeEntries(ents []raftpb.Entry) error {
+	l := len(ents)
+	if err := binary.Write(ew.w, binary.BigEndian, uint64(l)); err != nil {
+		return err
+	}
+	for i := 0; i < l; i++ {
+		if err := ew.writeEntry(&ents[i]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (ew *entryWriter) writeEntry(ent *raftpb.Entry) error {
+	size := ent.Size()
+	if err := binary.Write(ew.w, binary.BigEndian, uint64(size)); err != nil {
+		return err
+	}
+	b, err := ent.Marshal()
+	if err != nil {
+		return err
+	}
+	_, err = ew.w.Write(b)
+	return err
+}

+ 86 - 1
rafthttp/http.go

@@ -20,17 +20,30 @@ import (
 	"io/ioutil"
 	"log"
 	"net/http"
+	"path"
+	"strconv"
+	"strings"
 
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+var (
+	RaftPrefix       = "/raft"
+	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
 )
 
 type Processor interface {
 	Process(ctx context.Context, m raftpb.Message) error
 }
 
+type SenderFinder interface {
+	// Sender returns the sender of the given id.
+	Sender(id types.ID) Sender
+}
+
 func NewHandler(p Processor, cid types.ID) http.Handler {
 	return &handler{
 		p:   p,
@@ -38,6 +51,16 @@ func NewHandler(p Processor, cid types.ID) http.Handler {
 	}
 }
 
+// NewStreamHandler returns a handler which initiates streamer when receiving
+// stream request from follower.
+func NewStreamHandler(finder SenderFinder, id, cid types.ID) http.Handler {
+	return &streamHandler{
+		finder: finder,
+		id:     id,
+		cid:    cid,
+	}
+}
+
 type handler struct {
 	p   Processor
 	cid types.ID
@@ -85,6 +108,68 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusNoContent)
 }
 
+type streamHandler struct {
+	finder SenderFinder
+	id     types.ID
+	cid    types.ID
+}
+
+func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "GET" {
+		w.Header().Set("Allow", "GET")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	fromStr := strings.TrimPrefix(r.URL.Path, RaftStreamPrefix+"/")
+	from, err := types.IDFromString(fromStr)
+	if err != nil {
+		log.Printf("rafthttp: path %s cannot be parsed", fromStr)
+		http.Error(w, "invalid path", http.StatusNotFound)
+		return
+	}
+	s := h.finder.Sender(from)
+	if s == nil {
+		log.Printf("rafthttp: fail to find sender %s", from)
+		http.Error(w, "error sender not found", http.StatusNotFound)
+		return
+	}
+
+	wcid := h.cid.String()
+	if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != wcid {
+		log.Printf("rafthttp: streaming request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
+		http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
+		return
+	}
+
+	wto := h.id.String()
+	if gto := r.Header.Get("X-Raft-To"); gto != wto {
+		log.Printf("rafthttp: streaming request ignored due to ID mismatch got %s want %s", gto, wto)
+		http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
+		return
+	}
+
+	termStr := r.Header.Get("X-Raft-Term")
+	term, err := strconv.ParseUint(termStr, 10, 64)
+	if err != nil {
+		log.Printf("rafthttp: streaming request ignored due to parse term %s error: %v", termStr, err)
+		http.Error(w, "invalid term field", http.StatusBadRequest)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+	w.(http.Flusher).Flush()
+
+	done, err := s.StartStreaming(w.(WriteFlusher), from, term)
+	if err != nil {
+		log.Printf("rafthttp: streaming request ignored due to start streaming error: %v", err)
+		// TODO: consider http status and info here
+		http.Error(w, "error enable streaming", http.StatusInternalServerError)
+		return
+	}
+	<-done
+}
+
 type writerToResponse interface {
 	WriteTo(w http.ResponseWriter)
 }

+ 1 - 1
rafthttp/http_test.go

@@ -29,7 +29,7 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 func TestServeRaft(t *testing.T) {

+ 92 - 5
rafthttp/sender.go

@@ -36,6 +36,9 @@ const (
 )
 
 type Sender interface {
+	// StartStreaming enables streaming in the sender using the given writer,
+	// which provides a fast and effecient way to send appendEntry messages.
+	StartStreaming(w WriteFlusher, to types.ID, term uint64) (done <-chan struct{}, err error)
 	Update(u string)
 	// Send sends the data to the remote node. It is always non-blocking.
 	// It may be fail to send data if it returns nil error.
@@ -45,14 +48,15 @@ type Sender interface {
 	Stop()
 }
 
-func NewSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
+func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
 	s := &sender{
 		tr:         tr,
 		u:          u,
 		cid:        cid,
+		p:          p,
 		fs:         fs,
-		q:          make(chan []byte, senderBufSize),
 		shouldstop: shouldstop,
+		q:          make(chan []byte, senderBufSize),
 	}
 	s.wg.Add(connPerSender)
 	for i := 0; i < connPerSender; i++ {
@@ -65,11 +69,32 @@ type sender struct {
 	tr         http.RoundTripper
 	u          string
 	cid        types.ID
+	p          Processor
 	fs         *stats.FollowerStats
-	q          chan []byte
-	mu         sync.RWMutex
-	wg         sync.WaitGroup
 	shouldstop chan struct{}
+
+	strmCln   *streamClient
+	strmSrv   *streamServer
+	strmSrvMu sync.Mutex
+	q         chan []byte
+
+	mu sync.RWMutex
+	wg sync.WaitGroup
+}
+
+func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
+	s.strmSrvMu.Lock()
+	defer s.strmSrvMu.Unlock()
+	if s.strmSrv != nil {
+		// ignore lower-term streaming request
+		if term < s.strmSrv.term {
+			return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, s.strmSrv.term)
+		}
+		// stop the existing one
+		s.strmSrv.stop()
+	}
+	s.strmSrv = startStreamServer(w, to, term, s.fs)
+	return s.strmSrv.stopNotify(), nil
 }
 
 func (s *sender) Update(u string) {
@@ -80,6 +105,15 @@ func (s *sender) Update(u string) {
 
 // TODO (xiangli): reasonable retry logic
 func (s *sender) Send(m raftpb.Message) error {
+	s.maybeStopStream(m.Term)
+	if !s.hasStreamClient() && shouldInitStream(m) {
+		s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
+	}
+	if canUseStream(m) {
+		if ok := s.tryStream(m); ok {
+			return nil
+		}
+	}
 	// TODO: don't block. we should be able to have 1000s
 	// of messages out at a time.
 	data := pbutil.MustMarshal(&m)
@@ -95,6 +129,59 @@ func (s *sender) Send(m raftpb.Message) error {
 func (s *sender) Stop() {
 	close(s.q)
 	s.wg.Wait()
+	s.strmSrvMu.Lock()
+	if s.strmSrv != nil {
+		s.strmSrv.stop()
+	}
+	s.strmSrvMu.Unlock()
+	if s.strmCln != nil {
+		s.strmCln.stop()
+	}
+}
+
+func (s *sender) maybeStopStream(term uint64) {
+	if s.strmCln != nil && term > s.strmCln.term {
+		s.strmCln.stop()
+		s.strmCln = nil
+	}
+	s.strmSrvMu.Lock()
+	defer s.strmSrvMu.Unlock()
+	if s.strmSrv != nil && term > s.strmSrv.term {
+		s.strmSrv.stop()
+		s.strmSrv = nil
+	}
+}
+
+func (s *sender) hasStreamClient() bool {
+	return s.strmCln != nil && !s.strmCln.isStopped()
+}
+
+func (s *sender) initStream(from, to types.ID, term uint64) {
+	strmCln := newStreamClient(from, to, term, s.p)
+	s.mu.Lock()
+	u := s.u
+	s.mu.Unlock()
+	if err := strmCln.start(s.tr, u, s.cid); err != nil {
+		log.Printf("rafthttp: start stream client error: %v", err)
+		return
+	}
+	s.strmCln = strmCln
+	log.Printf("rafthttp: start stream client with %s in term %d", to, term)
+}
+
+func (s *sender) tryStream(m raftpb.Message) bool {
+	s.strmSrvMu.Lock()
+	defer s.strmSrvMu.Unlock()
+	if s.strmSrv == nil || m.Term != s.strmSrv.term {
+		return false
+	}
+	if err := s.strmSrv.send(m.Entries); err != nil {
+		log.Printf("rafthttp: send stream message error: %v", err)
+		s.strmSrv.stop()
+		s.strmSrv = nil
+		return false
+	}
+	return true
 }
 
 func (s *sender) handle() {

+ 6 - 6
rafthttp/sender_test.go

@@ -34,7 +34,7 @@ import (
 func TestSenderSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
 
 	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
@@ -54,7 +54,7 @@ func TestSenderSend(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
@@ -86,7 +86,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 // it increases fail count in stats.
 func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
-	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
+	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
 
 	if err := s.Send(raftpb.Message{}); err != nil {
 		t.Fatalf("unexpect Send error: %v", err)
@@ -102,7 +102,7 @@ func TestSenderSendFailed(t *testing.T) {
 
 func TestSenderPost(t *testing.T) {
 	tr := &roundTripperRecorder{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, nil, nil)
 	if err := s.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 	}
@@ -145,7 +145,7 @@ func TestSenderPostBad(t *testing.T) {
 	}
 	for i, tt := range tests {
 		shouldstop := make(chan struct{})
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
 		err := s.post([]byte("some data"))
 		s.Stop()
 
@@ -166,7 +166,7 @@ func TestSenderPostShouldStop(t *testing.T) {
 	}
 	for i, tt := range tests {
 		shouldstop := make(chan struct{}, 1)
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
 		s.post([]byte("some data"))
 		s.Stop()
 		select {

+ 207 - 0
rafthttp/streamer.go

@@ -0,0 +1,207 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"net/url"
+	"path"
+	"strconv"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+const (
+	streamBufSize = 1024
+)
+
+type WriteFlusher interface {
+	io.Writer
+	http.Flusher
+}
+
+type streamServer struct {
+	to   types.ID
+	term uint64
+	fs   *stats.FollowerStats
+	q    chan []raftpb.Entry
+	done chan struct{}
+}
+
+func startStreamServer(w WriteFlusher, to types.ID, term uint64, fs *stats.FollowerStats) *streamServer {
+	s := &streamServer{
+		to:   to,
+		term: term,
+		fs:   fs,
+		q:    make(chan []raftpb.Entry, streamBufSize),
+		done: make(chan struct{}),
+	}
+	go s.handle(w)
+	return s
+}
+
+func (s *streamServer) send(ents []raftpb.Entry) error {
+	select {
+	case <-s.done:
+		return fmt.Errorf("stopped")
+	default:
+	}
+	select {
+	case s.q <- ents:
+		return nil
+	default:
+		log.Printf("rafthttp: streamer reaches maximal serving to %s", s.to)
+		return fmt.Errorf("reach maximal serving")
+	}
+}
+
+func (s *streamServer) stop() {
+	close(s.q)
+	<-s.done
+}
+
+func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
+
+func (s *streamServer) handle(w WriteFlusher) {
+	defer close(s.done)
+
+	ew := &entryWriter{w: w}
+	for ents := range s.q {
+		start := time.Now()
+		if err := ew.writeEntries(ents); err != nil {
+			log.Printf("rafthttp: write ents error: %v", err)
+			return
+		}
+		w.Flush()
+		s.fs.Succ(time.Since(start))
+	}
+}
+
+type streamClient struct {
+	id   types.ID
+	to   types.ID
+	term uint64
+	p    Processor
+
+	closer io.Closer
+	done   chan struct{}
+}
+
+func newStreamClient(id, to types.ID, term uint64, p Processor) *streamClient {
+	return &streamClient{
+		id:   id,
+		to:   to,
+		term: term,
+		p:    p,
+		done: make(chan struct{}),
+	}
+}
+
+// Dial dials to the remote url, and sends streaming request. If it succeeds,
+// it returns nil error, and the caller should call Handle function to keep
+// receiving appendEntry messages.
+func (s *streamClient) start(tr http.RoundTripper, u string, cid types.ID) error {
+	uu, err := url.Parse(u)
+	if err != nil {
+		return fmt.Errorf("parse url %s error: %v", u, err)
+	}
+	uu.Path = path.Join(RaftStreamPrefix, s.id.String())
+	req, err := http.NewRequest("GET", uu.String(), nil)
+	if err != nil {
+		return fmt.Errorf("new request to %s error: %v", u, err)
+	}
+	req.Header.Set("X-Etcd-Cluster-ID", cid.String())
+	req.Header.Set("X-Raft-To", s.to.String())
+	req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
+	resp, err := tr.RoundTrip(req)
+	if err != nil {
+		return fmt.Errorf("error posting to %q: %v", u, err)
+	}
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return fmt.Errorf("unhandled http status %d", resp.StatusCode)
+	}
+	s.closer = resp.Body
+	go s.handle(resp.Body)
+	return nil
+}
+
+func (s *streamClient) stop() {
+	s.closer.Close()
+	<-s.done
+}
+
+func (s *streamClient) isStopped() bool {
+	select {
+	case <-s.done:
+		return true
+	default:
+		return false
+	}
+}
+
+func (s *streamClient) handle(r io.Reader) {
+	defer close(s.done)
+
+	er := &entryReader{r: r}
+	for {
+		ents, err := er.readEntries()
+		if err != nil {
+			if err != io.EOF {
+				log.Printf("rafthttp: read ents error: %v", err)
+			}
+			return
+		}
+		// Considering Commit in MsgApp is not recovered, zero-entry appendEntry
+		// messages have no use to raft state machine. Drop it here because
+		// we don't have easy way to recover its Index easily.
+		if len(ents) == 0 {
+			continue
+		}
+		// The commit index field in appendEntry message is not recovered.
+		// The follower updates its commit index through heartbeat.
+		msg := raftpb.Message{
+			Type:    raftpb.MsgApp,
+			From:    uint64(s.to),
+			To:      uint64(s.id),
+			Term:    s.term,
+			LogTerm: s.term,
+			Index:   ents[0].Index - 1,
+			Entries: ents,
+		}
+		if err := s.p.Process(context.TODO(), msg); err != nil {
+			log.Printf("rafthttp: process raft message error: %v", err)
+			return
+		}
+	}
+}
+
+func shouldInitStream(m raftpb.Message) bool {
+	return m.Type == raftpb.MsgAppResp && m.Reject == false
+}
+
+func canUseStream(m raftpb.Message) bool {
+	return m.Type == raftpb.MsgApp && m.Index > 0 && m.Term == m.LogTerm
+}

+ 33 - 0
wal/util.go

@@ -20,8 +20,41 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"path"
+
+	"github.com/coreos/etcd/pkg/types"
+)
+
+// WalVersion is an enum for versions of etcd logs.
+type WalVersion string
+
+const (
+	WALUnknown  WalVersion = "Unknown WAL"
+	WALNotExist WalVersion = "No WAL"
+	WALv0_4     WalVersion = "0.4.x"
+	WALv0_5     WalVersion = "0.5.x"
 )
 
+func DetectVersion(dirpath string) WalVersion {
+	names, err := readDir(dirpath)
+	if err != nil || len(names) == 0 {
+		return WALNotExist
+	}
+	nameSet := types.NewUnsafeSet(names...)
+	if nameSet.ContainsAll([]string{"snap", "wal"}) {
+		// .../wal cannot be empty to exist.
+		if Exist(path.Join(dirpath, "wal")) {
+			return WALv0_5
+		}
+		return WALNotExist
+	}
+	if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
+		return WALv0_4
+	}
+
+	return WALUnknown
+}
+
 func Exist(dirpath string) bool {
 	names, err := readDir(dirpath)
 	if err != nil {

+ 53 - 0
wal/wal_bench_test.go

@@ -0,0 +1,53 @@
+package wal
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+func BenchmarkWrite100EntryWithoutBatch(b *testing.B) { benchmarkWriteEntry(b, 100, 0) }
+func BenchmarkWrite100EntryBatch10(b *testing.B)      { benchmarkWriteEntry(b, 100, 10) }
+func BenchmarkWrite100EntryBatch100(b *testing.B)     { benchmarkWriteEntry(b, 100, 100) }
+func BenchmarkWrite100EntryBatch500(b *testing.B)     { benchmarkWriteEntry(b, 100, 500) }
+func BenchmarkWrite100EntryBatch1000(b *testing.B)    { benchmarkWriteEntry(b, 100, 1000) }
+
+func BenchmarkWrite1000EntryWithoutBatch(b *testing.B) { benchmarkWriteEntry(b, 1000, 0) }
+func BenchmarkWrite1000EntryBatch10(b *testing.B)      { benchmarkWriteEntry(b, 1000, 10) }
+func BenchmarkWrite1000EntryBatch100(b *testing.B)     { benchmarkWriteEntry(b, 1000, 100) }
+func BenchmarkWrite1000EntryBatch500(b *testing.B)     { benchmarkWriteEntry(b, 1000, 500) }
+func BenchmarkWrite1000EntryBatch1000(b *testing.B)    { benchmarkWriteEntry(b, 1000, 1000) }
+
+func benchmarkWriteEntry(b *testing.B, size int, batch int) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+
+	w, err := Create(p, []byte("somedata"))
+	if err != nil {
+		b.Fatalf("err = %v, want nil", err)
+	}
+	data := make([]byte, size)
+	for i := 0; i < len(data); i++ {
+		data[i] = byte(i)
+	}
+	e := &raftpb.Entry{Data: data}
+
+	b.ResetTimer()
+	n := 0
+	for i := 0; i < b.N; i++ {
+		err := w.SaveEntry(e)
+		if err != nil {
+			b.Fatal(err)
+		}
+		n++
+		if n > batch {
+			w.sync()
+			n = 0
+		}
+	}
+}

Some files were not shown because too many files changed in this diff