|
@@ -106,12 +106,16 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
http.Error(w, "error reading raft message", http.StatusBadRequest)
|
|
http.Error(w, "error reading raft message", http.StatusBadRequest)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
var m raftpb.Message
|
|
var m raftpb.Message
|
|
|
if err := m.Unmarshal(b); err != nil {
|
|
if err := m.Unmarshal(b); err != nil {
|
|
|
plog.Errorf("failed to unmarshal raft message (%v)", err)
|
|
plog.Errorf("failed to unmarshal raft message (%v)", err)
|
|
|
http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
|
|
http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))
|
|
|
|
|
+
|
|
|
if err := h.r.Process(context.TODO(), m); err != nil {
|
|
if err := h.r.Process(context.TODO(), m); err != nil {
|
|
|
switch v := err.(type) {
|
|
switch v := err.(type) {
|
|
|
case writerToResponse:
|
|
case writerToResponse:
|
|
@@ -181,6 +185,9 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
http.Error(w, msg, http.StatusBadRequest)
|
|
http.Error(w, msg, http.StatusBadRequest)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
|
|
|
|
|
+
|
|
|
if m.Type != raftpb.MsgSnap {
|
|
if m.Type != raftpb.MsgSnap {
|
|
|
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
|
|
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
|
|
|
http.Error(w, "wrong raft message type", http.StatusBadRequest)
|
|
http.Error(w, "wrong raft message type", http.StatusBadRequest)
|
|
@@ -189,13 +196,15 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
|
|
|
plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
|
|
plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
|
|
|
// save incoming database snapshot.
|
|
// save incoming database snapshot.
|
|
|
- if err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
|
|
|
|
|
|
|
+ if n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
|
|
|
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
|
|
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
|
|
|
plog.Error(msg)
|
|
plog.Error(msg)
|
|
|
http.Error(w, msg, http.StatusInternalServerError)
|
|
http.Error(w, msg, http.StatusInternalServerError)
|
|
|
return
|
|
return
|
|
|
|
|
+ } else {
|
|
|
|
|
+ receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
|
|
|
|
|
+ plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
|
|
|
}
|
|
}
|
|
|
- plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
|
|
|
|
|
|
|
|
|
|
if err := h.r.Process(context.TODO(), m); err != nil {
|
|
if err := h.r.Process(context.TODO(), m); err != nil {
|
|
|
switch v := err.(type) {
|
|
switch v := err.(type) {
|