瀏覽代碼

working Get

Brad Fitzpatrick 14 年之前
父節點
當前提交
dfd34e9ac2
共有 3 個文件被更改,包括 154 次插入43 次删除
  1. 123 36
      memcache/memcache.go
  2. 14 1
      memcache/memcache_test.go
  3. 17 6
      memcache/selector.go

+ 123 - 36
memcache/memcache.go

@@ -21,13 +21,14 @@ import (
 	"bufio"
 	"bytes"
 	"fmt"
+	"io"
+	"io/ioutil"
 	"log"
 	"os"
 	"net"
 )
 
 var _ = log.Printf
-var _ = fmt.Printf
 
 // Similar to:
 // http://code.google.com/appengine/docs/go/memcache/reference.html
@@ -51,26 +52,60 @@ var (
 
 	// ErrNoStats means that no statistics were available.
 	ErrNoStats = os.NewError("memcache: no statistics available")
+
+	// ErrMalformedKey is returned when an invalid key is used.
+	// Keys must be at maximum 250 bytes long, ASCII, and not
+	// contain whitespace or control characters.
+	ErrMalformedKey = os.NewError("malformed: key is too long or contains invalid characters")
+
+	// ErrNoServers is returned when no servers are configured or available.
+	ErrNoServers = os.NewError("memcache: no servers configured or available")
 )
 
-// cacheError returns true if err is only a protocol-level cache error.
+// resumableError returns true if err is only a protocol-level cache error.
 // This is used to determine whether or not a server connection should
 // be re-used or not. If an error occurs, by default we don't reuse the
 // connection, unless it was just a cache error.
-func cacheError(err os.Error) bool {
+func resumableError(err os.Error) bool {
 	switch err {
-	case ErrCacheMiss, ErrCASConflict, ErrNotStored:
+	case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey:
 		return true
 	}
 	return false
 }
 
+func legalKey(key string) bool {
+	if len(key) > 250 {
+		return false
+	}
+	for i := 0; i < len(key); i++ {
+		if key[i] <= ' ' || key[i] > 0x7e {
+			return false
+		}
+	}
+	return true
+}
+
+var (
+	crlf            = []byte("\r\n")
+	resultStored    = []byte("STORED\r\n")
+	resultNotStored = []byte("NOT_STORED\r\n")
+	resultExists    = []byte("EXISTS\r\n")
+	resultNotFound  = []byte("NOT_FOUND\r\n")
+	end             = []byte("END\r\n")
+)
+
 // New returns a memcache client using the provided server(s)
 // with equal weight. If a server is listed multiple times,
 // it gets a proportional amount of weight.
 func New(server ...string) *Client {
-	ss := new(StaticServerSelector)
+	ss := new(ServerList)
 	ss.SetServers(server...)
+	return NewFromSelector(ss)
+}
+
+// NewFromSelector returns a new Client using the provided ServerSelector.
+func NewFromSelector(ss ServerSelector) *Client {
 	return &Client{selector: ss}
 }
 
@@ -112,12 +147,17 @@ type conn struct {
 	c    *Client
 }
 
+// release returns this connection back to the client's free pool
 func (cn *conn) release() {
 	// TODO: return to client's free pool
 }
 
+// condRelease releases this connection if the error pointed to by err
+// is is nil (not an error) or is only a protocol level error (e.g. a
+// cache miss).  The purpose is to not recycle TCP connections that
+// are bad.
 func (cn *conn) condRelease(err *os.Error) {
-	if *err == nil || cacheError(*err) {
+	if *err == nil || resumableError(*err) {
 		cn.release()
 	}
 }
@@ -156,6 +196,9 @@ func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) o
 // Get gets the item for the given key. ErrCacheMiss is returned for a
 // memcache cache miss. The key must be at most 250 bytes in length.
 func (c *Client) Get(key string) (item *Item, err os.Error) {
+	if !legalKey(key) {
+		return nil, ErrMalformedKey
+	}
 	addr, err := c.selector.PickServer(key)
 	if err != nil {
 		return nil, err
@@ -166,7 +209,62 @@ func (c *Client) Get(key string) (item *Item, err os.Error) {
 	}
 	defer cn.condRelease(&err)
 
-	return nil, ErrCacheMiss
+	if _, err = fmt.Fprintf(cn.rw, "gets %s\r\n", key); err != nil {
+		return
+	}
+	if err = cn.rw.Flush(); err != nil {
+		return
+	}
+	if err = parseGetResponse(cn.rw.Reader, func(it *Item) { item = it }); err != nil {
+		return
+	}
+	if item == nil {
+		err = ErrCacheMiss
+	}
+	return
+}
+
+// parseGetResponse reads a GET response from r and calls cb for each
+// read and allocated Item
+func parseGetResponse(r *bufio.Reader, cb func(*Item)) os.Error {
+	for {
+		line, err := r.ReadSlice('\n')
+		if err != nil {
+			return err
+		}
+		if bytes.Equal(line, end) {
+			return nil
+		}
+		it := new(Item)
+		var size int
+		n, err := fmt.Sscanf(string(line), "VALUE %s %d %d %d\r\n",
+			&it.Key, &it.Flags, &size, &it.casid)
+		if err != nil {
+			return err
+		}
+		if n != 4 {
+			return fmt.Errorf("memcache: unexpected line in get response: %q", string(line))
+		}
+		it.Value, err = ioutil.ReadAll(io.LimitReader(r, int64(size)+2))
+		if err != nil {
+			return err
+		}
+		if !bytes.HasSuffix(it.Value, crlf) {
+			return fmt.Errorf("memcache: corrupt get result read")
+		}
+		it.Value = it.Value[:size]
+		cb(it)
+	}
+	panic("unreached")
+}
+
+// Set writes the given item, unconditionally.
+func (c *Client) Set(item *Item) os.Error {
+	return c.onItem(item, (*Client).set)
+}
+
+func (c *Client) set(rw *bufio.ReadWriter, item *Item) os.Error {
+	return c.populateOne(rw, "set", item)
 }
 
 // Add writes the given item, if no value already exists for its
@@ -179,13 +277,26 @@ func (c *Client) add(rw *bufio.ReadWriter, item *Item) os.Error {
 	return c.populateOne(rw, "add", item)
 }
 
-var crlf = []byte("\r\n")
-var resultStored = []byte("STORED\r\n")
-var resultNotStored = []byte("NOT_STORED\r\n")
-var resultExists = []byte("EXISTS\r\n")
-var resultNotFound = []byte("NOT_FOUND\r\n")
+// CompareAndSwap writes the given item that was previously returned
+// by Get, if the value was neither modified or evicted between the
+// Get and the CompareAndSwap calls. The item's Key should not change
+// between calls but all other item fields may differ. ErrCASConflict
+// is returned if the value was modified in between the
+// calls. ErrNotStored is returned if the value was evicted in between
+// the calls.
+func (c *Client) CompareAndSwap(item *Item) os.Error {
+	return c.onItem(item, (*Client).cas)
+}
+
+func (c *Client) cas(rw *bufio.ReadWriter, item *Item) os.Error {
+	return c.populateOne(rw, "cas", item)
+}
+
 
 func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) os.Error {
+	if !legalKey(item.Key) {
+		return ErrMalformedKey
+	}
 	var err os.Error
 	if verb == "cas" {
 		_, err = fmt.Fprintf(rw, "%s %s %d %d %d %d %d\r\n",
@@ -222,27 +333,3 @@ func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) os.E
 	}
 	return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line))
 }
-
-// Set writes the given item, unconditionally.
-func (c *Client) Set(item *Item) os.Error {
-	return c.onItem(item, (*Client).set)
-}
-
-func (c *Client) set(rw *bufio.ReadWriter, item *Item) os.Error {
-	return c.populateOne(rw, "set", item)
-}
-
-// CompareAndSwap writes the given item that was previously returned
-// by Get, if the value was neither modified or evicted between the
-// Get and the CompareAndSwap calls. The item's Key should not change
-// between calls but all other item fields may differ. ErrCASConflict
-// is returned if the value was modified in between the
-// calls. ErrNotStored is returned if the value was evicted in between
-// the calls.
-func (c *Client) CompareAndSwap(item *Item) os.Error {
-	return c.onItem(item, (*Client).cas)
-}
-
-func (c *Client) cas(rw *bufio.ReadWriter, item *Item) os.Error {
-	return nil
-}

+ 14 - 1
memcache/memcache_test.go

@@ -41,13 +41,26 @@ func TestMemcache(t *testing.T) {
 	}
 	c := New(testServer)
 
-	foo := &Item{Key: "foo", Value: []byte("bar")}
+	foo := &Item{Key: "foo", Value: []byte("bar"), Flags: 123}
 	if err := c.Set(foo); err != nil {
 		t.Fatalf("first set(foo): %v", err)
 	}
 	if err := c.Set(foo); err != nil {
 		t.Fatalf("second set(foo): %v", err)
 	}
+	it, err := c.Get("foo")
+	if err != nil {
+		t.Fatalf("get(foo): %v", err)
+	}
+	if it.Key != "foo" {
+		t.Errorf("get(foo) Key = %q, want foo", it.Key)
+	}
+	if string(it.Value) != "bar" {
+		t.Errorf("get(foo) Value = %q, want bar", string(it.Value))
+	}
+	if it.Flags != 123 {
+		t.Errorf("get(foo) Flags = %v, want 123", it.Flags)
+	}
 
 	bar := &Item{Key: "bar", Value: []byte("bar2")}
 	if err := c.Add(bar); err != nil {

+ 17 - 6
memcache/selector.go

@@ -14,7 +14,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package memcache provides a client for the memcached cache server.
 package memcache
 
 import (
@@ -24,18 +23,32 @@ import (
 	"sync"
 )
 
+// ServerSelector is the interface that selects a memcache server
+// as a function of the item's key.
+//
+// All ServerSelector implementations must be threadsafe.
 type ServerSelector interface {
 	// PickServer returns the server address that a given item
 	// should be shared onto.
 	PickServer(key string) (net.Addr, os.Error)
 }
 
-type StaticServerSelector struct {
+// ServerList is a simple ServerSelector. Its zero value is usable.
+type ServerList struct {
 	lk    sync.RWMutex
 	addrs []net.Addr
 }
 
-func (ss *StaticServerSelector) SetServers(servers ...string) os.Error {
+// SetServers changes a ServerList's set of servers at runtime and is
+// threadsafe.
+//
+// Each server is given equal weight. A server is given more weight
+// if it's listed multiple times.
+//
+// SetServers returns an error if any of the server names fail to
+// resolve. No attempt is made to connect to the server. If any error
+// is returned, no changes are made to the ServerList.
+func (ss *ServerList) SetServers(servers ...string) os.Error {
 	naddr := make([]net.Addr, len(servers))
 	for i, server := range servers {
 		tcpaddr, err := net.ResolveTCPAddr("tcp", server)
@@ -51,9 +64,7 @@ func (ss *StaticServerSelector) SetServers(servers ...string) os.Error {
 	return nil
 }
 
-var ErrNoServers = os.NewError("memcache: no servers configured or available")
-
-func (ss *StaticServerSelector) PickServer(key string) (net.Addr, os.Error) {
+func (ss *ServerList) PickServer(key string) (net.Addr, os.Error) {
 	ss.lk.RLock()
 	defer ss.lk.RUnlock()
 	if len(ss.addrs) == 0 {