memcache.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. /*
  2. Copyright 2011 Google Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package memcache provides a client for the memcached cache server.
  14. package memcache
  15. import (
  16. "bufio"
  17. "bytes"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "log"
  22. "os"
  23. "net"
  24. )
  25. var _ = log.Printf
  26. // Similar to:
  27. // http://code.google.com/appengine/docs/go/memcache/reference.html
  28. var (
  29. // ErrCacheMiss means that a Get failed because the item wasn't present.
  30. ErrCacheMiss = os.NewError("memcache: cache miss")
  31. // ErrCASConflict means that a CompareAndSwap call failed due to the
  32. // cached value being modified between the Get and the CompareAndSwap.
  33. // NOTE(review): populateOne maps the server's NOT_FOUND reply (presumably
  34. // what an evicted value produces) to ErrCacheMiss, not ErrNotStored -- confirm.
  35. ErrCASConflict = os.NewError("memcache: compare-and-swap conflict")
  36. // ErrNotStored means that a conditional write operation (i.e. Add or
  37. // CompareAndSwap) failed because the condition was not satisfied.
  38. ErrNotStored = os.NewError("memcache: item not stored")
  39. // ErrServerError means that a server error occurred.
  40. ErrServerError = os.NewError("memcache: server error")
  41. // ErrNoStats means that no statistics were available.
  42. ErrNoStats = os.NewError("memcache: no statistics available")
  43. // ErrMalformedKey is returned when an invalid key is used.
  44. // Keys must be at maximum 250 bytes long, ASCII, and not
  45. // contain whitespace or control characters.
  46. ErrMalformedKey = os.NewError("malformed: key is too long or contains invalid characters")
  47. // ErrNoServers is returned when no servers are configured or available.
  48. ErrNoServers = os.NewError("memcache: no servers configured or available")
  49. )
  50. // resumableError returns true if err is only a protocol-level cache error.
  51. // This is used to determine whether or not a server connection should
  52. // be re-used or not. If an error occurs, by default we don't reuse the
  53. // connection, unless it was just a cache error.
  54. func resumableError(err os.Error) bool {
  55. switch err {
  56. case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey:
  57. return true
  58. }
  59. return false
  60. }
  // legalKey reports whether key can be used in a memcached command line.
  // A legal key is between 1 and 250 bytes long and consists only of
  // printable, non-space ASCII bytes (0x21 through 0x7e).
  //
  // The empty key is rejected because a command built from it (for example
  // "gets \r\n") has no key token at all; refusing it here lets callers
  // report ErrMalformedKey without a network round trip.
  func legalKey(key string) bool {
  	if len(key) == 0 || len(key) > 250 {
  		return false
  	}
  	for i := 0; i < len(key); i++ {
  		// Bytes up to and including ' ' are control characters or
  		// whitespace; bytes above '~' are DEL or non-ASCII.
  		if b := key[i]; b <= ' ' || b > 0x7e {
  			return false
  		}
  	}
  	return true
  }
  72. var ( // byte sequences compared against, or written to, the server connection
  73. crlf = []byte("\r\n") // written after an item's value; expected after each value read back
  74. resultStored = []byte("STORED\r\n") // populateOne: the write succeeded
  75. resultNotStored = []byte("NOT_STORED\r\n") // populateOne: mapped to ErrNotStored
  76. resultExists = []byte("EXISTS\r\n") // populateOne: mapped to ErrCASConflict
  77. resultNotFound = []byte("NOT_FOUND\r\n") // populateOne: mapped to ErrCacheMiss
  78. end = []byte("END\r\n") // parseGetResponse: terminates a get response
  79. )
  80. // New returns a memcache client using the provided server(s)
  81. // with equal weight. If a server is listed multiple times,
  82. // it gets a proportional amount of weight.
  83. func New(server ...string) *Client {
  84. ss := new(ServerList)
  85. ss.SetServers(server...)
  86. return NewFromSelector(ss)
  87. }
  88. // NewFromSelector returns a new Client using the provided ServerSelector.
  89. func NewFromSelector(ss ServerSelector) *Client {
  90. return &Client{selector: ss}
  91. }
  92. // Client is a memcache client.
  93. // It is safe for unlocked use by multiple concurrent goroutines.
  94. type Client struct {
  95. selector ServerSelector // asked (via PickServer) which server address handles a key
  96. }
  97. // Item is an item to be got or stored in a memcached server.
  98. type Item struct {
  99. // Key is the Item's key (250 bytes maximum).
  100. Key string
  101. // Value is the Item's value.
  102. Value []byte
  103. // Object is the Item's value for use with a Codec.
  104. Object interface{} // NOTE(review): no Codec or use of Object appears in the code shown here -- confirm
  105. // Flags are server-opaque flags whose semantics are entirely up to the
  106. // application using this package.
  107. Flags uint32
  108. // Expiration is the cache expiration time, in seconds: either a relative
  109. // time from now (up to 1 month), or an absolute Unix epoch time.
  110. // Zero means the Item has no expiration time.
  111. Expiration int32
  112. // casid is the compare-and-swap ID: parsed from a "gets" reply and sent back with "cas".
  113. casid uint64
  114. }
  115. // conn is a connection to a server.
  116. type conn struct {
  117. nc net.Conn // network connection obtained from net.Dial in getConn
  118. rw *bufio.ReadWriter // buffered reader and writer, both wrapping nc
  119. addr net.Addr // server address that was dialed
  120. c *Client // Client whose getConn created this connection
  121. }
  122. // release returns this connection back to the client's free pool
  123. func (cn *conn) release() {
  124. // TODO: return to client's free pool
  125. }
  126. // condRelease releases this connection if the error pointed to by err
  127. // is is nil (not an error) or is only a protocol level error (e.g. a
  128. // cache miss). The purpose is to not recycle TCP connections that
  129. // are bad.
  130. func (cn *conn) condRelease(err *os.Error) {
  131. if *err == nil || resumableError(*err) {
  132. cn.release()
  133. }
  134. }
  135. func (c *Client) getConn(addr net.Addr) (*conn, os.Error) {
  136. // TODO(bradfitz): get from a free pool
  137. nc, err := net.Dial(addr.Network(), addr.String())
  138. if err != nil {
  139. return nil, err
  140. }
  141. // TODO: set read/write timeouts
  142. return &conn{
  143. nc: nc,
  144. addr: addr,
  145. rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
  146. c: c,
  147. }, nil
  148. }
  149. func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) os.Error) os.Error {
  150. addr, err := c.selector.PickServer(item.Key)
  151. if err != nil {
  152. return err
  153. }
  154. cn, err := c.getConn(addr)
  155. if err != nil {
  156. return err
  157. }
  158. defer cn.condRelease(&err)
  159. if err := fn(c, cn.rw, item); err != nil {
  160. return err
  161. }
  162. return nil
  163. }
  164. // Get gets the item for the given key. ErrCacheMiss is returned for a
  165. // memcache cache miss. The key must be at most 250 bytes in length.
  166. func (c *Client) Get(key string) (item *Item, err os.Error) {
  167. if !legalKey(key) {
  168. return nil, ErrMalformedKey
  169. }
  170. addr, err := c.selector.PickServer(key)
  171. if err != nil {
  172. return nil, err
  173. }
  174. cn, err := c.getConn(addr)
  175. if err != nil {
  176. return nil, err
  177. }
  178. defer cn.condRelease(&err)
  179. if _, err = fmt.Fprintf(cn.rw, "gets %s\r\n", key); err != nil {
  180. return
  181. }
  182. if err = cn.rw.Flush(); err != nil {
  183. return
  184. }
  185. if err = parseGetResponse(cn.rw.Reader, func(it *Item) { item = it }); err != nil {
  186. return
  187. }
  188. if item == nil {
  189. err = ErrCacheMiss
  190. }
  191. return
  192. }
  193. // parseGetResponse reads a GET response from r and calls cb for each
  194. // read and allocated Item
  195. func parseGetResponse(r *bufio.Reader, cb func(*Item)) os.Error {
  196. for {
  197. line, err := r.ReadSlice('\n')
  198. if err != nil {
  199. return err
  200. }
  201. if bytes.Equal(line, end) {
  202. return nil
  203. }
  204. it := new(Item)
  205. var size int
  206. n, err := fmt.Sscanf(string(line), "VALUE %s %d %d %d\r\n",
  207. &it.Key, &it.Flags, &size, &it.casid)
  208. if err != nil {
  209. return err
  210. }
  211. if n != 4 {
  212. return fmt.Errorf("memcache: unexpected line in get response: %q", string(line))
  213. }
  214. it.Value, err = ioutil.ReadAll(io.LimitReader(r, int64(size)+2))
  215. if err != nil {
  216. return err
  217. }
  218. if !bytes.HasSuffix(it.Value, crlf) {
  219. return fmt.Errorf("memcache: corrupt get result read")
  220. }
  221. it.Value = it.Value[:size]
  222. cb(it)
  223. }
  224. panic("unreached")
  225. }
  226. // Set writes the given item, unconditionally.
  227. func (c *Client) Set(item *Item) os.Error {
  228. return c.onItem(item, (*Client).set)
  229. }
  230. func (c *Client) set(rw *bufio.ReadWriter, item *Item) os.Error {
  231. return c.populateOne(rw, "set", item)
  232. }
  233. // Add writes the given item, if no value already exists for its
  234. // key. ErrNotStored is returned if that condition is not met.
  235. func (c *Client) Add(item *Item) os.Error {
  236. return c.onItem(item, (*Client).add)
  237. }
  238. func (c *Client) add(rw *bufio.ReadWriter, item *Item) os.Error {
  239. return c.populateOne(rw, "add", item)
  240. }
  241. // CompareAndSwap writes the given item that was previously returned
  242. // by Get, if the value was neither modified or evicted between the
  243. // Get and the CompareAndSwap calls. The item's Key should not change
  244. // between calls but all other item fields may differ. ErrCASConflict
  245. // is returned if the value was modified in between the
  246. // calls. ErrNotStored is returned if the value was evicted in between
  247. // the calls.
  248. func (c *Client) CompareAndSwap(item *Item) os.Error {
  249. return c.onItem(item, (*Client).cas)
  250. }
  251. func (c *Client) cas(rw *bufio.ReadWriter, item *Item) os.Error {
  252. return c.populateOne(rw, "cas", item)
  253. }
  254. func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) os.Error {
  255. if !legalKey(item.Key) {
  256. return ErrMalformedKey
  257. }
  258. var err os.Error
  259. if verb == "cas" {
  260. _, err = fmt.Fprintf(rw, "%s %s %d %d %d %d %d\r\n",
  261. verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid)
  262. } else {
  263. _, err = fmt.Fprintf(rw, "%s %s %d %d %d %d\r\n",
  264. verb, item.Key, item.Flags, item.Expiration, len(item.Value))
  265. }
  266. if err != nil {
  267. return err
  268. }
  269. if _, err = rw.Write(item.Value); err != nil {
  270. return err
  271. }
  272. if _, err := rw.Write(crlf); err != nil {
  273. return err
  274. }
  275. if err := rw.Flush(); err != nil {
  276. return err
  277. }
  278. line, err := rw.ReadSlice('\n')
  279. if err != nil {
  280. return err
  281. }
  282. switch {
  283. case bytes.Equal(line, resultStored):
  284. return nil
  285. case bytes.Equal(line, resultNotStored):
  286. return ErrNotStored
  287. case bytes.Equal(line, resultExists):
  288. return ErrCASConflict
  289. case bytes.Equal(line, resultNotFound):
  290. return ErrCacheMiss
  291. }
  292. return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line))
  293. }