123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package rafthttp
- import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "net/http"
- "net/http/httptest"
- "net/url"
- "strings"
- "testing"
- "time"
- "github.com/coreos/etcd/pkg/pbutil"
- "github.com/coreos/etcd/pkg/types"
- "github.com/coreos/etcd/raft/raftpb"
- "github.com/coreos/etcd/snap"
- "github.com/coreos/etcd/version"
- )
- func TestServeRaftPrefix(t *testing.T) {
- testCases := []struct {
- method string
- body io.Reader
- p Raft
- clusterID string
- wcode int
- }{
- {
- // bad method
- "GET",
- bytes.NewReader(
- pbutil.MustMarshal(&raftpb.Message{}),
- ),
- &fakeRaft{},
- "0",
- http.StatusMethodNotAllowed,
- },
- {
- // bad method
- "PUT",
- bytes.NewReader(
- pbutil.MustMarshal(&raftpb.Message{}),
- ),
- &fakeRaft{},
- "0",
- http.StatusMethodNotAllowed,
- },
- {
- // bad method
- "DELETE",
- bytes.NewReader(
- pbutil.MustMarshal(&raftpb.Message{}),
- ),
- &fakeRaft{},
- "0",
- http.StatusMethodNotAllowed,
- },
- {
- // bad request body
- "POST",
- &errReader{},
- &fakeRaft{},
- "0",
- http.StatusBadRequest,
- },
- {
- // bad request protobuf
- "POST",
- strings.NewReader("malformed garbage"),
- &fakeRaft{},
- "0",
- http.StatusBadRequest,
- },
- {
- // good request, wrong cluster ID
- "POST",
- bytes.NewReader(
- pbutil.MustMarshal(&raftpb.Message{}),
- ),
- &fakeRaft{},
- "1",
- http.StatusPreconditionFailed,
- },
- {
- // good request, Processor failure
- "POST",
- bytes.NewReader(
- pbutil.MustMarshal(&raftpb.Message{}),
- ),
- &fakeRaft{
- err: &resWriterToError{code: http.StatusForbidden},
- },
- "0",
- http.StatusForbidden,
- },
- {
- // good request, Processor failure
- "POST",
- bytes.NewReader(
- pbutil.MustMarshal(&raftpb.Message{}),
- ),
- &fakeRaft{
- err: &resWriterToError{code: http.StatusInternalServerError},
- },
- "0",
- http.StatusInternalServerError,
- },
- {
- // good request, Processor failure
- "POST",
- bytes.NewReader(
- pbutil.MustMarshal(&raftpb.Message{}),
- ),
- &fakeRaft{err: errors.New("blah")},
- "0",
- http.StatusInternalServerError,
- },
- {
- // good request
- "POST",
- bytes.NewReader(
- pbutil.MustMarshal(&raftpb.Message{}),
- ),
- &fakeRaft{},
- "0",
- http.StatusNoContent,
- },
- }
- for i, tt := range testCases {
- req, err := http.NewRequest(tt.method, "foo", tt.body)
- if err != nil {
- t.Fatalf("#%d: could not create request: %#v", i, err)
- }
- req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
- req.Header.Set("X-Server-Version", version.Version)
- rw := httptest.NewRecorder()
- h := newPipelineHandler(NewNopTransporter(), tt.p, types.ID(0))
- h.ServeHTTP(rw, req)
- if rw.Code != tt.wcode {
- t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
- }
- }
- }
- func TestServeRaftStreamPrefix(t *testing.T) {
- tests := []struct {
- path string
- wtype streamType
- }{
- {
- RaftStreamPrefix + "/message/1",
- streamTypeMessage,
- },
- {
- RaftStreamPrefix + "/msgapp/1",
- streamTypeMsgAppV2,
- },
- }
- for i, tt := range tests {
- req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil)
- if err != nil {
- t.Fatalf("#%d: could not create request: %#v", i, err)
- }
- req.Header.Set("X-Etcd-Cluster-ID", "1")
- req.Header.Set("X-Server-Version", version.Version)
- req.Header.Set("X-Raft-To", "2")
- peer := newFakePeer()
- peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
- tr := &Transport{}
- h := newStreamHandler(tr, peerGetter, &fakeRaft{}, types.ID(2), types.ID(1))
- rw := httptest.NewRecorder()
- go h.ServeHTTP(rw, req)
- var conn *outgoingConn
- select {
- case conn = <-peer.connc:
- case <-time.After(time.Second):
- t.Fatalf("#%d: failed to attach outgoingConn", i)
- }
- if g := rw.Header().Get("X-Server-Version"); g != version.Version {
- t.Errorf("#%d: X-Server-Version = %s, want %s", i, g, version.Version)
- }
- if conn.t != tt.wtype {
- t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype)
- }
- conn.Close()
- }
- }
- func TestServeRaftStreamPrefixBad(t *testing.T) {
- removedID := uint64(5)
- tests := []struct {
- method string
- path string
- clusterID string
- remote string
- wcode int
- }{
- // bad method
- {
- "PUT",
- RaftStreamPrefix + "/message/1",
- "1",
- "1",
- http.StatusMethodNotAllowed,
- },
- // bad method
- {
- "POST",
- RaftStreamPrefix + "/message/1",
- "1",
- "1",
- http.StatusMethodNotAllowed,
- },
- // bad method
- {
- "DELETE",
- RaftStreamPrefix + "/message/1",
- "1",
- "1",
- http.StatusMethodNotAllowed,
- },
- // bad path
- {
- "GET",
- RaftStreamPrefix + "/strange/1",
- "1",
- "1",
- http.StatusNotFound,
- },
- // bad path
- {
- "GET",
- RaftStreamPrefix + "/strange",
- "1",
- "1",
- http.StatusNotFound,
- },
- // non-existent peer
- {
- "GET",
- RaftStreamPrefix + "/message/2",
- "1",
- "1",
- http.StatusNotFound,
- },
- // removed peer
- {
- "GET",
- RaftStreamPrefix + "/message/" + fmt.Sprint(removedID),
- "1",
- "1",
- http.StatusGone,
- },
- // wrong cluster ID
- {
- "GET",
- RaftStreamPrefix + "/message/1",
- "2",
- "1",
- http.StatusPreconditionFailed,
- },
- // wrong remote id
- {
- "GET",
- RaftStreamPrefix + "/message/1",
- "1",
- "2",
- http.StatusPreconditionFailed,
- },
- }
- for i, tt := range tests {
- req, err := http.NewRequest(tt.method, "http://localhost:2380"+tt.path, nil)
- if err != nil {
- t.Fatalf("#%d: could not create request: %#v", i, err)
- }
- req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
- req.Header.Set("X-Server-Version", version.Version)
- req.Header.Set("X-Raft-To", tt.remote)
- rw := httptest.NewRecorder()
- tr := &Transport{}
- peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}}
- r := &fakeRaft{removedID: removedID}
- h := newStreamHandler(tr, peerGetter, r, types.ID(1), types.ID(1))
- h.ServeHTTP(rw, req)
- if rw.Code != tt.wcode {
- t.Errorf("#%d: code = %d, want %d", i, rw.Code, tt.wcode)
- }
- }
- }
- func TestCloseNotifier(t *testing.T) {
- c := newCloseNotifier()
- select {
- case <-c.closeNotify():
- t.Fatalf("received unexpected close notification")
- default:
- }
- c.Close()
- select {
- case <-c.closeNotify():
- default:
- t.Fatalf("failed to get close notification")
- }
- }
// errReader is an io.Reader whose Read always fails; it serves as a request
// body that cannot be read.
type errReader struct{}

// Read ignores the supplied buffer, reads zero bytes and returns the error
// "some error".
func (*errReader) Read([]byte) (int, error) {
	return 0, errors.New("some error")
}
// resWriterToError is an error value that can also write itself to an HTTP
// response by emitting its status code.
type resWriterToError struct {
	code int // status code handed to WriteHeader by WriteTo
}

// Error returns an empty message.
func (e *resWriterToError) Error() string {
	return ""
}

// WriteTo writes e.code as the status header of w.
func (e *resWriterToError) WriteTo(w http.ResponseWriter) {
	status := e.code
	w.WriteHeader(status)
}
- type fakePeerGetter struct {
- peers map[types.ID]Peer
- }
- func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
- type fakePeer struct {
- msgs []raftpb.Message
- snapMsgs []snap.Message
- peerURLs types.URLs
- connc chan *outgoingConn
- }
- func newFakePeer() *fakePeer {
- fakeURL, _ := url.Parse("http://localhost")
- return &fakePeer{
- connc: make(chan *outgoingConn, 1),
- peerURLs: types.URLs{*fakeURL},
- }
- }
- func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) }
- func (pr *fakePeer) sendSnap(m snap.Message) { pr.snapMsgs = append(pr.snapMsgs, m) }
- func (pr *fakePeer) update(urls types.URLs) { pr.peerURLs = urls }
- func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
- func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
- func (pr *fakePeer) stop() {}
|