memcache.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. /*
  2. Copyright 2011 Google Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package memcache provides a client for the memcached cache server.
  14. package memcache
  15. import (
  16. "bufio"
  17. "bytes"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "log"
  22. "net"
  23. "os"
  24. "strings"
  25. "sync"
  26. )
  27. var _ = log.Printf
  28. // Similar to:
  29. // http://code.google.com/appengine/docs/go/memcache/reference.html
  30. var (
  31. // ErrCacheMiss means that a Get failed because the item wasn't present.
  32. ErrCacheMiss = os.NewError("memcache: cache miss")
  33. // ErrCASConflict means that a CompareAndSwap call failed due to the
  34. // cached value being modified between the Get and the CompareAndSwap.
  35. // If the cached value was simply evicted rather than replaced,
  36. // ErrNotStored will be returned instead.
  37. ErrCASConflict = os.NewError("memcache: compare-and-swap conflict")
  38. // ErrNotStored means that a conditional write operation (i.e. Add or
  39. // CompareAndSwap) failed because the condition was not satisfied.
  40. ErrNotStored = os.NewError("memcache: item not stored")
  41. // ErrServer means that a server error occurred.
  42. ErrServerError = os.NewError("memcache: server error")
  43. // ErrNoStats means that no statistics were available.
  44. ErrNoStats = os.NewError("memcache: no statistics available")
  45. // ErrMalformedKey is returned when an invalid key is used.
  46. // Keys must be at maximum 250 bytes long, ASCII, and not
  47. // contain whitespace or control characters.
  48. ErrMalformedKey = os.NewError("malformed: key is too long or contains invalid characters")
  49. // ErrNoServers is returned when no servers are configured or available.
  50. ErrNoServers = os.NewError("memcache: no servers configured or available")
  51. )
  52. const buffered = 8 // arbitrary buffered channel size
  53. // resumableError returns true if err is only a protocol-level cache error.
  54. // This is used to determine whether or not a server connection should
  55. // be re-used or not. If an error occurs, by default we don't reuse the
  56. // connection, unless it was just a cache error.
  57. func resumableError(err os.Error) bool {
  58. switch err {
  59. case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey:
  60. return true
  61. }
  62. return false
  63. }
  64. func legalKey(key string) bool {
  65. if len(key) > 250 {
  66. return false
  67. }
  68. for i := 0; i < len(key); i++ {
  69. if key[i] <= ' ' || key[i] > 0x7e {
  70. return false
  71. }
  72. }
  73. return true
  74. }
  75. var (
  76. crlf = []byte("\r\n")
  77. resultStored = []byte("STORED\r\n")
  78. resultNotStored = []byte("NOT_STORED\r\n")
  79. resultExists = []byte("EXISTS\r\n")
  80. resultNotFound = []byte("NOT_FOUND\r\n")
  81. end = []byte("END\r\n")
  82. )
  83. // New returns a memcache client using the provided server(s)
  84. // with equal weight. If a server is listed multiple times,
  85. // it gets a proportional amount of weight.
  86. func New(server ...string) *Client {
  87. ss := new(ServerList)
  88. ss.SetServers(server...)
  89. return NewFromSelector(ss)
  90. }
  91. // NewFromSelector returns a new Client using the provided ServerSelector.
  92. func NewFromSelector(ss ServerSelector) *Client {
  93. return &Client{selector: ss}
  94. }
  95. // Client is a memcache client.
  96. // It is safe for unlocked use by multiple concurrent goroutines.
  97. type Client struct {
  98. selector ServerSelector
  99. }
  100. // Item is an item to be got or stored in a memcached server.
  101. type Item struct {
  102. // Key is the Item's key (250 bytes maximum).
  103. Key string
  104. // Value is the Item's value.
  105. Value []byte
  106. // Object is the Item's value for use with a Codec.
  107. Object interface{}
  108. // Flags are server-opaque flags whose semantics are entirely up to the
  109. // App Engine app.
  110. Flags uint32
  111. // Expiration is the cache expiration time, in seconds: either a relative
  112. // time from now (up to 1 month), or an absolute Unix epoch time.
  113. // Zero means the Item has no expiration time.
  114. Expiration int32
  115. // Compare and swap ID.
  116. casid uint64
  117. }
  118. // conn is a connection to a server.
  119. type conn struct {
  120. nc net.Conn
  121. rw *bufio.ReadWriter
  122. addr net.Addr
  123. c *Client
  124. }
  125. // release returns this connection back to the client's free pool
  126. func (cn *conn) release() {
  127. // TODO: return to client's free pool
  128. }
  129. // condRelease releases this connection if the error pointed to by err
  130. // is is nil (not an error) or is only a protocol level error (e.g. a
  131. // cache miss). The purpose is to not recycle TCP connections that
  132. // are bad.
  133. func (cn *conn) condRelease(err *os.Error) {
  134. if *err == nil || resumableError(*err) {
  135. cn.release()
  136. }
  137. }
  138. func (c *Client) getConn(addr net.Addr) (*conn, os.Error) {
  139. // TODO(bradfitz): get from a free pool
  140. nc, err := net.Dial(addr.Network(), addr.String())
  141. if err != nil {
  142. return nil, err
  143. }
  144. // TODO: set read/write timeouts
  145. return &conn{
  146. nc: nc,
  147. addr: addr,
  148. rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
  149. c: c,
  150. }, nil
  151. }
  152. func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) os.Error) os.Error {
  153. addr, err := c.selector.PickServer(item.Key)
  154. if err != nil {
  155. return err
  156. }
  157. cn, err := c.getConn(addr)
  158. if err != nil {
  159. return err
  160. }
  161. defer cn.condRelease(&err)
  162. if err := fn(c, cn.rw, item); err != nil {
  163. return err
  164. }
  165. return nil
  166. }
  167. // Get gets the item for the given key. ErrCacheMiss is returned for a
  168. // memcache cache miss. The key must be at most 250 bytes in length.
  169. func (c *Client) Get(key string) (item *Item, err os.Error) {
  170. if !legalKey(key) {
  171. return nil, ErrMalformedKey
  172. }
  173. addr, err := c.selector.PickServer(key)
  174. if err != nil {
  175. return nil, err
  176. }
  177. err = c.getFromAddr(addr, []string{key}, func(it *Item) { item = it })
  178. if err == nil && item == nil {
  179. err = ErrCacheMiss
  180. }
  181. return
  182. }
  183. func (c *Client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) os.Error {
  184. cn, err := c.getConn(addr)
  185. if err != nil {
  186. return err
  187. }
  188. defer cn.condRelease(&err)
  189. if _, err = fmt.Fprintf(cn.rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
  190. return err
  191. }
  192. if err = cn.rw.Flush(); err != nil {
  193. return err
  194. }
  195. if err = parseGetResponse(cn.rw.Reader, cb); err != nil {
  196. return err
  197. }
  198. return nil
  199. }
  200. // GetMulti is a batch version of Get. The returned map from keys to
  201. // items may have fewer elements than the input slice, due to memcache
  202. // cache misses. Each key must be at most 250 bytes in length.
  203. // If no error is returned, the returned map will also be non-nil.
  204. func (c *Client) GetMulti(keys []string) (map[string]*Item, os.Error) {
  205. var lk sync.Mutex
  206. m := make(map[string]*Item)
  207. addItemToMap := func(it *Item) {
  208. lk.Lock()
  209. defer lk.Unlock()
  210. m[it.Key] = it
  211. }
  212. keyMap := make(map[net.Addr][]string)
  213. for _, key := range keys {
  214. if !legalKey(key) {
  215. return nil, ErrMalformedKey
  216. }
  217. addr, err := c.selector.PickServer(key)
  218. if err != nil {
  219. return nil, err
  220. }
  221. keyMap[addr] = append(keyMap[addr], key)
  222. }
  223. ch := make(chan os.Error, buffered)
  224. for addr, keys := range keyMap {
  225. go func(addr net.Addr, keys []string) {
  226. ch <- c.getFromAddr(addr, keys, addItemToMap)
  227. }(addr, keys)
  228. }
  229. var err os.Error
  230. for _ = range keyMap {
  231. if ge := <-ch; ge != nil {
  232. err = ge
  233. }
  234. }
  235. return m, err
  236. }
  237. // parseGetResponse reads a GET response from r and calls cb for each
  238. // read and allocated Item
  239. func parseGetResponse(r *bufio.Reader, cb func(*Item)) os.Error {
  240. for {
  241. line, err := r.ReadSlice('\n')
  242. if err != nil {
  243. return err
  244. }
  245. if bytes.Equal(line, end) {
  246. return nil
  247. }
  248. it := new(Item)
  249. var size int
  250. n, err := fmt.Sscanf(string(line), "VALUE %s %d %d %d\r\n",
  251. &it.Key, &it.Flags, &size, &it.casid)
  252. if err != nil {
  253. return err
  254. }
  255. if n != 4 {
  256. return fmt.Errorf("memcache: unexpected line in get response: %q", string(line))
  257. }
  258. it.Value, err = ioutil.ReadAll(io.LimitReader(r, int64(size)+2))
  259. if err != nil {
  260. return err
  261. }
  262. if !bytes.HasSuffix(it.Value, crlf) {
  263. return fmt.Errorf("memcache: corrupt get result read")
  264. }
  265. it.Value = it.Value[:size]
  266. cb(it)
  267. }
  268. panic("unreached")
  269. }
  270. // Set writes the given item, unconditionally.
  271. func (c *Client) Set(item *Item) os.Error {
  272. return c.onItem(item, (*Client).set)
  273. }
  274. func (c *Client) set(rw *bufio.ReadWriter, item *Item) os.Error {
  275. return c.populateOne(rw, "set", item)
  276. }
  277. // Add writes the given item, if no value already exists for its
  278. // key. ErrNotStored is returned if that condition is not met.
  279. func (c *Client) Add(item *Item) os.Error {
  280. return c.onItem(item, (*Client).add)
  281. }
  282. func (c *Client) add(rw *bufio.ReadWriter, item *Item) os.Error {
  283. return c.populateOne(rw, "add", item)
  284. }
  285. // CompareAndSwap writes the given item that was previously returned
  286. // by Get, if the value was neither modified or evicted between the
  287. // Get and the CompareAndSwap calls. The item's Key should not change
  288. // between calls but all other item fields may differ. ErrCASConflict
  289. // is returned if the value was modified in between the
  290. // calls. ErrNotStored is returned if the value was evicted in between
  291. // the calls.
  292. func (c *Client) CompareAndSwap(item *Item) os.Error {
  293. return c.onItem(item, (*Client).cas)
  294. }
  295. func (c *Client) cas(rw *bufio.ReadWriter, item *Item) os.Error {
  296. return c.populateOne(rw, "cas", item)
  297. }
  298. func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) os.Error {
  299. if !legalKey(item.Key) {
  300. return ErrMalformedKey
  301. }
  302. var err os.Error
  303. if verb == "cas" {
  304. _, err = fmt.Fprintf(rw, "%s %s %d %d %d %d %d\r\n",
  305. verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid)
  306. } else {
  307. _, err = fmt.Fprintf(rw, "%s %s %d %d %d %d\r\n",
  308. verb, item.Key, item.Flags, item.Expiration, len(item.Value))
  309. }
  310. if err != nil {
  311. return err
  312. }
  313. if _, err = rw.Write(item.Value); err != nil {
  314. return err
  315. }
  316. if _, err := rw.Write(crlf); err != nil {
  317. return err
  318. }
  319. if err := rw.Flush(); err != nil {
  320. return err
  321. }
  322. line, err := rw.ReadSlice('\n')
  323. if err != nil {
  324. return err
  325. }
  326. switch {
  327. case bytes.Equal(line, resultStored):
  328. return nil
  329. case bytes.Equal(line, resultNotStored):
  330. return ErrNotStored
  331. case bytes.Equal(line, resultExists):
  332. return ErrCASConflict
  333. case bytes.Equal(line, resultNotFound):
  334. return ErrCacheMiss
  335. }
  336. return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line))
  337. }
  338. func (c *Client) Delete(key string) os.Error {
  339. panic("noimpl")
  340. }
  341. func (c *Client) delete(rw *bufio.ReadWriter, item *Item) os.Error {
  342. panic("noimpl")
  343. }