Browse Source

raft: remove proposal id / add test

Blake Mizerany 11 years ago
parent
commit
7469871d20

+ 12 - 5
etcdserver2/etcdhttp/http.go

@@ -1,15 +1,19 @@
 package etcdhttp
 
 import (
+	"encoding/binary"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"strconv"
 	"strings"
 	"time"
 
+	"crypto/rand"
 	"code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/elog"
 	etcdserver "github.com/coreos/etcd/etcdserver2"
@@ -24,7 +28,7 @@ const DefaultTimeout = 500 * time.Millisecond
 
 type Handler struct {
 	Timeout time.Duration
-	Server  etcdserver.Server
+	Server  *etcdserver.Server
 }
 
 func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -53,8 +57,7 @@ func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.R
 
 	resp, err := h.Server.Do(ctx, rr)
 	if err != nil {
-		// TODO(bmizerany): switch on store errors and etcdserver.ErrUnknownMethod
-		panic("TODO")
+		log.Println(err)
 	}
 
 	if err := encodeResponse(ctx, w, resp); err != nil {
@@ -77,7 +80,11 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R
 }
 
 func genId() int64 {
-	panic("implement me")
+	b := make([]byte, 8)
+	if _, err := io.ReadFull(rand.Reader, b); err != nil {
+		panic(err) // really bad stuff happened
+	}
+	return int64(binary.BigEndian.Uint64(b))
 }
 
 func parseRequest(r *http.Request) etcdserverpb.Request {
@@ -85,7 +92,7 @@ func parseRequest(r *http.Request) etcdserverpb.Request {
 	rr := etcdserverpb.Request{
 		Id:        genId(),
 		Method:    r.Method,
-		Path:      r.URL.Path[len("/keys/"):],
+		Path:      r.URL.Path[len("/keys"):],
 		Val:       q.Get("value"),
 		PrevValue: q.Get("prevValue"),
 		PrevIndex: parseUint64(q.Get("prevIndex")),

+ 80 - 2
etcdserver2/etcdhttp/http_test.go

@@ -1,5 +1,83 @@
 package etcdhttp
 
-import "testing"
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"reflect"
+	"testing"
+	"time"
+	"code.google.com/p/go.net/context"
 
-func TestHandler(t *testing.T) {}
+	etcdserver "github.com/coreos/etcd/etcdserver2"
+	"github.com/coreos/etcd/etcdserver2/etcdserverpb"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/store"
+)
+
+func nopSave(st raftpb.State, ents []raftpb.Entry) {}
+func nopSend(m []raftpb.Message)                   {}
+
+func TestSet(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	st := store.New()
+
+	n := raft.Start(ctx, 1, []int64{1})
+	n.Campaign(ctx)
+
+	srv := &etcdserver.Server{
+		Node:  n,
+		Store: st,
+		Send:  etcdserver.SendFunc(nopSend),
+		Save: func(st raftpb.State, ents []raftpb.Entry) {
+			for _, e := range ents {
+				var r etcdserverpb.Request
+				if err := r.Unmarshal(e.Data); err != nil {
+					t.Fatal(err)
+				}
+				fmt.Printf("r.Path: %q\n", r.Path)
+			}
+		},
+	}
+	etcdserver.Start(srv)
+	defer srv.Stop()
+
+	h := Handler{
+		Timeout: time.Hour,
+		Server:  srv,
+	}
+
+	s := httptest.NewServer(h)
+	defer s.Close()
+
+	resp, err := http.PostForm(s.URL+"/keys/foo", url.Values{"value": {"bar"}})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if resp.StatusCode != 201 {
+		t.Errorf("StatusCode = %d, expected %d", 201, resp.StatusCode)
+	}
+
+	g := new(store.Event)
+	if err := json.NewDecoder(resp.Body).Decode(&g); err != nil {
+		t.Fatal(err)
+	}
+
+	w := &store.NodeExtern{
+		Key:           "/foo/1",
+		Value:         stringp("bar"),
+		ModifiedIndex: 1,
+		CreatedIndex:  1,
+	}
+	if !reflect.DeepEqual(g.Node, w) {
+		t.Errorf("g = %+v, want %+v", g.Node, w)
+	}
+}
+
+func stringp(s string) *string { return &s }

+ 28 - 18
etcdserver2/server.go

@@ -6,14 +6,17 @@ import (
 	"time"
 
 	"code.google.com/p/go.net/context"
+	pb "github.com/coreos/etcd/etcdserver2/etcdserverpb"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wait"
-	pb "github.com/coreos/etcd/etcdserver2/etcdserverpb"
 )
 
-var ErrUnknownMethod = errors.New("etcdserver: unknown method")
+var (
+	ErrUnknownMethod = errors.New("etcdserver: unknown method")
+	ErrStopped       = errors.New("etcdserver: server stopped")
+)
 
 type SendFunc func(m []raftpb.Message)
 
@@ -32,7 +35,8 @@ type Response struct {
 
 type Server struct {
 	once sync.Once
-	w    wait.List
+	w    *wait.List
+	done chan struct{}
 
 	Node  raft.Node
 	Store store.Store
@@ -49,10 +53,14 @@ type Server struct {
 	Save func(st raftpb.State, ents []raftpb.Entry)
 }
 
-func (s *Server) init() { s.w = wait.New() }
+// Start prepares and starts server in a new goroutine.
+func Start(s *Server) {
+	s.w = wait.New()
+	s.done = make(chan struct{})
+	go s.run()
+}
 
-func (s *Server) Run(ctx context.Context) {
-	s.once.Do(s.init)
+func (s *Server) run() {
 	for {
 		select {
 		case rd := <-s.Node.Ready():
@@ -63,21 +71,26 @@ func (s *Server) Run(ctx context.Context) {
 			// care to apply entries in a single goroutine, and not
 			// race them.
 			for _, e := range rd.CommittedEntries {
+				var r pb.Request
+				if err := r.Unmarshal(e.Data); err != nil {
+					panic("TODO: this is bad, what do we do about it?")
+				}
+
 				var resp Response
-				resp.Event, resp.err = s.apply(context.TODO(), e)
+				resp.Event, resp.err = s.apply(context.TODO(), r)
 				resp.Term = rd.Term
 				resp.Commit = rd.Commit
-				s.w.Trigger(e.Id, resp)
+				s.w.Trigger(r.Id, resp)
 			}
-		case <-ctx.Done():
+		case <-s.done:
 			return
 		}
-
 	}
 }
 
+func (s *Server) Stop() { close(s.done) }
+
 func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
-	s.once.Do(s.init)
 	if r.Id == 0 {
 		panic("r.Id cannot be 0")
 	}
@@ -88,7 +101,7 @@ func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
 			return Response{}, err
 		}
 		ch := s.w.Register(r.Id)
-		s.Node.Propose(ctx, r.Id, data)
+		s.Node.Propose(ctx, data)
 		select {
 		case x := <-ch:
 			resp := x.(Response)
@@ -96,6 +109,8 @@ func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
 		case <-ctx.Done():
 			s.w.Trigger(r.Id, nil) // GC wait
 			return Response{}, ctx.Err()
+		case <-s.done:
+			return Response{}, ErrStopped
 		}
 	case "GET":
 		switch {
@@ -118,12 +133,7 @@ func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
 }
 
 // apply interprets r as a call to store.X and returns an Response interpreted from store.Event
-func (s *Server) apply(ctx context.Context, e raftpb.Entry) (*store.Event, error) {
-	var r pb.Request
-	if err := r.Unmarshal(e.Data); err != nil {
-		return nil, err
-	}
-
+func (s *Server) apply(ctx context.Context, r pb.Request) (*store.Event, error) {
 	expr := time.Unix(0, r.Expiration)
 	switch r.Method {
 	case "POST":

+ 51 - 2
etcdserver2/server_test.go

@@ -1,5 +1,54 @@
 package etcdserver
 
-import "testing"
+import (
+	"reflect"
+	"testing"
+	"code.google.com/p/go.net/context"
 
-func TestServer(t *testing.T) {}
+	pb "github.com/coreos/etcd/etcdserver2/etcdserverpb"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/store"
+)
+
+func TestServer(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	n := raft.Start(ctx, 1, []int64{1})
+	n.Campaign(ctx)
+
+	srv := &Server{
+		Node:  n,
+		Store: store.New(),
+		Send:  func(_ []raftpb.Message) {},
+		Save:  func(_ raftpb.State, _ []raftpb.Entry) {},
+	}
+	Start(srv)
+	defer srv.Stop()
+
+	r := pb.Request{
+		Method: "PUT",
+		Id:     1,
+		Path:   "/foo",
+		Val:    "bar",
+	}
+	resp, err := srv.Do(ctx, r)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	g, w := resp.Event.Node, &store.NodeExtern{
+		Key:           "/foo",
+		ModifiedIndex: 1,
+		CreatedIndex:  1,
+		Value:         stringp("bar"),
+	}
+
+	if !reflect.DeepEqual(g, w) {
+		t.Error("value:", *g.Value)
+		t.Errorf("g = %+v, w %+v", g, w)
+	}
+}
+
+func stringp(s string) *string { return &s }

+ 7 - 3
raft/node.go

@@ -3,7 +3,6 @@ package raft
 
 import (
 	"code.google.com/p/go.net/context"
-
 	pb "github.com/coreos/etcd/raft/raftpb"
 )
 
@@ -78,6 +77,7 @@ func (n *Node) run(r *raft) {
 
 		if rd.containsUpdates(prev) {
 			readyc = n.readyc
+			prev = rd
 		} else {
 			readyc = nil
 		}
@@ -109,9 +109,13 @@ func (n *Node) Tick() error {
 	}
 }
 
+func (n *Node) Campaign(ctx context.Context) error {
+	return n.Step(ctx, pb.Message{Type: msgHup})
+}
+
 // Propose proposes data be appended to the log.
-func (n *Node) Propose(ctx context.Context, id int64, data []byte) error {
-	return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Id: id, Data: data}}})
+func (n *Node) Propose(ctx context.Context, data []byte) error {
+	return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
 }
 
 // Step advances the state machine using msgs. The ctx.Err() will be returned,

+ 47 - 2
raft/node_test.go

@@ -1,5 +1,50 @@
 package raft
 
-import "testing"
+import (
+	"reflect"
+	"testing"
+	"github.com/coreos/etcd/raft/raftpb"
 
-func TestNode(t *testing.T) {}
+	"code.google.com/p/go.net/context"
+)
+
+func TestNode(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	n := Start(ctx, 1, []int64{1})
+	ch := make(chan Ready)
+	go func() {
+		for {
+			ch <- <-n.Ready()
+		}
+	}()
+	n.Campaign(ctx)
+	n.Propose(ctx, []byte("foo"))
+
+	want := []Ready{
+		{
+			State:            raftpb.State{Term: 1, Vote: -1, Commit: 1, LastIndex: 1},
+			Entries:          []raftpb.Entry{{Term: 1, Index: 1}},
+			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
+		},
+		{
+			State:            raftpb.State{Term: 1, Vote: -1, Commit: 2, LastIndex: 2},
+			Entries:          []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}},
+			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}},
+		},
+	}
+
+	for i, w := range want {
+		if g := <-ch; !reflect.DeepEqual(g, w) {
+			t.Errorf("#%d: g = %+v,\n             w   %+v", i, g, w)
+		}
+	}
+
+	select {
+	case rd := <-ch:
+		t.Errorf("unexpected Ready: %+v", rd)
+	default:
+	}
+
+}

+ 12 - 32
raft/raftpb/raft.pb.go

@@ -3,7 +3,7 @@
 // DO NOT EDIT!
 
 /*
-	Package raftis a generated protocol buffer package.
+	Package raftpb is a generated protocol buffer package.
 
 	It is generated from these files:
 		raft.proto
@@ -20,7 +20,7 @@ import proto "code.google.com/p/gogoprotobuf/proto"
 import json "encoding/json"
 import math "math"
 
-// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.
+// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
 
 import io "io"
 import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
@@ -35,7 +35,6 @@ type Entry struct {
 	Term             int64  `protobuf:"varint,2,req,name=term" json:"term"`
 	Index            int64  `protobuf:"varint,3,req,name=index" json:"index"`
 	Data             []byte `protobuf:"bytes,4,opt,name=data" json:"data"`
-	Id               int64  `protobuf:"varint,5,req,name=id" json:"id"`
 	XXX_unrecognized []byte `json:"-"`
 }
 
@@ -56,16 +55,16 @@ func (m *Snapshot) String() string { return proto.CompactTextString(m) }
 func (*Snapshot) ProtoMessage()    {}
 
 type Message struct {
-	Type             int64      `protobuf:"varint,1,req,name=type" json:"type"`
-	To               int64      `protobuf:"varint,2,req,name=to" json:"to"`
-	From             int64      `protobuf:"varint,3,req,name=from" json:"from"`
-	Term             int64      `protobuf:"varint,4,req,name=term" json:"term"`
-	LogTerm          int64      `protobuf:"varint,5,req,name=logTerm" json:"logTerm"`
-	Index            int64      `protobuf:"varint,6,req,name=index" json:"index"`
-	Entries          []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"`
-	Commit           int64      `protobuf:"varint,8,req,name=commit" json:"commit"`
-	Snapshot         Snapshot   `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"`
-	XXX_unrecognized []byte     `json:"-"`
+	Type             int64    `protobuf:"varint,1,req,name=type" json:"type"`
+	To               int64    `protobuf:"varint,2,req,name=to" json:"to"`
+	From             int64    `protobuf:"varint,3,req,name=from" json:"from"`
+	Term             int64    `protobuf:"varint,4,req,name=term" json:"term"`
+	LogTerm          int64    `protobuf:"varint,5,req,name=logTerm" json:"logTerm"`
+	Index            int64    `protobuf:"varint,6,req,name=index" json:"index"`
+	Entries          []Entry  `protobuf:"bytes,7,rep,name=entries" json:"entries"`
+	Commit           int64    `protobuf:"varint,8,req,name=commit" json:"commit"`
+	Snapshot         Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"`
+	XXX_unrecognized []byte   `json:"-"`
 }
 
 func (m *Message) Reset()         { *m = Message{} }
@@ -172,21 +171,6 @@ func (m *Entry) Unmarshal(data []byte) error {
 			}
 			m.Data = append(m.Data, data[index:postIndex]...)
 			index = postIndex
-		case 5:
-			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
-			}
-			for shift := uint(0); ; shift += 7 {
-				if index >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[index]
-				index++
-				m.Id |= (int64(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
 		default:
 			var sizeOfWire int
 			for {
@@ -625,7 +609,6 @@ func (m *Entry) Size() (n int) {
 	n += 1 + sovRaft(uint64(m.Index))
 	l = len(m.Data)
 	n += 1 + l + sovRaft(uint64(l))
-	n += 1 + sovRaft(uint64(m.Id))
 	if m.XXX_unrecognized != nil {
 		n += len(m.XXX_unrecognized)
 	}
@@ -725,9 +708,6 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) {
 	i++
 	i = encodeVarintRaft(data, i, uint64(len(m.Data)))
 	i += copy(data[i:], m.Data)
-	data[i] = 0x28
-	i++
-	i = encodeVarintRaft(data, i, uint64(m.Id))
 	if m.XXX_unrecognized != nil {
 		i += copy(data[i:], m.XXX_unrecognized)
 	}

+ 0 - 1
raft/raftpb/raft.proto

@@ -12,7 +12,6 @@ message Entry {
 	required int64 term  = 2 [(gogoproto.nullable) = false];
 	required int64 index = 3 [(gogoproto.nullable) = false];
 	optional bytes data  = 4 [(gogoproto.nullable) = false];
-	required int64 id    = 5 [(gogoproto.nullable) = false];
 }
 
 message Snapshot {

+ 7 - 5
wait/wait.go

@@ -1,17 +1,19 @@
 package wait
 
-import "sync"
+import (
+	"sync"
+)
 
 type List struct {
 	l sync.Mutex
 	m map[int64]chan interface{}
 }
 
-func New() List {
-	return List{m: make(map[int64]chan interface{})}
+func New() *List {
+	return &List{m: make(map[int64]chan interface{})}
 }
 
-func (w List) Register(id int64) <-chan interface{} {
+func (w *List) Register(id int64) <-chan interface{} {
 	w.l.Lock()
 	defer w.l.Unlock()
 	ch := w.m[id]
@@ -22,7 +24,7 @@ func (w List) Register(id int64) <-chan interface{} {
 	return ch
 }
 
-func (w List) Trigger(id int64, x interface{}) {
+func (w *List) Trigger(id int64, x interface{}) {
 	w.l.Lock()
 	ch := w.m[id]
 	delete(w.m, id)