浏览代码

Merge pull request #12 from craigmj/master

Touch Support
Brad Fitzpatrick 11 年之前
父节点
当前提交
0f301323b9
共有 3 个文件被更改,包括 134 次插入7 次删除
  1. 66 1
      memcache/memcache.go
  2. 55 6
      memcache/memcache_test.go
  3. 13 0
      memcache/selector.go

+ 66 - 1
memcache/memcache.go

@@ -105,6 +105,8 @@ var (
 	resultNotFound  = []byte("NOT_FOUND\r\n")
 	resultDeleted   = []byte("DELETED\r\n")
 	resultEnd       = []byte("END\r\n")
+	resultOk        = []byte("OK\r\n")
+	resultTouched   = []byte("TOUCHED\r\n")
 
 	resultClientErrorPrefix = []byte("CLIENT_ERROR ")
 )
@@ -178,7 +180,7 @@ func (cn *conn) extendDeadline() {
 }
 
 // condRelease releases this connection if the error pointed to by err
-// is is nil (not an error) or is only a protocol level error (e.g. a
+// is nil (not an error) or is only a protocol level error (e.g. a
 // cache miss).  The purpose is to not recycle TCP connections that
 // are bad.
 func (cn *conn) condRelease(err *error) {
@@ -298,6 +300,10 @@ func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) e
 	return nil
 }
 
+func (c *Client) FlushAll() error {
+	return c.selector.Each(c.flushAllFromAddr)
+}
+
 // Get gets the item for the given key. ErrCacheMiss is returned for a
 // memcache cache miss. The key must be at most 250 bytes in length.
 func (c *Client) Get(key string) (item *Item, err error) {
@@ -310,6 +316,16 @@ func (c *Client) Get(key string) (item *Item, err error) {
 	return
 }
 
+// Touch updates the expiry for the given key. The seconds parameter is either
+// a Unix timestamp or, if seconds is less than 1 month, the number of seconds
+// into the future at which time the item will expire. ErrCacheMiss is returned if the
+// key is not in the cache. The key must be at most 250 bytes in length.
+func (c *Client) Touch(key string, seconds int32) (err error) {
+	return c.withKeyAddr(key, func(addr net.Addr) error {
+		return c.touchFromAddr(addr, []string{key}, seconds)
+	})
+}
+
 func (c *Client) withKeyAddr(key string, fn func(net.Addr) error) (err error) {
 	if !legalKey(key) {
 		return ErrMalformedKey
@@ -351,6 +367,55 @@ func (c *Client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) error
 	})
 }
 
+// flushAllFromAddr send the flush_all command to the given addr
+func (c *Client) flushAllFromAddr(addr net.Addr) error {
+	return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
+		if _, err := fmt.Fprintf(rw, "flush_all\r\n"); err != nil {
+			return err
+		}
+		if err := rw.Flush(); err != nil {
+			return err
+		}
+		line, err := rw.ReadSlice('\n')
+		if err != nil {
+			return err
+		}
+		switch {
+		case bytes.Equal(line, resultOk):
+			break
+		default:
+			return fmt.Errorf("memcache: unexpected response line from flush_all: %q", string(line))
+		}
+		return nil
+	})
+}
+
+func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error {
+	return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
+		for _, key := range keys {
+			if _, err := fmt.Fprintf(rw, "touch %s %d\r\n", key, expiration); err != nil {
+				return err
+			}
+			if err := rw.Flush(); err != nil {
+				return err
+			}
+			line, err := rw.ReadSlice('\n')
+			if err != nil {
+				return err
+			}
+			switch {
+			case bytes.Equal(line, resultTouched):
+				break
+			case bytes.Equal(line, resultNotFound):
+				return ErrCacheMiss
+			default:
+				return fmt.Errorf("memcache: unexpected response line from touch: %q", string(line))
+			}
+		}
+		return nil
+	})
+}
+
 // GetMulti is a batch version of Get. The returned map from keys to
 // items may have fewer elements than the input slice, due to memcache
 // cache misses. Each key must be at most 250 bytes in length.

+ 55 - 6
memcache/memcache_test.go

@@ -68,18 +68,21 @@ func TestUnixSocket(t *testing.T) {
 	testWithClient(t, New(sock))
 }
 
+func mustSetF(t *testing.T, c *Client) func(*Item) {
+	return func(it *Item) {
+		if err := c.Set(it); err != nil {
+			t.Fatalf("failed to Set %#v: %v", *it, err)
+		}
+	}
+}
+
 func testWithClient(t *testing.T, c *Client) {
 	checkErr := func(err error, format string, args ...interface{}) {
 		if err != nil {
 			t.Fatalf(format, args...)
 		}
 	}
-
-	mustSet := func(it *Item) {
-		if err := c.Set(it); err != nil {
-			t.Fatalf("failed to Set %#v: %v", *it, err)
-		}
-	}
+	mustSet := mustSetF(t, c) 
 
 	// Set
 	foo := &Item{Key: "foo", Value: []byte("fooval"), Flags: 123}
@@ -159,5 +162,51 @@ func testWithClient(t *testing.T, c *Client) {
 	if err == nil || !strings.Contains(err.Error(), "client error") {
 		t.Fatalf("increment non-number: want client error, got %v", err)
 	}
+	testTouchWithClient(t, c)
+}
 
+func testTouchWithClient(t *testing.T, c *Client) {
+	if testing.Short() {
+		t.Skip("Skipping testing memcache Touch with testing in Short mode")
+	}
+	
+	mustSet := mustSetF(t, c)
+	
+	const secondsToExpiry = int32(2)
+	
+	// We will set foo and bar to expire in 2 seconds, then we'll keep touching
+	// foo every second
+	// After 3 seconds, we expect foo to be available, and bar to be expired
+	foo := &Item{Key: "foo", Value: []byte("fooval"), Expiration: secondsToExpiry }
+	bar := &Item{Key: "bar", Value: []byte("barval"), Expiration: secondsToExpiry }
+	
+	setTime := time.Now()
+	mustSet(foo)
+	mustSet(bar)
+	
+	for s:=0; s<3; s++ {
+		time.Sleep(time.Duration(1*time.Second))
+		err := c.Touch(foo.Key, secondsToExpiry)
+		if nil!=err {
+			t.Errorf("error touching foo: %v", err.Error())
+		}
+	}
+	
+	_, err := c.Get("foo")
+	if err != nil {
+		if err == ErrCacheMiss {
+			t.Fatalf("touching failed to keep item foo alive")
+		} else {
+			t.Fatalf("unexpected error retrieving foo after touching: %v", err.Error())
+		}
+	}
+	
+	_, err = c.Get("bar")
+	if nil==err {
+		t.Fatalf("item bar did not expire within %v seconds", time.Now().Sub(setTime).Seconds())
+	} else {
+		if err != ErrCacheMiss {
+			t.Fatalf("unexpected error retrieving bar: %v", err.Error())
+		}
+	}
 }

+ 13 - 0
memcache/selector.go

@@ -31,6 +31,7 @@ type ServerSelector interface {
 	// PickServer returns the server address that a given item
 	// should be shared onto.
 	PickServer(key string) (net.Addr, error)
+	Each(func(net.Addr) error) error
 }
 
 // ServerList is a simple ServerSelector. Its zero value is usable.
@@ -72,6 +73,18 @@ func (ss *ServerList) SetServers(servers ...string) error {
 	return nil
 }
 
+// Each iterates over each server calling the given function
+func (ss *ServerList) Each(f func(net.Addr) error) error {
+	ss.lk.RLock()
+	defer ss.lk.RUnlock()
+	for _, a := range ss.addrs {
+		if err := f(a); nil != err {
+			return err
+		}
+	}
+	return nil
+}
+
 func (ss *ServerList) PickServer(key string) (net.Addr, error) {
 	ss.lk.RLock()
 	defer ss.lk.RUnlock()