123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package rafthttp
- import (
- "net/http"
- "reflect"
- "testing"
- "time"
- stats "go.etcd.io/etcd/etcdserver/api/v2stats"
- "go.etcd.io/etcd/pkg/testutil"
- "go.etcd.io/etcd/pkg/types"
- "go.etcd.io/etcd/raft/raftpb"
- "github.com/xiang90/probing"
- )
- // TestTransportSend tests that transport can send messages using correct
- // underlying peer, and drop local or unknown-target messages.
- func TestTransportSend(t *testing.T) {
- peer1 := newFakePeer()
- peer2 := newFakePeer()
- tr := &Transport{
- ServerStats: stats.NewServerStats("", ""),
- peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
- }
- wmsgsIgnored := []raftpb.Message{
- // bad local message
- {Type: raftpb.MsgBeat},
- // bad remote message
- {Type: raftpb.MsgProp, To: 3},
- }
- wmsgsTo1 := []raftpb.Message{
- // good message
- {Type: raftpb.MsgProp, To: 1},
- {Type: raftpb.MsgApp, To: 1},
- }
- wmsgsTo2 := []raftpb.Message{
- // good message
- {Type: raftpb.MsgProp, To: 2},
- {Type: raftpb.MsgApp, To: 2},
- }
- tr.Send(wmsgsIgnored)
- tr.Send(wmsgsTo1)
- tr.Send(wmsgsTo2)
- if !reflect.DeepEqual(peer1.msgs, wmsgsTo1) {
- t.Errorf("msgs to peer 1 = %+v, want %+v", peer1.msgs, wmsgsTo1)
- }
- if !reflect.DeepEqual(peer2.msgs, wmsgsTo2) {
- t.Errorf("msgs to peer 2 = %+v, want %+v", peer2.msgs, wmsgsTo2)
- }
- }
- func TestTransportCutMend(t *testing.T) {
- peer1 := newFakePeer()
- peer2 := newFakePeer()
- tr := &Transport{
- ServerStats: stats.NewServerStats("", ""),
- peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
- }
- tr.CutPeer(types.ID(1))
- wmsgsTo := []raftpb.Message{
- // good message
- {Type: raftpb.MsgProp, To: 1},
- {Type: raftpb.MsgApp, To: 1},
- }
- tr.Send(wmsgsTo)
- if len(peer1.msgs) > 0 {
- t.Fatalf("msgs expected to be ignored, got %+v", peer1.msgs)
- }
- tr.MendPeer(types.ID(1))
- tr.Send(wmsgsTo)
- if !reflect.DeepEqual(peer1.msgs, wmsgsTo) {
- t.Errorf("msgs to peer 1 = %+v, want %+v", peer1.msgs, wmsgsTo)
- }
- }
- func TestTransportAdd(t *testing.T) {
- ls := stats.NewLeaderStats("")
- tr := &Transport{
- LeaderStats: ls,
- streamRt: &roundTripperRecorder{},
- peers: make(map[types.ID]Peer),
- pipelineProber: probing.NewProber(nil),
- streamProber: probing.NewProber(nil),
- }
- tr.AddPeer(1, []string{"http://localhost:2380"})
- if _, ok := ls.Followers["1"]; !ok {
- t.Errorf("FollowerStats[1] is nil, want exists")
- }
- s, ok := tr.peers[types.ID(1)]
- if !ok {
- tr.Stop()
- t.Fatalf("senders[1] is nil, want exists")
- }
- // duplicate AddPeer is ignored
- tr.AddPeer(1, []string{"http://localhost:2380"})
- ns := tr.peers[types.ID(1)]
- if s != ns {
- t.Errorf("sender = %v, want %v", ns, s)
- }
- tr.Stop()
- }
- func TestTransportRemove(t *testing.T) {
- tr := &Transport{
- LeaderStats: stats.NewLeaderStats(""),
- streamRt: &roundTripperRecorder{},
- peers: make(map[types.ID]Peer),
- pipelineProber: probing.NewProber(nil),
- streamProber: probing.NewProber(nil),
- }
- tr.AddPeer(1, []string{"http://localhost:2380"})
- tr.RemovePeer(types.ID(1))
- defer tr.Stop()
- if _, ok := tr.peers[types.ID(1)]; ok {
- t.Fatalf("senders[1] exists, want removed")
- }
- }
- func TestTransportUpdate(t *testing.T) {
- peer := newFakePeer()
- tr := &Transport{
- peers: map[types.ID]Peer{types.ID(1): peer},
- pipelineProber: probing.NewProber(nil),
- streamProber: probing.NewProber(nil),
- }
- u := "http://localhost:2380"
- tr.UpdatePeer(types.ID(1), []string{u})
- wurls := types.URLs(testutil.MustNewURLs(t, []string{"http://localhost:2380"}))
- if !reflect.DeepEqual(peer.peerURLs, wurls) {
- t.Errorf("urls = %+v, want %+v", peer.peerURLs, wurls)
- }
- }
- func TestTransportErrorc(t *testing.T) {
- errorc := make(chan error, 1)
- tr := &Transport{
- Raft: &fakeRaft{},
- LeaderStats: stats.NewLeaderStats(""),
- ErrorC: errorc,
- streamRt: newRespRoundTripper(http.StatusForbidden, nil),
- pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
- peers: make(map[types.ID]Peer),
- pipelineProber: probing.NewProber(nil),
- streamProber: probing.NewProber(nil),
- }
- tr.AddPeer(1, []string{"http://localhost:2380"})
- defer tr.Stop()
- select {
- case <-errorc:
- t.Fatalf("received unexpected from errorc")
- case <-time.After(10 * time.Millisecond):
- }
- tr.peers[1].send(raftpb.Message{})
- select {
- case <-errorc:
- case <-time.After(1 * time.Second):
- t.Fatalf("cannot receive error from errorc")
- }
- }
|