Browse Source

etcdserver: add RaftIndex and RaftTerm

Jonathan Boulle 11 years ago
parent
commit
9b3478218e
3 changed files with 62 additions and 14 deletions
  1. 13 7
      etcdserver/etcdhttp/http.go
  2. 28 7
      etcdserver/etcdhttp/http_test.go
  3. 21 0
      etcdserver/server.go

+ 13 - 7
etcdserver/etcdhttp/http.go

@@ -35,10 +35,11 @@ const (
 var errClosed = errors.New("etcdhttp: client closed connection")
 
 // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
-func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
+func NewClientHandler(server *etcdserver.EtcdServer, peers Peers, timeout time.Duration) http.Handler {
 	sh := &serverHandler{
 		server:  server,
 		peers:   peers,
+		timer:   server,
 		timeout: timeout,
 	}
 	if sh.timeout == 0 {
@@ -69,6 +70,7 @@ func NewPeerHandler(server etcdserver.Server) http.Handler {
 type serverHandler struct {
 	timeout time.Duration
 	server  etcdserver.Server
+	timer   etcdserver.RaftTimer
 	peers   Peers
 }
 
@@ -94,14 +96,14 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
 
 	switch {
 	case resp.Event != nil:
-		if err := writeEvent(w, resp.Event); err != nil {
+		if err := writeEvent(w, resp.Event, h.timer); err != nil {
 			// Should never be reached
-			log.Println("error writing event: %v", err)
+			log.Printf("error writing event: %v", err)
 		}
 	case resp.Watcher != nil:
 		ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
 		defer cancel()
-		handleWatch(ctx, w, resp.Watcher, rr.Stream)
+		handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
 	default:
 		writeError(w, errors.New("received response with no Event/Watcher!"))
 	}
@@ -325,12 +327,14 @@ func writeError(w http.ResponseWriter, err error) {
 // writeEvent serializes a single Event and writes the resulting
 // JSON to the given ResponseWriter, along with the appropriate
 // headers
-func writeEvent(w http.ResponseWriter, ev *store.Event) error {
+func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
 	if ev == nil {
 		return errors.New("cannot write empty Event!")
 	}
 	w.Header().Set("Content-Type", "application/json")
-	w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
+	w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
+	w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
+	w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
 
 	if ev.IsCreated() {
 		w.WriteHeader(http.StatusCreated)
@@ -339,7 +343,7 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error {
 	return json.NewEncoder(w).Encode(ev)
 }
 
-func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
+func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
 	defer wa.Remove()
 	ech := wa.EventChan()
 	var nch <-chan bool
@@ -348,6 +352,8 @@ func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, s
 	}
 
 	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
+	w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
 	w.WriteHeader(http.StatusOK)
 
 	// Ensure headers are flushed early, in case of long polling

+ 28 - 7
etcdserver/etcdhttp/http_test.go

@@ -499,10 +499,15 @@ func TestWriteError(t *testing.T) {
 	}
 }
 
+type dummyRaftTimer struct{}
+
+func (drt dummyRaftTimer) Index() int64 { return int64(100) }
+func (drt dummyRaftTimer) Term() int64  { return int64(5) }
+
 func TestWriteEvent(t *testing.T) {
 	// nil event should not panic
 	rw := httptest.NewRecorder()
-	writeEvent(rw, nil)
+	writeEvent(rw, nil, dummyRaftTimer{})
 	h := rw.Header()
 	if len(h) > 0 {
 		t.Fatalf("unexpected non-empty headers: %#v", h)
@@ -545,10 +550,16 @@ func TestWriteEvent(t *testing.T) {
 
 	for i, tt := range tests {
 		rw := httptest.NewRecorder()
-		writeEvent(rw, tt.ev)
+		writeEvent(rw, tt.ev, dummyRaftTimer{})
 		if gct := rw.Header().Get("Content-Type"); gct != "application/json" {
 			t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct)
 		}
+		if gri := rw.Header().Get("X-Raft-Index"); gri != "100" {
+			t.Errorf("case %d: bad X-Raft-Index header: got %s, want %s", i, gri, "100")
+		}
+		if grt := rw.Header().Get("X-Raft-Term"); grt != "5" {
+			t.Errorf("case %d: bad X-Raft-Term header: got %s, want %s", i, grt, "5")
+		}
 		if gei := rw.Header().Get("X-Etcd-Index"); gei != tt.idx {
 			t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %s", i, gei, tt.idx)
 		}
@@ -970,6 +981,7 @@ func TestServeKeysEvent(t *testing.T) {
 		timeout: time.Hour,
 		server:  server,
 		peers:   nil,
+		timer:   &dummyRaftTimer{},
 	}
 	rw := httptest.NewRecorder()
 
@@ -1008,6 +1020,7 @@ func TestServeKeysWatch(t *testing.T) {
 		timeout: time.Hour,
 		server:  server,
 		peers:   nil,
+		timer:   &dummyRaftTimer{},
 	}
 	go func() {
 		ec <- &store.Event{
@@ -1047,10 +1060,12 @@ func TestHandleWatch(t *testing.T) {
 		Node:   &store.NodeExtern{},
 	}
 
-	handleWatch(context.Background(), rw, wa, false)
+	handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
 
 	wcode := http.StatusOK
 	wct := "application/json"
+	wri := "100"
+	wrt := "5"
 	wbody := mustMarshalEvent(
 		t,
 		&store.Event{
@@ -1066,6 +1081,12 @@ func TestHandleWatch(t *testing.T) {
 	if ct := h.Get("Content-Type"); ct != wct {
 		t.Errorf("Content-Type=%q, want %q", ct, wct)
 	}
+	if ri := h.Get("X-Raft-Index"); ri != wri {
+		t.Errorf("X-Raft-Index=%q, want %q", ri, wri)
+	}
+	if rt := h.Get("X-Raft-Term"); rt != wrt {
+		t.Errorf("X-Raft-Term=%q, want %q", rt, wrt)
+	}
 	g := rw.Body.String()
 	if g != wbody {
 		t.Errorf("got body=%#v, want %#v", g, wbody)
@@ -1079,7 +1100,7 @@ func TestHandleWatchNoEvent(t *testing.T) {
 	}
 	close(wa.echan)
 
-	handleWatch(context.Background(), rw, wa, false)
+	handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
 
 	wcode := http.StatusOK
 	wct := "application/json"
@@ -1115,7 +1136,7 @@ func TestHandleWatchCloseNotified(t *testing.T) {
 	rw.cn <- true
 	wa := &dummyWatcher{}
 
-	handleWatch(context.Background(), rw, wa, false)
+	handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
 
 	wcode := http.StatusOK
 	wct := "application/json"
@@ -1141,7 +1162,7 @@ func TestHandleWatchTimeout(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	cancel()
 
-	handleWatch(ctx, rw, wa, false)
+	handleWatch(ctx, rw, wa, false, dummyRaftTimer{})
 
 	wcode := http.StatusOK
 	wct := "application/json"
@@ -1184,7 +1205,7 @@ func TestHandleWatchStreaming(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	done := make(chan struct{})
 	go func() {
-		handleWatch(ctx, rw, wa, true)
+		handleWatch(ctx, rw, wa, true, dummyRaftTimer{})
 		close(done)
 	}()
 

+ 21 - 0
etcdserver/server.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"log"
 	"math/rand"
+	"sync/atomic"
 	"time"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -66,6 +67,11 @@ type Server interface {
 	Process(ctx context.Context, m raftpb.Message) error
 }
 
+type RaftTimer interface {
+	Index() int64
+	Term() int64
+}
+
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
 	w    wait.Wait
@@ -86,6 +92,10 @@ type EtcdServer struct {
 	SyncTicker <-chan time.Time
 
 	SnapCount int64 // number of entries to trigger a snapshot
+
+	// Cache of the latest raft index and raft term the server has seen
+	raftIndex int64
+	raftTerm  int64
 }
 
 // Start prepares and starts server in a new goroutine. It is no longer safe to
@@ -138,6 +148,8 @@ func (s *EtcdServer) run() {
 				default:
 					panic("unexpected entry type")
 				}
+				atomic.StoreInt64(&s.raftIndex, e.Index)
+				atomic.StoreInt64(&s.raftTerm, e.Term)
 				appliedi = e.Index
 			}
 
@@ -249,6 +261,15 @@ func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
 	return s.configure(ctx, cc)
 }
 
+// Implement the RaftTimer interface
+func (s *EtcdServer) Index() int64 {
+	return atomic.LoadInt64(&s.raftIndex)
+}
+
+func (s *EtcdServer) Term() int64 {
+	return atomic.LoadInt64(&s.raftTerm)
+}
+
 // configure sends configuration change through consensus then performs it.
 // It will block until the change is performed or there is an error.
 func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {