transport.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package rafthttp
  2. import (
  3. "net/http"
  4. "github.com/coreos/etcd/etcdserver/stats"
  5. "github.com/coreos/etcd/pkg/types"
  6. "github.com/coreos/etcd/raft/raftpb"
  7. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  8. )
  9. type Processor interface {
  10. Process(ctx context.Context, m raftpb.Message) error
  11. }
  12. type Transporter interface {
  13. Handler() http.Handler
  14. Send(m []raftpb.Message)
  15. AddPeer(id types.ID, urls []string)
  16. RemovePeer(id types.ID)
  17. UpdatePeer(id types.ID, urls []string)
  18. Stop()
  19. ShouldStopNotify() <-chan struct{}
  20. }
  21. type Transport struct {
  22. RoundTripper http.RoundTripper
  23. ID types.ID
  24. ClusterID types.ID
  25. Processor Processor
  26. ServerStats *stats.ServerStats
  27. LeaderStats *stats.LeaderStats
  28. *sendHub
  29. handler http.Handler
  30. }
  31. func (t *Transport) Start() {
  32. t.sendHub = newSendHub(t.RoundTripper, t.ClusterID, t.Processor, t.ServerStats, t.LeaderStats)
  33. h := NewHandler(t.Processor, t.ClusterID)
  34. sh := NewStreamHandler(t.sendHub, t.ID, t.ClusterID)
  35. mux := http.NewServeMux()
  36. mux.Handle(RaftPrefix, h)
  37. mux.Handle(RaftStreamPrefix+"/", sh)
  38. t.handler = mux
  39. }
  40. func (t *Transport) Handler() http.Handler { return t.handler }