Ver código fonte

Start of a memcache client library.

Brad Fitzpatrick 14 anos atrás
commit
7d98658c98
4 arquivos alterados com 397 adições e 0 exclusões
  1. 24 0
      memcache/Makefile
  2. 248 0
      memcache/memcache.go
  3. 60 0
      memcache/memcache_test.go
  4. 65 0
      memcache/selector.go

+ 24 - 0
memcache/Makefile

@@ -0,0 +1,24 @@
+# Copyright 2011 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+include $(GOROOT)/src/Make.inc
+
+TARG=github.com/bradfitz/go-memcache/memcache
+GOFILES=\
+	memcache.go\
+	selector.go
+
+include $(GOROOT)/src/Make.pkg
+
+

+ 248 - 0
memcache/memcache.go

@@ -0,0 +1,248 @@
+/*
+Copyright 2011 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package memcache provides a client for the memcached cache server.
+package memcache
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"log"
+	"os"
+	"net"
+)
+
+var _ = log.Printf
+var _ = fmt.Printf
+
+// Similar to:
+// http://code.google.com/appengine/docs/go/memcache/reference.html
+
+var (
+	// ErrCacheMiss means that a Get failed because the item wasn't present.
+	ErrCacheMiss = os.NewError("memcache: cache miss")
+
+	// ErrCASConflict means that a CompareAndSwap call failed due to the
+	// cached value being modified between the Get and the CompareAndSwap.
+	// If the cached value was simply evicted rather than replaced,
+	// ErrNotStored will be returned instead.
+	ErrCASConflict = os.NewError("memcache: compare-and-swap conflict")
+
+	// ErrNotStored means that a conditional write operation (i.e. Add or
+	// CompareAndSwap) failed because the condition was not satisfied.
+	ErrNotStored = os.NewError("memcache: item not stored")
+
+	// ErrServer means that a server error occurred.
+	ErrServerError = os.NewError("memcache: server error")
+
+	// ErrNoStats means that no statistics were available.
+	ErrNoStats = os.NewError("memcache: no statistics available")
+)
+
+// cacheError returns true if err is only a protocol-level cache error.
+// This is used to determine whether or not a server connection should
+// be re-used or not. If an error occurs, by default we don't reuse the
+// connection, unless it was just a cache error.
+func cacheError(err os.Error) bool {
+	switch err {
+	case ErrCacheMiss, ErrCASConflict, ErrNotStored:
+		return true
+	}
+	return false
+}
+
+// New returns a memcache client using the provided server(s)
+// with equal weight. If a server is listed multiple times,
+// it gets a proportional amount of weight.
+func New(server ...string) *Client {
+	ss := new(StaticServerSelector)
+	ss.SetServers(server...)
+	return &Client{selector: ss}
+}
+
+// Client is a memcache client.
+// It is safe for unlocked use by multiple concurrent goroutines.
+type Client struct {
+	selector ServerSelector
+}
+
+// Item is an item to be got or stored in a memcached server.
+type Item struct {
+	// Key is the Item's key (250 bytes maximum).
+	Key string
+
+	// Value is the Item's value.
+	Value []byte
+
+	// Object is the Item's value for use with a Codec.
+	Object interface{}
+
+	// Flags are server-opaque flags whose semantics are entirely up to the
+	// App Engine app.
+	Flags uint32
+
+	// Expiration is the cache expiration time, in seconds: either a relative
+	// time from now (up to 1 month), or an absolute Unix epoch time.
+	// Zero means the Item has no expiration time.
+	Expiration int32
+
+	// Compare and swap ID.
+	casid uint64
+}
+
+// conn is a connection to a server.
+type conn struct {
+	nc   net.Conn
+	rw   *bufio.ReadWriter
+	addr net.Addr
+	c    *Client
+}
+
+func (cn *conn) release() {
+	// TODO: return to client's free pool
+}
+
+func (cn *conn) condRelease(err *os.Error) {
+	if *err == nil || cacheError(*err) {
+		cn.release()
+	}
+}
+
+func (c *Client) getConn(addr net.Addr) (*conn, os.Error) {
+	// TODO(bradfitz): get from a free pool
+	nc, err := net.Dial(addr.Network(), addr.String())
+	if err != nil {
+		return nil, err
+	}
+	// TODO: set read/write timeouts
+	return &conn{
+		nc:   nc,
+		addr: addr,
+		rw:   bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
+		c:    c,
+	}, nil
+}
+
+func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) os.Error) os.Error {
+	addr, err := c.selector.PickServer(item.Key)
+	if err != nil {
+		return err
+	}
+	cn, err := c.getConn(addr)
+	if err != nil {
+		return err
+	}
+	defer cn.condRelease(&err)
+	if err := fn(c, cn.rw, item); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Get gets the item for the given key. ErrCacheMiss is returned for a
+// memcache cache miss. The key must be at most 250 bytes in length.
+func (c *Client) Get(key string) (item *Item, err os.Error) {
+	addr, err := c.selector.PickServer(key)
+	if err != nil {
+		return nil, err
+	}
+	cn, err := c.getConn(addr)
+	if err != nil {
+		return nil, err
+	}
+	defer cn.condRelease(&err)
+
+	return nil, ErrCacheMiss
+}
+
+// Add writes the given item, if no value already exists for its
+// key. ErrNotStored is returned if that condition is not met.
+func (c *Client) Add(item *Item) os.Error {
+	return c.onItem(item, (*Client).add)
+}
+
+func (c *Client) add(rw *bufio.ReadWriter, item *Item) os.Error {
+	return c.populateOne(rw, "add", item)
+}
+
+var crlf = []byte("\r\n")
+var resultStored = []byte("STORED\r\n")
+var resultNotStored = []byte("NOT_STORED\r\n")
+var resultExists = []byte("EXISTS\r\n")
+var resultNotFound = []byte("NOT_FOUND\r\n")
+
+func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) os.Error {
+	var err os.Error
+	if verb == "cas" {
+		_, err = fmt.Fprintf(rw, "%s %s %d %d %d %d %d\r\n",
+			verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid)
+	} else {
+		_, err = fmt.Fprintf(rw, "%s %s %d %d %d %d\r\n",
+			verb, item.Key, item.Flags, item.Expiration, len(item.Value))
+	}
+	if err != nil {
+		return err
+	}
+	if _, err = rw.Write(item.Value); err != nil {
+		return err
+	}
+	if _, err := rw.Write(crlf); err != nil {
+		return err
+	}
+	if err := rw.Flush(); err != nil {
+		return err
+	}
+	line, err := rw.ReadSlice('\n')
+	if err != nil {
+		return err
+	}
+	switch {
+	case bytes.Equal(line, resultStored):
+		return nil
+	case bytes.Equal(line, resultNotStored):
+		return ErrNotStored
+	case bytes.Equal(line, resultExists):
+		return ErrCASConflict
+	case bytes.Equal(line, resultNotFound):
+		return ErrCacheMiss
+	}
+	return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line))
+}
+
+// Set writes the given item, unconditionally.
+func (c *Client) Set(item *Item) os.Error {
+	return c.onItem(item, (*Client).set)
+}
+
+func (c *Client) set(rw *bufio.ReadWriter, item *Item) os.Error {
+	return c.populateOne(rw, "set", item)
+}
+
+// CompareAndSwap writes the given item that was previously returned
+// by Get, if the value was neither modified or evicted between the
+// Get and the CompareAndSwap calls. The item's Key should not change
+// between calls but all other item fields may differ. ErrCASConflict
+// is returned if the value was modified in between the
+// calls. ErrNotStored is returned if the value was evicted in between
+// the calls.
+func (c *Client) CompareAndSwap(item *Item) os.Error {
+	return c.onItem(item, (*Client).cas)
+}
+
+func (c *Client) cas(rw *bufio.ReadWriter, item *Item) os.Error {
+	return nil
+}

+ 60 - 0
memcache/memcache_test.go

@@ -0,0 +1,60 @@
+/*
+Copyright 2011 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package memcache provides a client for the memcached cache server.
+package memcache
+
+import (
+	"net"
+	"testing"
+)
+
+const testServer = "localhost:11211"
+
+func setup(t *testing.T) bool {
+	c, err := net.Dial("tcp", testServer)
+	if err != nil {
+		t.Logf("skipping test; no server running at %s", testServer)
+		return false
+	}
+	c.Write([]byte("flush_all\r\n"))
+	c.Close()
+	return true
+}
+
+func TestMemcache(t *testing.T) {
+	if !setup(t) {
+		return
+	}
+	c := New(testServer)
+
+	foo := &Item{Key: "foo", Value: []byte("bar")}
+	if err := c.Set(foo); err != nil {
+		t.Fatalf("first set(foo): %v", err)
+	}
+	if err := c.Set(foo); err != nil {
+		t.Fatalf("second set(foo): %v", err)
+	}
+
+	bar := &Item{Key: "bar", Value: []byte("bar2")}
+	if err := c.Add(bar); err != nil {
+		t.Fatalf("first add(foo): %v", err)
+	}
+	if err := c.Add(bar); err != ErrNotStored {
+		t.Fatalf("second add(foo) want ErrNotStored, got %v", err)
+	}
+
+}

+ 65 - 0
memcache/selector.go

@@ -0,0 +1,65 @@
+/*
+Copyright 2011 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package memcache provides a client for the memcached cache server.
+package memcache
+
+import (
+	"hash/crc32"
+	"os"
+	"net"
+	"sync"
+)
+
+type ServerSelector interface {
+	// PickServer returns the server address that a given item
+	// should be shared onto.
+	PickServer(key string) (net.Addr, os.Error)
+}
+
+type StaticServerSelector struct {
+	lk    sync.RWMutex
+	addrs []net.Addr
+}
+
+func (ss *StaticServerSelector) SetServers(servers ...string) os.Error {
+	naddr := make([]net.Addr, len(servers))
+	for i, server := range servers {
+		tcpaddr, err := net.ResolveTCPAddr("tcp", server)
+		if err != nil {
+			return err
+		}
+		naddr[i] = tcpaddr
+	}
+
+	ss.lk.Lock()
+	defer ss.lk.Unlock()
+	ss.addrs = naddr
+	return nil
+}
+
+var ErrNoServers = os.NewError("memcache: no servers configured or available")
+
+func (ss *StaticServerSelector) PickServer(key string) (net.Addr, os.Error) {
+	ss.lk.RLock()
+	defer ss.lk.RUnlock()
+	if len(ss.addrs) == 0 {
+		return nil, ErrNoServers
+	}
+	// TODO-GO: remove this copy
+	cs := crc32.ChecksumIEEE([]byte(key))
+	return ss.addrs[cs%uint32(len(ss.addrs))], nil
+}