123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package rafthttp
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "net/http"
- "net/http/httptest"
- "reflect"
- "sync"
- "testing"
- "time"
- "golang.org/x/time/rate"
- "github.com/coreos/etcd/etcdserver/stats"
- "github.com/coreos/etcd/pkg/testutil"
- "github.com/coreos/etcd/pkg/types"
- "github.com/coreos/etcd/raft/raftpb"
- "github.com/coreos/etcd/version"
- "github.com/coreos/go-semver/semver"
- )
- // TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
- // to streamWriter. After that, streamWriter can use it to send messages
- // continuously, and closes it when stopped.
- func TestStreamWriterAttachOutgoingConn(t *testing.T) {
- sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
- // the expected initial state of streamWriter is not working
- if _, ok := sw.writec(); ok {
- t.Errorf("initial working status = %v, want false", ok)
- }
- // repeat tests to ensure streamWriter can use last attached connection
- var wfc *fakeWriteFlushCloser
- for i := 0; i < 3; i++ {
- prevwfc := wfc
- wfc = newFakeWriteFlushCloser(nil)
- sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
- // previous attached connection should be closed
- if prevwfc != nil {
- select {
- case <-prevwfc.closed:
- case <-time.After(time.Second):
- t.Errorf("#%d: close of previous connection timed out", i)
- }
- }
- // if prevwfc != nil, the new msgc is ready since prevwfc has closed
- // if prevwfc == nil, the first connection may be pending, but the first
- // msgc is already available since it's set on calling startStreamwriter
- msgc, _ := sw.writec()
- msgc <- raftpb.Message{}
- select {
- case <-wfc.writec:
- case <-time.After(time.Second):
- t.Errorf("#%d: failed to write to the underlying connection", i)
- }
- // write chan is still available
- if _, ok := sw.writec(); !ok {
- t.Errorf("#%d: working status = %v, want true", i, ok)
- }
- }
- sw.stop()
- // write chan is unavailable since the writer is stopped.
- if _, ok := sw.writec(); ok {
- t.Errorf("working status after stop = %v, want false", ok)
- }
- if !wfc.Closed() {
- t.Errorf("failed to close the underlying connection")
- }
- }
- // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
- // outgoingConn will close the outgoingConn and fall back to non-working status.
- func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
- sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
- defer sw.stop()
- wfc := newFakeWriteFlushCloser(errors.New("blah"))
- sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
- sw.msgc <- raftpb.Message{}
- select {
- case <-wfc.closed:
- case <-time.After(time.Second):
- t.Errorf("failed to close the underlying connection in time")
- }
- // no longer working
- if _, ok := sw.writec(); ok {
- t.Errorf("working = %v, want false", ok)
- }
- }
- func TestStreamReaderDialRequest(t *testing.T) {
- for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
- tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
- sr := &streamReader{
- peerID: types.ID(2),
- tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
- picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
- ctx: context.Background(),
- }
- sr.dial(tt)
- act, err := tr.rec.Wait(1)
- if err != nil {
- t.Fatal(err)
- }
- req := act[0].Params[0].(*http.Request)
- wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint() + "/1")
- if req.URL.String() != wurl {
- t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
- }
- if w := "GET"; req.Method != w {
- t.Errorf("#%d: method = %s, want %s", i, req.Method, w)
- }
- if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
- t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g)
- }
- if g := req.Header.Get("X-Raft-To"); g != "2" {
- t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
- }
- }
- }
- // TestStreamReaderDialResult tests the result of the dial func call meets the
- // HTTP response received.
- func TestStreamReaderDialResult(t *testing.T) {
- tests := []struct {
- code int
- err error
- wok bool
- whalt bool
- }{
- {0, errors.New("blah"), false, false},
- {http.StatusOK, nil, true, false},
- {http.StatusMethodNotAllowed, nil, false, false},
- {http.StatusNotFound, nil, false, false},
- {http.StatusPreconditionFailed, nil, false, false},
- {http.StatusGone, nil, false, true},
- }
- for i, tt := range tests {
- h := http.Header{}
- h.Add("X-Server-Version", version.Version)
- tr := &respRoundTripper{
- code: tt.code,
- header: h,
- err: tt.err,
- }
- sr := &streamReader{
- peerID: types.ID(2),
- tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
- picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
- errorc: make(chan error, 1),
- ctx: context.Background(),
- }
- _, err := sr.dial(streamTypeMessage)
- if ok := err == nil; ok != tt.wok {
- t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
- }
- if halt := len(sr.errorc) > 0; halt != tt.whalt {
- t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt)
- }
- }
- }
- // TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
- func TestStreamReaderStopOnDial(t *testing.T) {
- defer testutil.AfterTest(t)
- h := http.Header{}
- h.Add("X-Server-Version", version.Version)
- tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
- sr := &streamReader{
- peerID: types.ID(2),
- tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
- picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
- errorc: make(chan error, 1),
- typ: streamTypeMessage,
- status: newPeerStatus(types.ID(2)),
- rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
- }
- tr.onResp = func() {
- // stop() waits for the run() goroutine to exit, but that exit
- // needs a response from RoundTrip() first; use goroutine
- go sr.stop()
- // wait so that stop() is blocked on run() exiting
- time.Sleep(10 * time.Millisecond)
- // sr.run() completes dialing then begins decoding while stopped
- }
- sr.start()
- select {
- case <-sr.done:
- case <-time.After(time.Second):
- t.Fatal("streamReader did not stop in time")
- }
- }
- type respWaitRoundTripper struct {
- rrt *respRoundTripper
- onResp func()
- }
- func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
- resp, err := t.rrt.RoundTrip(req)
- resp.Body = newWaitReadCloser()
- t.onResp()
- return resp, err
- }
- type waitReadCloser struct{ closec chan struct{} }
- func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
- func (wrc *waitReadCloser) Read(p []byte) (int, error) {
- <-wrc.closec
- return 0, io.EOF
- }
- func (wrc *waitReadCloser) Close() error {
- close(wrc.closec)
- return nil
- }
- // TestStreamReaderDialDetectUnsupport tests that dial func could find
- // out that the stream type is not supported by the remote.
- func TestStreamReaderDialDetectUnsupport(t *testing.T) {
- for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} {
- // the response from etcd 2.0
- tr := &respRoundTripper{
- code: http.StatusNotFound,
- header: http.Header{},
- }
- sr := &streamReader{
- peerID: types.ID(2),
- tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
- picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
- ctx: context.Background(),
- }
- _, err := sr.dial(typ)
- if err != errUnsupportedStreamType {
- t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType)
- }
- }
- }
- // TestStream tests that streamReader and streamWriter can build stream to
- // send messages between each other.
- func TestStream(t *testing.T) {
- recvc := make(chan raftpb.Message, streamBufSize)
- propc := make(chan raftpb.Message, streamBufSize)
- msgapp := raftpb.Message{
- Type: raftpb.MsgApp,
- From: 2,
- To: 1,
- Term: 1,
- LogTerm: 1,
- Index: 3,
- Entries: []raftpb.Entry{{Term: 1, Index: 4}},
- }
- tests := []struct {
- t streamType
- m raftpb.Message
- wc chan raftpb.Message
- }{
- {
- streamTypeMessage,
- raftpb.Message{Type: raftpb.MsgProp, To: 2},
- propc,
- },
- {
- streamTypeMessage,
- msgapp,
- recvc,
- },
- {
- streamTypeMsgAppV2,
- msgapp,
- recvc,
- },
- }
- for i, tt := range tests {
- h := &fakeStreamHandler{t: tt.t}
- srv := httptest.NewServer(h)
- defer srv.Close()
- sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
- defer sw.stop()
- h.sw = sw
- picker := mustNewURLPicker(t, []string{srv.URL})
- tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
- sr := &streamReader{
- peerID: types.ID(2),
- typ: tt.t,
- tr: tr,
- picker: picker,
- status: newPeerStatus(types.ID(2)),
- recvc: recvc,
- propc: propc,
- rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
- }
- sr.start()
- // wait for stream to work
- var writec chan<- raftpb.Message
- for {
- var ok bool
- if writec, ok = sw.writec(); ok {
- break
- }
- time.Sleep(time.Millisecond)
- }
- writec <- tt.m
- var m raftpb.Message
- select {
- case m = <-tt.wc:
- case <-time.After(time.Second):
- t.Fatalf("#%d: failed to receive message from the channel", i)
- }
- if !reflect.DeepEqual(m, tt.m) {
- t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m)
- }
- sr.stop()
- }
- }
- func TestCheckStreamSupport(t *testing.T) {
- tests := []struct {
- v *semver.Version
- t streamType
- w bool
- }{
- // support
- {
- semver.Must(semver.NewVersion("2.1.0")),
- streamTypeMsgAppV2,
- true,
- },
- // ignore patch
- {
- semver.Must(semver.NewVersion("2.1.9")),
- streamTypeMsgAppV2,
- true,
- },
- // ignore prerelease
- {
- semver.Must(semver.NewVersion("2.1.0-alpha")),
- streamTypeMsgAppV2,
- true,
- },
- }
- for i, tt := range tests {
- if g := checkStreamSupport(tt.v, tt.t); g != tt.w {
- t.Errorf("#%d: check = %v, want %v", i, g, tt.w)
- }
- }
- }
- type fakeWriteFlushCloser struct {
- mu sync.Mutex
- err error
- written int
- closed chan struct{}
- writec chan struct{}
- }
- func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser {
- return &fakeWriteFlushCloser{
- err: err,
- closed: make(chan struct{}),
- writec: make(chan struct{}, 1),
- }
- }
- func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) {
- wfc.mu.Lock()
- defer wfc.mu.Unlock()
- select {
- case wfc.writec <- struct{}{}:
- default:
- }
- wfc.written += len(p)
- return len(p), wfc.err
- }
- func (wfc *fakeWriteFlushCloser) Flush() {}
- func (wfc *fakeWriteFlushCloser) Close() error {
- close(wfc.closed)
- return wfc.err
- }
- func (wfc *fakeWriteFlushCloser) Written() int {
- wfc.mu.Lock()
- defer wfc.mu.Unlock()
- return wfc.written
- }
- func (wfc *fakeWriteFlushCloser) Closed() bool {
- select {
- case <-wfc.closed:
- return true
- default:
- return false
- }
- }
- type fakeStreamHandler struct {
- t streamType
- sw *streamWriter
- }
- func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- w.Header().Add("X-Server-Version", version.Version)
- w.(http.Flusher).Flush()
- c := newCloseNotifier()
- h.sw.attach(&outgoingConn{
- t: h.t,
- Writer: w,
- Flusher: w.(http.Flusher),
- Closer: c,
- })
- <-c.closeNotify()
- }
|