|
@@ -20,10 +20,12 @@ import (
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"io"
|
|
"io"
|
|
|
"log"
|
|
"log"
|
|
|
|
|
+ "math"
|
|
|
"net/http"
|
|
"net/http"
|
|
|
"net/url"
|
|
"net/url"
|
|
|
"path"
|
|
"path"
|
|
|
"strconv"
|
|
"strconv"
|
|
|
|
|
+ "sync"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
@@ -37,11 +39,105 @@ const (
|
|
|
streamBufSize = 4096
|
|
streamBufSize = 4096
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+// TODO: a stream might hava one stream server or one stream client, but not both.
|
|
|
|
|
+type stream struct {
|
|
|
|
|
+ // the server might be attached asynchronously with the owner of the stream
|
|
|
|
|
+ // use a mutex to protect it
|
|
|
|
|
+ sync.Mutex
|
|
|
|
|
+ server *streamServer
|
|
|
|
|
+
|
|
|
|
|
+ client *streamClient
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *stream) open(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) error {
|
|
|
|
|
+ if s.client != nil {
|
|
|
|
|
+ panic("open: stream is open")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ c, err := newStreamClient(id, to, cid, term, tr, u, r)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ log.Printf("stream: error opening stream: %v", err)
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ s.client = c
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *stream) attach(server *streamServer) error {
|
|
|
|
|
+ s.Lock()
|
|
|
|
|
+ defer s.Unlock()
|
|
|
|
|
+ if s.server != nil {
|
|
|
|
|
+ // ignore lower-term streaming request
|
|
|
|
|
+ if server.term < s.server.term {
|
|
|
|
|
+ return fmt.Errorf("cannot attach out of data stream server [%d / %d]", server.term, s.server.term)
|
|
|
|
|
+ }
|
|
|
|
|
+ s.server.stop()
|
|
|
|
|
+ }
|
|
|
|
|
+ s.server = server
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *stream) write(m raftpb.Message) bool {
|
|
|
|
|
+ s.Lock()
|
|
|
|
|
+ defer s.Unlock()
|
|
|
|
|
+ if s.server == nil {
|
|
|
|
|
+ return false
|
|
|
|
|
+ }
|
|
|
|
|
+ if m.Term != s.server.term {
|
|
|
|
|
+ if m.Term > s.server.term {
|
|
|
|
|
+ panic("expected server to be invalidated when there is a higher term message")
|
|
|
|
|
+ }
|
|
|
|
|
+ return false
|
|
|
|
|
+ }
|
|
|
|
|
+ // todo: early unlock?
|
|
|
|
|
+ if err := s.server.send(m.Entries); err != nil {
|
|
|
|
|
+ log.Printf("stream: error sending message: %v", err)
|
|
|
|
|
+ log.Printf("stream: stopping the stream server...")
|
|
|
|
|
+ s.server.stop()
|
|
|
|
|
+ s.server = nil
|
|
|
|
|
+ return false
|
|
|
|
|
+ }
|
|
|
|
|
+ return true
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// invalidate stops the sever/client that is running at
|
|
|
|
|
+// a term lower than the given term.
|
|
|
|
|
+func (s *stream) invalidate(term uint64) {
|
|
|
|
|
+ s.Lock()
|
|
|
|
|
+ defer s.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ if s.server != nil {
|
|
|
|
|
+ if s.server.term < term {
|
|
|
|
|
+ s.server.stop()
|
|
|
|
|
+ s.server = nil
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if s.client != nil {
|
|
|
|
|
+ if s.client.term < term {
|
|
|
|
|
+ s.client.stop()
|
|
|
|
|
+ s.client = nil
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *stream) stop() {
|
|
|
|
|
+ s.invalidate(math.MaxUint64)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *stream) isOpen() bool {
|
|
|
|
|
+ if s.client != nil && s.client.isStopped() {
|
|
|
|
|
+ s.client = nil
|
|
|
|
|
+ }
|
|
|
|
|
+ return s.client != nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
type WriteFlusher interface {
|
|
type WriteFlusher interface {
|
|
|
io.Writer
|
|
io.Writer
|
|
|
http.Flusher
|
|
http.Flusher
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// TODO: rename it to streamWriter.
|
|
|
|
|
+// TODO: replace fs with stream stats
|
|
|
type streamServer struct {
|
|
type streamServer struct {
|
|
|
to types.ID
|
|
to types.ID
|
|
|
term uint64
|
|
term uint64
|
|
@@ -50,16 +146,16 @@ type streamServer struct {
|
|
|
done chan struct{}
|
|
done chan struct{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func startStreamServer(w WriteFlusher, to types.ID, term uint64, fs *stats.FollowerStats) *streamServer {
|
|
|
|
|
|
|
+// newStreamServer starts and returns a new started stream server.
|
|
|
|
|
+// The caller should call stop when finished, to shut it down.
|
|
|
|
|
+func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer {
|
|
|
s := &streamServer{
|
|
s := &streamServer{
|
|
|
to: to,
|
|
to: to,
|
|
|
term: term,
|
|
term: term,
|
|
|
- fs: fs,
|
|
|
|
|
q: make(chan []raftpb.Entry, streamBufSize),
|
|
q: make(chan []raftpb.Entry, streamBufSize),
|
|
|
done: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
|
}
|
|
}
|
|
|
go s.handle(w)
|
|
go s.handle(w)
|
|
|
- log.Printf("rafthttp: starting server stream to %s at term %d", to, term)
|
|
|
|
|
return s
|
|
return s
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -78,13 +174,6 @@ func (s *streamServer) send(ents []raftpb.Entry) error {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (s *streamServer) stop() {
|
|
|
|
|
- close(s.q)
|
|
|
|
|
- <-s.done
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
|
|
|
|
|
-
|
|
|
|
|
func (s *streamServer) handle(w WriteFlusher) {
|
|
func (s *streamServer) handle(w WriteFlusher) {
|
|
|
defer func() {
|
|
defer func() {
|
|
|
close(s.done)
|
|
close(s.done)
|
|
@@ -103,6 +192,15 @@ func (s *streamServer) handle(w WriteFlusher) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (s *streamServer) stop() {
|
|
|
|
|
+ close(s.q)
|
|
|
|
|
+ <-s.done
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
|
|
|
|
|
+
|
|
|
|
|
+// TODO: rename it to streamReader.
|
|
|
|
|
+// TODO: move the raft interface out of the reader.
|
|
|
type streamClient struct {
|
|
type streamClient struct {
|
|
|
id types.ID
|
|
id types.ID
|
|
|
to types.ID
|
|
to types.ID
|
|
@@ -113,44 +211,41 @@ type streamClient struct {
|
|
|
done chan struct{}
|
|
done chan struct{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func newStreamClient(id, to types.ID, term uint64, r Raft) *streamClient {
|
|
|
|
|
- return &streamClient{
|
|
|
|
|
|
|
+// newStreamClient starts and returns a new started stream client.
|
|
|
|
|
+// The caller should call stop when finished, to shut it down.
|
|
|
|
|
+func newStreamClient(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamClient, error) {
|
|
|
|
|
+ s := &streamClient{
|
|
|
id: id,
|
|
id: id,
|
|
|
to: to,
|
|
to: to,
|
|
|
term: term,
|
|
term: term,
|
|
|
r: r,
|
|
r: r,
|
|
|
done: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
|
}
|
|
}
|
|
|
-}
|
|
|
|
|
|
|
|
|
|
-// Dial dials to the remote url, and sends streaming request. If it succeeds,
|
|
|
|
|
-// it returns nil error, and the caller should call Handle function to keep
|
|
|
|
|
-// receiving appendEntry messages.
|
|
|
|
|
-func (s *streamClient) start(tr http.RoundTripper, u string, cid types.ID) error {
|
|
|
|
|
uu, err := url.Parse(u)
|
|
uu, err := url.Parse(u)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return fmt.Errorf("parse url %s error: %v", u, err)
|
|
|
|
|
|
|
+ return nil, fmt.Errorf("parse url %s error: %v", u, err)
|
|
|
}
|
|
}
|
|
|
uu.Path = path.Join(RaftStreamPrefix, s.id.String())
|
|
uu.Path = path.Join(RaftStreamPrefix, s.id.String())
|
|
|
req, err := http.NewRequest("GET", uu.String(), nil)
|
|
req, err := http.NewRequest("GET", uu.String(), nil)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return fmt.Errorf("new request to %s error: %v", u, err)
|
|
|
|
|
|
|
+ return nil, fmt.Errorf("new request to %s error: %v", u, err)
|
|
|
}
|
|
}
|
|
|
req.Header.Set("X-Etcd-Cluster-ID", cid.String())
|
|
req.Header.Set("X-Etcd-Cluster-ID", cid.String())
|
|
|
req.Header.Set("X-Raft-To", s.to.String())
|
|
req.Header.Set("X-Raft-To", s.to.String())
|
|
|
req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
|
|
req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
|
|
|
resp, err := tr.RoundTrip(req)
|
|
resp, err := tr.RoundTrip(req)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return fmt.Errorf("error posting to %q: %v", u, err)
|
|
|
|
|
|
|
+ return nil, fmt.Errorf("error posting to %q: %v", u, err)
|
|
|
}
|
|
}
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
resp.Body.Close()
|
|
resp.Body.Close()
|
|
|
- return fmt.Errorf("unhandled http status %d", resp.StatusCode)
|
|
|
|
|
|
|
+ return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
|
|
|
}
|
|
}
|
|
|
s.closer = resp.Body
|
|
s.closer = resp.Body
|
|
|
go s.handle(resp.Body)
|
|
go s.handle(resp.Body)
|
|
|
log.Printf("rafthttp: starting client stream to %s at term %d", s.to, s.term)
|
|
log.Printf("rafthttp: starting client stream to %s at term %d", s.to, s.term)
|
|
|
- return nil
|
|
|
|
|
|
|
+ return s, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *streamClient) stop() {
|
|
func (s *streamClient) stop() {
|