|
|
@@ -56,23 +56,41 @@ func (s *Session) handleNewNode(host net.IP, port int) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- if s.hostFilter.Accept(hostInfo) {
|
|
|
- s.pool.addHost(hostInfo)
|
|
|
+ // should this handle token moving?
|
|
|
+ if !s.ring.addHostIfMissing(hostInfo) {
|
|
|
+ s.handleNodeUp(host, port)
|
|
|
+ return
|
|
|
}
|
|
|
+
|
|
|
+ s.pool.addHost(hostInfo)
|
|
|
}
|
|
|
|
|
|
-func (s *Session) handleRemovedNode(host net.IP, port int) {
|
|
|
+func (s *Session) handleRemovedNode(ip net.IP, port int) {
|
|
|
// we remove all nodes but only add ones which pass the filter
|
|
|
- s.pool.removeHost(host.String())
|
|
|
+ addr := ip.String()
|
|
|
+ s.pool.removeHost(addr)
|
|
|
+ s.ring.removeHost(addr)
|
|
|
}
|
|
|
|
|
|
-func (s *Session) handleNodeUp(host net.IP, port int) {
|
|
|
- // TODO(zariel): handle this case even when not discovering, just mark the
|
|
|
- // host up.
|
|
|
- // TODO: implement this properly not as newNode
|
|
|
- s.handleNewNode(host, port)
|
|
|
+func (s *Session) handleNodeUp(ip net.IP, port int) {
|
|
|
+ addr := ip.String()
|
|
|
+ host := s.ring.getHost(addr)
|
|
|
+ if host != nil {
|
|
|
+ host.setState(NodeUp)
|
|
|
+ s.pool.hostUp(host)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO: this could infinite loop
|
|
|
+ s.handleNewNode(ip, port)
|
|
|
}
|
|
|
|
|
|
-func (s *Session) handleNodeDown(host net.IP, port int) {
|
|
|
- s.pool.hostDown(host.String())
|
|
|
+func (s *Session) handleNodeDown(ip net.IP, port int) {
|
|
|
+ addr := ip.String()
|
|
|
+ host := s.ring.getHost(addr)
|
|
|
+ if host != nil {
|
|
|
+ host.setState(NodeDown)
|
|
|
+ }
|
|
|
+
|
|
|
+ s.pool.hostDown(addr)
|
|
|
}
|