瀏覽代碼

*: etcdhttp.raftHandler -> rafthttp.RaftHandler

Yicheng Qin 11 年之前
父節點
當前提交
5dc5f8145c
共有 5 個文件被更改,包括 288 次插入211 次删除
  1. 2 59
      etcdserver/etcdhttp/peer.go
  2. 0 150
      etcdserver/etcdhttp/peer_test.go
  3. 3 2
      etcdserver/server.go
  4. 99 0
      rafthttp/http.go
  5. 184 0
      rafthttp/http_test.go

+ 2 - 59
etcdserver/etcdhttp/peer.go

@@ -18,14 +18,11 @@ package etcdhttp
 
 import (
 	"encoding/json"
-	"io/ioutil"
 	"log"
 	"net/http"
 
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/pkg/types"
-	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 )
 
 const (
@@ -35,12 +32,7 @@ const (
 
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
 func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
-	rh := &raftHandler{
-		stats:       server,
-		server:      server,
-		clusterInfo: server.Cluster,
-	}
-
+	rh := rafthttp.NewHandler(server, server.Cluster.ID(), server)
 	mh := &peerMembersHandler{
 		clusterInfo: server.Cluster,
 	}
@@ -52,55 +44,6 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 	return mux
 }
 
-type raftHandler struct {
-	stats       etcdserver.Stats
-	server      etcdserver.Server
-	clusterInfo etcdserver.ClusterInfo
-}
-
-func (h *raftHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	if !allowMethod(w, r.Method, "POST") {
-		return
-	}
-
-	wcid := h.clusterInfo.ID().String()
-	w.Header().Set("X-Etcd-Cluster-ID", wcid)
-
-	gcid := r.Header.Get("X-Etcd-Cluster-ID")
-	if gcid != wcid {
-		log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
-		http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
-		return
-	}
-
-	b, err := ioutil.ReadAll(r.Body)
-	if err != nil {
-		log.Println("etcdhttp: error reading raft message:", err)
-		http.Error(w, "error reading raft message", http.StatusBadRequest)
-		return
-	}
-	var m raftpb.Message
-	if err := m.Unmarshal(b); err != nil {
-		log.Println("etcdhttp: error unmarshaling raft message:", err)
-		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
-		return
-	}
-	if err := h.server.Process(context.TODO(), m); err != nil {
-		switch err {
-		case etcdserver.ErrRemoved:
-			log.Printf("etcdhttp: reject message from removed member %s", types.ID(m.From).String())
-			http.Error(w, "cannot process message from removed member", http.StatusForbidden)
-		default:
-			writeError(w, err)
-		}
-		return
-	}
-	if m.Type == raftpb.MsgApp {
-		h.stats.UpdateRecvApp(types.ID(m.From), r.ContentLength)
-	}
-	w.WriteHeader(http.StatusNoContent)
-}
-
 type peerMembersHandler struct {
 	clusterInfo etcdserver.ClusterInfo
 }

+ 0 - 150
etcdserver/etcdhttp/peer_test.go

@@ -17,165 +17,15 @@
 package etcdhttp
 
 import (
-	"bytes"
 	"encoding/json"
-	"errors"
-	"io"
 	"net/http"
 	"net/http/httptest"
 	"path"
-	"strings"
 	"testing"
 
 	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/raft/raftpb"
 )
 
-func mustMarshalMsg(t *testing.T, m raftpb.Message) []byte {
-	json, err := m.Marshal()
-	if err != nil {
-		t.Fatalf("error marshalling raft Message: %#v", err)
-	}
-	return json
-}
-
-// errReader implements io.Reader to facilitate a broken request.
-type errReader struct{}
-
-func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") }
-
-func TestServeRaft(t *testing.T) {
-	testCases := []struct {
-		method    string
-		body      io.Reader
-		serverErr error
-		clusterID string
-
-		wcode int
-	}{
-		{
-			// bad method
-			"GET",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"0",
-			http.StatusMethodNotAllowed,
-		},
-		{
-			// bad method
-			"PUT",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"0",
-			http.StatusMethodNotAllowed,
-		},
-		{
-			// bad method
-			"DELETE",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"0",
-			http.StatusMethodNotAllowed,
-		},
-		{
-			// bad request body
-			"POST",
-			&errReader{},
-			nil,
-			"0",
-			http.StatusBadRequest,
-		},
-		{
-			// bad request protobuf
-			"POST",
-			strings.NewReader("malformed garbage"),
-			nil,
-			"0",
-			http.StatusBadRequest,
-		},
-		{
-			// good request, etcdserver.Server internal error
-			"POST",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			errors.New("some error"),
-			"0",
-			http.StatusInternalServerError,
-		},
-		{
-			// good request from removed member
-			"POST",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			etcdserver.ErrRemoved,
-			"0",
-			http.StatusForbidden,
-		},
-		{
-			// good request
-			"POST",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"1",
-			http.StatusPreconditionFailed,
-		},
-		{
-			// good request
-			"POST",
-			bytes.NewReader(
-				mustMarshalMsg(
-					t,
-					raftpb.Message{},
-				),
-			),
-			nil,
-			"0",
-			http.StatusNoContent,
-		},
-	}
-	for i, tt := range testCases {
-		req, err := http.NewRequest(tt.method, "foo", tt.body)
-		if err != nil {
-			t.Fatalf("#%d: could not create request: %#v", i, err)
-		}
-		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
-		rw := httptest.NewRecorder()
-		h := &raftHandler{stats: nil, server: &errServer{tt.serverErr}, clusterInfo: &fakeCluster{id: 0}}
-		h.ServeHTTP(rw, req)
-		if rw.Code != tt.wcode {
-			t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
-		}
-	}
-}
-
 func TestServeMembersFails(t *testing.T) {
 	tests := []struct {
 		method string

+ 3 - 2
etcdserver/server.go

@@ -33,6 +33,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/discovery"
+	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/pbutil"
@@ -61,7 +62,6 @@ const (
 var (
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrStopped       = errors.New("etcdserver: server stopped")
-	ErrRemoved       = errors.New("etcdserver: server removed")
 	ErrIDRemoved     = errors.New("etcdserver: ID removed")
 	ErrIDExists      = errors.New("etcdserver: ID exists")
 	ErrIDNotFound    = errors.New("etcdserver: ID not found")
@@ -318,7 +318,8 @@ func (s *EtcdServer) ID() types.ID { return s.id }
 
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.Cluster.IsIDRemoved(types.ID(m.From)) {
-		return ErrRemoved
+		log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
+		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
 	}
 	return s.node.Step(ctx, m)
 }

+ 99 - 0
rafthttp/http.go

@@ -0,0 +1,99 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"io/ioutil"
+	"log"
+	"net/http"
+
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+type Processor interface {
+	Process(ctx context.Context, m raftpb.Message) error
+}
+
+type Stats interface {
+	UpdateRecvApp(from types.ID, length int64)
+}
+
+func NewHandler(p Processor, cid types.ID, ss Stats) http.Handler {
+	return &handler{
+		p:   p,
+		cid: cid,
+		ss:  ss,
+	}
+}
+
+type handler struct {
+	p   Processor
+	cid types.ID
+	ss  Stats
+}
+
+func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "POST" {
+		w.Header().Set("Allow", "POST")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	wcid := h.cid.String()
+	w.Header().Set("X-Etcd-Cluster-ID", wcid)
+
+	gcid := r.Header.Get("X-Etcd-Cluster-ID")
+	if gcid != wcid {
+		log.Printf("rafthttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
+		http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
+		return
+	}
+
+	b, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		log.Println("rafthttp: error reading raft message:", err)
+		http.Error(w, "error reading raft message", http.StatusBadRequest)
+		return
+	}
+	var m raftpb.Message
+	if err := m.Unmarshal(b); err != nil {
+		log.Println("rafthttp: error unmarshaling raft message:", err)
+		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
+		return
+	}
+	if err := h.p.Process(context.TODO(), m); err != nil {
+		switch v := err.(type) {
+		case writerToResponse:
+			v.WriteTo(w)
+		default:
+			log.Printf("rafthttp: error processing raft message: %v", err)
+			http.Error(w, "error processing raft message", http.StatusInternalServerError)
+		}
+		return
+	}
+	if m.Type == raftpb.MsgApp {
+		h.ss.UpdateRecvApp(types.ID(m.From), r.ContentLength)
+	}
+	w.WriteHeader(http.StatusNoContent)
+}
+
+type writerToResponse interface {
+	WriteTo(w http.ResponseWriter)
+}

+ 184 - 0
rafthttp/http_test.go

@@ -0,0 +1,184 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+func TestServeRaft(t *testing.T) {
+	testCases := []struct {
+		method    string
+		body      io.Reader
+		p         Processor
+		clusterID string
+
+		wcode int
+	}{
+		{
+			// bad method
+			"GET",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"0",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// bad method
+			"PUT",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"0",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// bad method
+			"DELETE",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"0",
+			http.StatusMethodNotAllowed,
+		},
+		{
+			// bad request body
+			"POST",
+			&errReader{},
+			&nopProcessor{},
+			"0",
+			http.StatusBadRequest,
+		},
+		{
+			// bad request protobuf
+			"POST",
+			strings.NewReader("malformed garbage"),
+			&nopProcessor{},
+			"0",
+			http.StatusBadRequest,
+		},
+		{
+			// good request, wrong cluster ID
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"1",
+			http.StatusPreconditionFailed,
+		},
+		{
+			// good request, Processor failure
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&errProcessor{
+				err: &resWriterToError{code: http.StatusForbidden},
+			},
+			"0",
+			http.StatusForbidden,
+		},
+		{
+			// good request, Processor failure
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&errProcessor{
+				err: &resWriterToError{code: http.StatusInternalServerError},
+			},
+			"0",
+			http.StatusInternalServerError,
+		},
+		{
+			// good request, Processor failure
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&errProcessor{err: errors.New("blah")},
+			"0",
+			http.StatusInternalServerError,
+		},
+		{
+			// good request
+			"POST",
+			bytes.NewReader(
+				pbutil.MustMarshal(&raftpb.Message{}),
+			),
+			&nopProcessor{},
+			"0",
+			http.StatusNoContent,
+		},
+	}
+	for i, tt := range testCases {
+		req, err := http.NewRequest(tt.method, "foo", tt.body)
+		if err != nil {
+			t.Fatalf("#%d: could not create request: %#v", i, err)
+		}
+		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
+		rw := httptest.NewRecorder()
+		h := NewHandler(tt.p, types.ID(0), &nopStats{})
+		h.ServeHTTP(rw, req)
+		if rw.Code != tt.wcode {
+			t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
+		}
+	}
+}
+
+// errReader implements io.Reader to facilitate a broken request.
+type errReader struct{}
+
+func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") }
+
+type nopProcessor struct{}
+
+func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil }
+
+type errProcessor struct {
+	err error
+}
+
+func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err }
+
+type nopStats struct{}
+
+func (s *nopStats) UpdateRecvApp(from types.ID, length int64) {}
+
+type resWriterToError struct {
+	code int
+}
+
+func (e *resWriterToError) Error() string                 { return "" }
+func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) }