Browse Source

raft: ReadState -> Ready, which returns a chan

Blake Mizerany 11 years ago
parent
commit
021e231476
5 changed files with 209 additions and 77 deletions
  1. 106 5
      etcdserver2/request.pb.go
  2. 13 9
      etcdserver2/request.proto
  3. 57 27
      etcdserver2/server.go
  4. 4 12
      raft/example_test.go
  5. 29 24
      raft/node.go

+ 106 - 5
etcdserver2/request.pb.go

@@ -30,13 +30,17 @@ var _ = math.Inf
 type Request struct {
 	Id               int64  `protobuf:"varint,1,req,name=id" json:"id"`
 	Method           string `protobuf:"bytes,2,req,name=method" json:"method"`
-	Key              string `protobuf:"bytes,3,req,name=key" json:"key"`
+	Path             string `protobuf:"bytes,3,req,name=path" json:"path"`
 	Val              string `protobuf:"bytes,4,req,name=val" json:"val"`
 	Dir              bool   `protobuf:"varint,5,req,name=dir" json:"dir"`
 	PrevValue        string `protobuf:"bytes,6,req,name=prevValue" json:"prevValue"`
 	PrevIndex        int64  `protobuf:"varint,7,req,name=prevIndex" json:"prevIndex"`
 	PrevExists       bool   `protobuf:"varint,8,req,name=prevExists" json:"prevExists"`
 	Expiration       int64  `protobuf:"varint,9,req,name=expiration" json:"expiration"`
+	Wait             bool   `protobuf:"varint,10,req,name=wait" json:"wait"`
+	Since            uint64 `protobuf:"varint,11,req,name=since" json:"since"`
+	Recursive        bool   `protobuf:"varint,12,req,name=recursive" json:"recursive"`
+	Sorted           bool   `protobuf:"varint,13,req,name=sorted" json:"sorted"`
 	XXX_unrecognized []byte `json:"-"`
 }
 
@@ -122,7 +126,7 @@ func (m *Request) Unmarshal(data []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			m.Key = string(data[index:postIndex])
+			m.Path = string(data[index:postIndex])
 			index = postIndex
 		case 4:
 			if wireType != 2 {
@@ -232,6 +236,72 @@ func (m *Request) Unmarshal(data []byte) error {
 					break
 				}
 			}
+		case 10:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Wait = bool(v != 0)
+		case 11:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Since |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 12:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Recursive = bool(v != 0)
+		case 13:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Sorted = bool(v != 0)
 		default:
 			var sizeOfWire int
 			for {
@@ -261,7 +331,7 @@ func (m *Request) Size() (n int) {
 	n += 1 + sovRequest(uint64(m.Id))
 	l = len(m.Method)
 	n += 1 + l + sovRequest(uint64(l))
-	l = len(m.Key)
+	l = len(m.Path)
 	n += 1 + l + sovRequest(uint64(l))
 	l = len(m.Val)
 	n += 1 + l + sovRequest(uint64(l))
@@ -271,6 +341,10 @@ func (m *Request) Size() (n int) {
 	n += 1 + sovRequest(uint64(m.PrevIndex))
 	n += 2
 	n += 1 + sovRequest(uint64(m.Expiration))
+	n += 2
+	n += 1 + sovRequest(uint64(m.Since))
+	n += 2
+	n += 2
 	if m.XXX_unrecognized != nil {
 		n += len(m.XXX_unrecognized)
 	}
@@ -314,8 +388,8 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) {
 	i += copy(data[i:], m.Method)
 	data[i] = 0x1a
 	i++
-	i = encodeVarintRequest(data, i, uint64(len(m.Key)))
-	i += copy(data[i:], m.Key)
+	i = encodeVarintRequest(data, i, uint64(len(m.Path)))
+	i += copy(data[i:], m.Path)
 	data[i] = 0x22
 	i++
 	i = encodeVarintRequest(data, i, uint64(len(m.Val)))
@@ -346,6 +420,33 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) {
 	data[i] = 0x48
 	i++
 	i = encodeVarintRequest(data, i, uint64(m.Expiration))
+	data[i] = 0x50
+	i++
+	if m.Wait {
+		data[i] = 1
+	} else {
+		data[i] = 0
+	}
+	i++
+	data[i] = 0x58
+	i++
+	i = encodeVarintRequest(data, i, uint64(m.Since))
+	data[i] = 0x60
+	i++
+	if m.Recursive {
+		data[i] = 1
+	} else {
+		data[i] = 0
+	}
+	i++
+	data[i] = 0x68
+	i++
+	if m.Sorted {
+		data[i] = 1
+	} else {
+		data[i] = 0
+	}
+	i++
 	if m.XXX_unrecognized != nil {
 		i += copy(data[i:], m.XXX_unrecognized)
 	}

+ 13 - 9
etcdserver2/request.proto

@@ -8,13 +8,17 @@ option (gogoproto.unmarshaler_all) = true;
 option (gogoproto.goproto_getters_all) = false;
 
 message Request {
-	required int64  id         = 1 [(gogoproto.nullable) = false];     
-	required string method     = 2 [(gogoproto.nullable) = false];
-	required string key        = 3 [(gogoproto.nullable) = false];
-	required string val        = 4 [(gogoproto.nullable) = false];
-	required bool   dir        = 5 [(gogoproto.nullable) = false];
-	required string prevValue  = 6 [(gogoproto.nullable) = false];
-	required int64  prevIndex  = 7 [(gogoproto.nullable) = false];
-	required bool   prevExists = 8 [(gogoproto.nullable) = false];
-	required int64  expiration = 9 [(gogoproto.nullable) = false];
+	required int64  id         =  1 [(gogoproto.nullable) = false];     
+	required string method     =  2 [(gogoproto.nullable) = false];
+	required string path       =  3 [(gogoproto.nullable) = false];
+	required string val        =  4 [(gogoproto.nullable) = false];
+	required bool   dir        =  5 [(gogoproto.nullable) = false];
+	required string prevValue  =  6 [(gogoproto.nullable) = false];
+	required int64  prevIndex  =  7 [(gogoproto.nullable) = false];
+	required bool   prevExists =  8 [(gogoproto.nullable) = false];
+	required int64  expiration =  9 [(gogoproto.nullable) = false];
+	required bool   wait       = 10 [(gogoproto.nullable) = false];
+	required uint64  since      = 11 [(gogoproto.nullable) = false];
+	required bool   recursive  = 12 [(gogoproto.nullable) = false];
+	required bool   sorted     = 13 [(gogoproto.nullable) = false];
 }

+ 57 - 27
etcdserver2/server.go

@@ -1,14 +1,22 @@
 package etcdserver
 
 import (
-	"log"
-
 	"code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wait"
 )
 
 type Response struct {
+	// The last seen term raft was at when this request was built.
+	Term int
+
+	// The last seen index raft was at when this request was built.
+	Index int
+
+	*store.Event
+	*store.Watcher
+
 	err error
 }
 
@@ -18,6 +26,8 @@ type Server struct {
 
 	msgsc chan raft.Message
 
+	st store.Store
+
 	// Send specifies the send function for sending msgs to peers. Send
 	// MUST NOT block. It is okay to drop messages, since clients should
 	// timeout and reissue their messages.  If Send is nil, Server will
@@ -32,20 +42,21 @@ type Server struct {
 
 func (s *Server) Run(ctx context.Context) {
 	for {
-		st, ents, cents, msgs, err := s.n.ReadState(ctx)
-		if err != nil {
-			log.Println("etcdserver: error while reading state -", err)
+		select {
+		case rd := <-s.n.Ready():
+			s.Save(rd.State, rd.Entries)
+			s.Send(rd.Messages)
+			go func() {
+				for _, e := range rd.CommittedEntries {
+					var r Request
+					r.Unmarshal(e.Data)
+					s.w.Trigger(r.Id, s.apply(r))
+				}
+			}()
+		case <-ctx.Done():
 			return
 		}
-		s.Save(st, ents)
-		s.Send(msgs)
-		go func() {
-			for _, e := range cents {
-				var r Request
-				r.Unmarshal(e.Data)
-				s.w.Trigger(r.Id, s.apply(r))
-			}
-		}()
+
 	}
 }
 
@@ -53,20 +64,39 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
 	if r.Id == 0 {
 		panic("r.Id cannot be 0")
 	}
-	data, err := r.Marshal()
-	if err != nil {
-		return Response{}, err
-	}
-	ch := s.w.Register(r.Id)
-	s.n.Propose(ctx, data)
-	select {
-	case x := <-ch:
-		resp := x.(Response)
-		return resp, resp.err
-	case <-ctx.Done():
-		s.w.Trigger(r.Id, nil) // GC wait
-		return Response{}, ctx.Err()
+	switch r.Method {
+	case "POST", "PUT", "DELETE":
+		data, err := r.Marshal()
+		if err != nil {
+			return Response{}, err
+		}
+		ch := s.w.Register(r.Id)
+		s.n.Propose(ctx, data)
+		select {
+		case x := <-ch:
+			resp := x.(Response)
+			return resp, resp.err
+		case <-ctx.Done():
+			s.w.Trigger(r.Id, nil) // GC wait
+			return Response{}, ctx.Err()
+		}
+	case "GET":
+		switch {
+		case r.Wait:
+			wc, err := s.st.Watch(r.Path, r.Recursive, false, r.Since)
+			if err != nil {
+				return Response{}, err
+			}
+			return Response{Watcher: wc}, nil
+		default:
+			ev, err := s.st.Get(r.Path, r.Recursive, r.Sorted)
+			if err != nil {
+				return Response{}, err
+			}
+			return Response{Event: ev}, nil
+		}
 	}
+	panic("not reached") // for some reason the compiler wants this... :/
 }
 
 // apply interprets r as a call to store.X and returns an Response interpreted from store.Event

+ 4 - 12
raft/example_test.go

@@ -1,10 +1,6 @@
 package raft
 
-import (
-	"log"
-
-	"code.google.com/p/go.net/context"
-)
+import "code.google.com/p/go.net/context"
 
 func applyToStore(ents []Entry)   {}
 func sendMessages(msgs []Message) {}
@@ -20,14 +16,10 @@ func Example_Node() {
 	var prev State
 	for {
 		// ReadState blocks until there is new state ready.
-		st, ents, cents, msgs, err := n.ReadState(context.Background())
-		if err != nil {
-			log.Fatal(err)
-		}
-
-		if !prev.Equal(st) {
+		rd := <-n.Ready()
+		if !prev.Equal(rd.State) {
 			saveStateToDisk(st)
-			prev = st
+			prev = rd.State
 		}
 
 		saveToDisk(ents)

+ 29 - 24
raft/node.go

@@ -7,25 +7,37 @@ import (
 	"code.google.com/p/go.net/context"
 )
 
-type stateResp struct {
-	st          State
-	ents, cents []Entry
-	msgs        []Message
+type Ready struct {
+	// The current state of a Node
+	State
+
+	// Entries specifies entries to be saved to stable storage BEFORE
+	// Messages are sent.
+	Entries []Entry
+
+	// CommittedEntries specifies entries to be committed to a
+	// store/state-machine. These have previously been committed to stable
+	// store.
+	CommittedEntries []Entry
+
+	// Messages specifies outbound messages to be sent AFTER Entries are
+	// committed to stable storage.
+	Messages []Message
 }
 
 func (a State) Equal(b State) bool {
 	return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
 }
 
-func (sr stateResp) containsUpdates(prev stateResp) bool {
-	return !prev.st.Equal(sr.st) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
+func (rd Ready) containsUpdates(prev Ready) bool {
+	return !prev.State.Equal(rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
 }
 
 type Node struct {
 	ctx    context.Context
 	propc  chan Message
 	recvc  chan Message
-	statec chan stateResp
+	readyc chan Ready
 	tickc  chan struct{}
 }
 
@@ -34,7 +46,7 @@ func Start(ctx context.Context, id int64, peers []int64) Node {
 		ctx:    ctx,
 		propc:  make(chan Message),
 		recvc:  make(chan Message),
-		statec: make(chan stateResp),
+		readyc: make(chan Ready),
 		tickc:  make(chan struct{}),
 	}
 	r := newRaft(id, peers)
@@ -44,9 +56,9 @@ func Start(ctx context.Context, id int64, peers []int64) Node {
 
 func (n *Node) run(r *raft) {
 	propc := n.propc
-	statec := n.statec
+	readyc := n.readyc
 
-	var prev stateResp
+	var prev Ready
 	for {
 		if r.hasLeader() {
 			propc = n.propc
@@ -57,17 +69,17 @@ func (n *Node) run(r *raft) {
 			propc = nil
 		}
 
-		sr := stateResp{
+		rd := Ready{
 			r.State,
 			r.raftLog.unstableEnts(),
 			r.raftLog.nextEnts(),
 			r.msgs,
 		}
 
-		if sr.containsUpdates(prev) {
-			statec = n.statec
+		if rd.containsUpdates(prev) {
+			readyc = n.readyc
 		} else {
-			statec = nil
+			readyc = nil
 		}
 
 		select {
@@ -78,7 +90,7 @@ func (n *Node) run(r *raft) {
 			r.Step(m) // raft never returns an error
 		case <-n.tickc:
 			// r.tick()
-		case statec <- sr:
+		case readyc <- rd:
 			r.raftLog.resetNextEnts()
 			r.raftLog.resetUnstable()
 			r.msgs = nil
@@ -127,15 +139,8 @@ func (n *Node) Step(ctx context.Context, msgs []Message) error {
 }
 
 // ReadState returns the current point-in-time state.
-func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, msgs []Message, err error) {
-	select {
-	case sr := <-n.statec:
-		return sr.st, sr.ents, sr.cents, sr.msgs, nil
-	case <-ctx.Done():
-		return State{}, nil, nil, nil, ctx.Err()
-	case <-n.ctx.Done():
-		return State{}, nil, nil, nil, n.ctx.Err()
-	}
+func (n *Node) Ready() <-chan Ready {
+	return n.readyc
 }
 
 type byMsgType []Message