Browse Source

etcdserver: wip

Blake Mizerany 11 years ago
parent
commit
008337e150
1 changed files with 34 additions and 8 deletions
  1. 34 8
      etcdserver2/server.go

+ 34 - 8
etcdserver2/server.go

@@ -1,6 +1,8 @@
 package etcdserver
 package etcdserver
 
 
 import (
 import (
+	"log"
+
 	"code.google.com/p/go.net/context"
 	"code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/wait"
 	"github.com/coreos/etcd/wait"
@@ -13,22 +15,24 @@ type Response struct {
 type Server struct {
 type Server struct {
 	n raft.Node
 	n raft.Node
 	w wait.List
 	w wait.List
+
+	msgsc chan raft.Message
 }
 }
 
 
 func (s *Server) Run(ctx context.Context) {
 func (s *Server) Run(ctx context.Context) {
 	for {
 	for {
 		st, ents, cents, msgs, err := s.n.ReadState(ctx)
 		st, ents, cents, msgs, err := s.n.ReadState(ctx)
 		if err != nil {
 		if err != nil {
-			do something here
+			log.Println("etcdserver: error while reading state -", err)
+			return
 		}
 		}
-		save state to wal
-		go send messages
+		s.save(st, ents)
+		s.send(msgs)
 		go func() {
 		go func() {
-			for e in cents {
-				req = decode e.Data
-				apply req to state machine
-				build Response from result of apply
-				trigger wait with (r.Id, resp)
+			for _, e := range cents {
+				var r Request
+				r.Unmarshal(e.Data)
+				s.w.Trigger(r.Id, s.apply(r))
 			}
 			}
 		}()
 		}()
 	}
 	}
@@ -53,3 +57,25 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
 		return Response{}, ctx.Err()
 		return Response{}, ctx.Err()
 	}
 	}
 }
 }
+
+// send sends dispatches msgs to the sending goroutine. If the goroutine is
+// busy, it will drop msgs and clients should timeout and reissue.
+// TODO: we could use s.w to trigger and error to cancel the clients faster???? Is this a good idea??
+func (s *Server) send(msgs []raft.Message) {
+	for _, m := range msgs {
+		select {
+		case s.msgsc <- m:
+		default:
+			log.Println("TODO: log dropped message")
+		}
+	}
+}
+
+func (s *Server) save(st raft.State, ents []raft.Entry) {
+	panic("not implemented")
+}
+
+// apply interprets r as a call to store.X and returns an Response interpreted from store.Event
+func (s *Server) apply(r Request) Response {
+	panic("not implmented")
+}