Browse Source

refactor move sync command into etcd

Xiang Li 12 years ago
parent
commit
0372cdea23

+ 14 - 0
server/peer_server.go

@@ -136,6 +136,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 		log.Debugf("%s restart as a follower", s.name)
 	}
 
+	go s.monitorSync()
+
 	// open the snapshot
 	if snapshot {
 		go s.monitorSnapshot()
@@ -424,3 +426,15 @@ func (s *PeerServer) monitorSnapshot() {
 		}
 	}
 }
+
+func (s *PeerServer) monitorSync() {
+	ticker := time.Tick(time.Millisecond * 500)
+	for {
+		select {
+		case now := <-ticker:
+			if s.raftServer.State() == raft.Leader {
+				s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now))
+			}
+		}
+	}
+}

+ 7 - 11
server/v2/get_handler.go

@@ -55,6 +55,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 		cn, _ := w.(http.CloseNotifier)
 		closeChan := cn.CloseNotify()
 
+	eventLoop:
 		for {
 			select {
 			case <-closeChan:
@@ -66,7 +67,8 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 				if event != nil && event.Action == store.Expire {
 					events = append(events, event)
 				} else {
-					goto finish
+					events = append(events, event)
+					break eventLoop
 				}
 			}
 		}
@@ -79,19 +81,13 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 		}
 	}
 
-finish:
+	var b []byte
 
-	w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
-	w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
+	w.Header().Add("X-Etcd-Index", fmt.Sprint(events[0].Index))
+	w.Header().Add("X-Etcd-Term", fmt.Sprint(events[0].Term))
 	w.WriteHeader(http.StatusOK)
+	b, _ = json.Marshal(events)
 
-	var b []byte
-
-	if len(events) == 0 {
-		b, _ = json.Marshal(event)
-	} else {
-		b, _ = json.Marshal(events)
-	}
 	w.Write(b)
 
 	return nil

+ 1 - 0
store/command_factory.go

@@ -21,6 +21,7 @@ type CommandFactory interface {
 	CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command
 	CreateDeleteCommand(key string, recursive bool) raft.Command
 	CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command
+	CreateSyncCommand(now time.Time) raft.Command
 }
 
 // RegisterCommandFactory adds a command factory to the global registry.

+ 4 - 7
store/event_history.go

@@ -51,10 +51,8 @@ func (eh *EventHistory) scan(prefix string, index uint64) ([]*Event, *etcdErr.Er
 	eh.rwl.RLock()
 	defer eh.rwl.RUnlock()
 
-	start := index - eh.StartIndex
-
 	// the index should locate after the event history's StartIndex
-	if start < 0 {
+	if index-eh.StartIndex < 0 {
 		return nil,
 			etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
 				fmt.Sprintf("the requested history has been cleared [%v/%v]",
@@ -62,11 +60,11 @@ func (eh *EventHistory) scan(prefix string, index uint64) ([]*Event, *etcdErr.Er
 	}
 
 	// the index should locate before the size of the queue minus the duplicate count
-	if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
+	if index > eh.LastIndex { // future index
 		return nil, nil
 	}
 
-	i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
+	i := eh.Queue.Front
 
 	events := make([]*Event, 0)
 	var eventIndex uint64
@@ -85,11 +83,10 @@ func (eh *EventHistory) scan(prefix string, index uint64) ([]*Event, *etcdErr.Er
 
 		i = (i + 1) % eh.Queue.Capacity
 
-		if i == eh.Queue.back() {
+		if i > eh.Queue.back() {
 			if eventIndex == 0 { // find nothing, return and watch from current index
 				return nil, nil
 			}
-
 			return events, nil
 		}
 	}

+ 2 - 1
store/store.go

@@ -32,6 +32,7 @@ type Store interface {
 	Recovery(state []byte) error
 	TotalTransactions() uint64
 	JsonStats() []byte
+	DeleteExpiredKeys(cutoff time.Time, index uint64, term uint64)
 }
 
 type store struct {
@@ -435,7 +436,7 @@ func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 }
 
 // deleteExpiredKyes will delete all
-func (s *store) deleteExpiredKeys(cutoff time.Time, index uint64, term uint64) {
+func (s *store) DeleteExpiredKeys(cutoff time.Time, index uint64, term uint64) {
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 

+ 13 - 9
store/v2/command_factory.go

@@ -2,7 +2,7 @@ package v2
 
 import (
 	"time"
-	
+
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 )
@@ -28,8 +28,8 @@ func (f *CommandFactory) CreateUpgradeCommand() raft.Command {
 // CreateSetCommand creates a version 2 command to set a key to a given value in the store.
 func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime time.Time) raft.Command {
 	return &SetCommand{
-		Key: key,
-		Value: value,
+		Key:        key,
+		Value:      value,
 		ExpireTime: expireTime,
 	}
 }
@@ -37,18 +37,18 @@ func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime t
 // CreateCreateCommand creates a version 2 command to create a new key in the store.
 func (f *CommandFactory) CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command {
 	return &CreateCommand{
-		Key: key,
-		Value: value,
+		Key:        key,
+		Value:      value,
 		ExpireTime: expireTime,
-		Unique: unique,
+		Unique:     unique,
 	}
 }
 
 // CreateUpdateCommand creates a version 2 command to update a key to a given value in the store.
 func (f *CommandFactory) CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command {
 	return &UpdateCommand{
-		Key: key,
-		Value: value,
+		Key:        key,
+		Value:      value,
 		ExpireTime: expireTime,
 	}
 }
@@ -56,7 +56,7 @@ func (f *CommandFactory) CreateUpdateCommand(key string, value string, expireTim
 // CreateDeleteCommand creates a version 2 command to delete a key from the store.
 func (f *CommandFactory) CreateDeleteCommand(key string, recursive bool) raft.Command {
 	return &DeleteCommand{
-		Key: key,
+		Key:       key,
 		Recursive: recursive,
 	}
 }
@@ -71,3 +71,7 @@ func (f *CommandFactory) CreateCompareAndSwapCommand(key string, value string, p
 		ExpireTime: expireTime,
 	}
 }
+
+func (f *CommandFactory) CreateSyncCommand(now time.Time) raft.Command {
+	return &SyncCommand{time.Now()}
+}

+ 1 - 1
store/v2/delete_command.go

@@ -1,8 +1,8 @@
 package v2
 
 import (
-	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 )
 

+ 1 - 1
store/watcher_hub.go

@@ -54,7 +54,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
 			eventChan <- e
 		}
 
-		if len(events) > 1 {
+		if events[0].Action == Expire {
 			eventChan <- nil
 		}