Browse Source

Merge pull request #7139 from heyitsanthony/proxy-rlock

grpcproxy/cache: acquire read lock on Get instead of write lock
Anthony Romano 9 years ago
parent
commit
304606ab0b

+ 0 - 191
cmd/vendor/github.com/golang/groupcache/LICENSE

@@ -1,191 +0,0 @@
-Apache License
-Version 2.0, January 2004
-http://www.apache.org/licenses/
-
-TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-1. Definitions.
-
-"License" shall mean the terms and conditions for use, reproduction, and
-distribution as defined by Sections 1 through 9 of this document.
-
-"Licensor" shall mean the copyright owner or entity authorized by the copyright
-owner that is granting the License.
-
-"Legal Entity" shall mean the union of the acting entity and all other entities
-that control, are controlled by, or are under common control with that entity.
-For the purposes of this definition, "control" means (i) the power, direct or
-indirect, to cause the direction or management of such entity, whether by
-contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
-outstanding shares, or (iii) beneficial ownership of such entity.
-
-"You" (or "Your") shall mean an individual or Legal Entity exercising
-permissions granted by this License.
-
-"Source" form shall mean the preferred form for making modifications, including
-but not limited to software source code, documentation source, and configuration
-files.
-
-"Object" form shall mean any form resulting from mechanical transformation or
-translation of a Source form, including but not limited to compiled object code,
-generated documentation, and conversions to other media types.
-
-"Work" shall mean the work of authorship, whether in Source or Object form, made
-available under the License, as indicated by a copyright notice that is included
-in or attached to the work (an example is provided in the Appendix below).
-
-"Derivative Works" shall mean any work, whether in Source or Object form, that
-is based on (or derived from) the Work and for which the editorial revisions,
-annotations, elaborations, or other modifications represent, as a whole, an
-original work of authorship. For the purposes of this License, Derivative Works
-shall not include works that remain separable from, or merely link (or bind by
-name) to the interfaces of, the Work and Derivative Works thereof.
-
-"Contribution" shall mean any work of authorship, including the original version
-of the Work and any modifications or additions to that Work or Derivative Works
-thereof, that is intentionally submitted to Licensor for inclusion in the Work
-by the copyright owner or by an individual or Legal Entity authorized to submit
-on behalf of the copyright owner. For the purposes of this definition,
-"submitted" means any form of electronic, verbal, or written communication sent
-to the Licensor or its representatives, including but not limited to
-communication on electronic mailing lists, source code control systems, and
-issue tracking systems that are managed by, or on behalf of, the Licensor for
-the purpose of discussing and improving the Work, but excluding communication
-that is conspicuously marked or otherwise designated in writing by the copyright
-owner as "Not a Contribution."
-
-"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
-of whom a Contribution has been received by Licensor and subsequently
-incorporated within the Work.
-
-2. Grant of Copyright License.
-
-Subject to the terms and conditions of this License, each Contributor hereby
-grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
-irrevocable copyright license to reproduce, prepare Derivative Works of,
-publicly display, publicly perform, sublicense, and distribute the Work and such
-Derivative Works in Source or Object form.
-
-3. Grant of Patent License.
-
-Subject to the terms and conditions of this License, each Contributor hereby
-grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
-irrevocable (except as stated in this section) patent license to make, have
-made, use, offer to sell, sell, import, and otherwise transfer the Work, where
-such license applies only to those patent claims licensable by such Contributor
-that are necessarily infringed by their Contribution(s) alone or by combination
-of their Contribution(s) with the Work to which such Contribution(s) was
-submitted. If You institute patent litigation against any entity (including a
-cross-claim or counterclaim in a lawsuit) alleging that the Work or a
-Contribution incorporated within the Work constitutes direct or contributory
-patent infringement, then any patent licenses granted to You under this License
-for that Work shall terminate as of the date such litigation is filed.
-
-4. Redistribution.
-
-You may reproduce and distribute copies of the Work or Derivative Works thereof
-in any medium, with or without modifications, and in Source or Object form,
-provided that You meet the following conditions:
-
-You must give any other recipients of the Work or Derivative Works a copy of
-this License; and
-You must cause any modified files to carry prominent notices stating that You
-changed the files; and
-You must retain, in the Source form of any Derivative Works that You distribute,
-all copyright, patent, trademark, and attribution notices from the Source form
-of the Work, excluding those notices that do not pertain to any part of the
-Derivative Works; and
-If the Work includes a "NOTICE" text file as part of its distribution, then any
-Derivative Works that You distribute must include a readable copy of the
-attribution notices contained within such NOTICE file, excluding those notices
-that do not pertain to any part of the Derivative Works, in at least one of the
-following places: within a NOTICE text file distributed as part of the
-Derivative Works; within the Source form or documentation, if provided along
-with the Derivative Works; or, within a display generated by the Derivative
-Works, if and wherever such third-party notices normally appear. The contents of
-the NOTICE file are for informational purposes only and do not modify the
-License. You may add Your own attribution notices within Derivative Works that
-You distribute, alongside or as an addendum to the NOTICE text from the Work,
-provided that such additional attribution notices cannot be construed as
-modifying the License.
-You may add Your own copyright statement to Your modifications and may provide
-additional or different license terms and conditions for use, reproduction, or
-distribution of Your modifications, or for any such Derivative Works as a whole,
-provided Your use, reproduction, and distribution of the Work otherwise complies
-with the conditions stated in this License.
-
-5. Submission of Contributions.
-
-Unless You explicitly state otherwise, any Contribution intentionally submitted
-for inclusion in the Work by You to the Licensor shall be under the terms and
-conditions of this License, without any additional terms or conditions.
-Notwithstanding the above, nothing herein shall supersede or modify the terms of
-any separate license agreement you may have executed with Licensor regarding
-such Contributions.
-
-6. Trademarks.
-
-This License does not grant permission to use the trade names, trademarks,
-service marks, or product names of the Licensor, except as required for
-reasonable and customary use in describing the origin of the Work and
-reproducing the content of the NOTICE file.
-
-7. Disclaimer of Warranty.
-
-Unless required by applicable law or agreed to in writing, Licensor provides the
-Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
-including, without limitation, any warranties or conditions of TITLE,
-NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
-solely responsible for determining the appropriateness of using or
-redistributing the Work and assume any risks associated with Your exercise of
-permissions under this License.
-
-8. Limitation of Liability.
-
-In no event and under no legal theory, whether in tort (including negligence),
-contract, or otherwise, unless required by applicable law (such as deliberate
-and grossly negligent acts) or agreed to in writing, shall any Contributor be
-liable to You for damages, including any direct, indirect, special, incidental,
-or consequential damages of any character arising as a result of this License or
-out of the use or inability to use the Work (including but not limited to
-damages for loss of goodwill, work stoppage, computer failure or malfunction, or
-any and all other commercial damages or losses), even if such Contributor has
-been advised of the possibility of such damages.
-
-9. Accepting Warranty or Additional Liability.
-
-While redistributing the Work or Derivative Works thereof, You may choose to
-offer, and charge a fee for, acceptance of support, warranty, indemnity, or
-other liability obligations and/or rights consistent with this License. However,
-in accepting such obligations, You may act only on Your own behalf and on Your
-sole responsibility, not on behalf of any other Contributor, and only if You
-agree to indemnify, defend, and hold each Contributor harmless for any liability
-incurred by, or claims asserted against, such Contributor by reason of your
-accepting any such warranty or additional liability.
-
-END OF TERMS AND CONDITIONS
-
-APPENDIX: How to apply the Apache License to your work
-
-To apply the Apache License to your work, attach the following boilerplate
-notice, with the fields enclosed by brackets "[]" replaced with your own
-identifying information. (Don't include the brackets!) The text should be
-enclosed in the appropriate comment syntax for the file format. We also
-recommend that a file or class name and description of purpose be included on
-the same "printed page" as the copyright notice for easier identification within
-third-party archives.
-
-   Copyright [yyyy] [name of copyright owner]
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.

+ 0 - 121
cmd/vendor/github.com/golang/groupcache/lru/lru.go

@@ -1,121 +0,0 @@
-/*
-Copyright 2013 Google Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Package lru implements an LRU cache.
-package lru
-
-import "container/list"
-
-// Cache is an LRU cache. It is not safe for concurrent access.
-type Cache struct {
-	// MaxEntries is the maximum number of cache entries before
-	// an item is evicted. Zero means no limit.
-	MaxEntries int
-
-	// OnEvicted optionally specificies a callback function to be
-	// executed when an entry is purged from the cache.
-	OnEvicted func(key Key, value interface{})
-
-	ll    *list.List
-	cache map[interface{}]*list.Element
-}
-
-// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
-type Key interface{}
-
-type entry struct {
-	key   Key
-	value interface{}
-}
-
-// New creates a new Cache.
-// If maxEntries is zero, the cache has no limit and it's assumed
-// that eviction is done by the caller.
-func New(maxEntries int) *Cache {
-	return &Cache{
-		MaxEntries: maxEntries,
-		ll:         list.New(),
-		cache:      make(map[interface{}]*list.Element),
-	}
-}
-
-// Add adds a value to the cache.
-func (c *Cache) Add(key Key, value interface{}) {
-	if c.cache == nil {
-		c.cache = make(map[interface{}]*list.Element)
-		c.ll = list.New()
-	}
-	if ee, ok := c.cache[key]; ok {
-		c.ll.MoveToFront(ee)
-		ee.Value.(*entry).value = value
-		return
-	}
-	ele := c.ll.PushFront(&entry{key, value})
-	c.cache[key] = ele
-	if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
-		c.RemoveOldest()
-	}
-}
-
-// Get looks up a key's value from the cache.
-func (c *Cache) Get(key Key) (value interface{}, ok bool) {
-	if c.cache == nil {
-		return
-	}
-	if ele, hit := c.cache[key]; hit {
-		c.ll.MoveToFront(ele)
-		return ele.Value.(*entry).value, true
-	}
-	return
-}
-
-// Remove removes the provided key from the cache.
-func (c *Cache) Remove(key Key) {
-	if c.cache == nil {
-		return
-	}
-	if ele, hit := c.cache[key]; hit {
-		c.removeElement(ele)
-	}
-}
-
-// RemoveOldest removes the oldest item from the cache.
-func (c *Cache) RemoveOldest() {
-	if c.cache == nil {
-		return
-	}
-	ele := c.ll.Back()
-	if ele != nil {
-		c.removeElement(ele)
-	}
-}
-
-func (c *Cache) removeElement(e *list.Element) {
-	c.ll.Remove(e)
-	kv := e.Value.(*entry)
-	delete(c.cache, kv.key)
-	if c.OnEvicted != nil {
-		c.OnEvicted(kv.key, kv.value)
-	}
-}
-
-// Len returns the number of items in the cache.
-func (c *Cache) Len() int {
-	if c.cache == nil {
-		return 0
-	}
-	return c.ll.Len()
-}

+ 41 - 0
cmd/vendor/github.com/karlseguin/ccache/bucket.go

@@ -0,0 +1,41 @@
+package ccache
+
+import (
+	"sync"
+	"time"
+)
+
+type bucket struct {
+	sync.RWMutex
+	lookup map[string]*Item
+}
+
+func (b *bucket) get(key string) *Item {
+	b.RLock()
+	defer b.RUnlock()
+	return b.lookup[key]
+}
+
+func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, *Item) {
+	expires := time.Now().Add(duration).UnixNano()
+	item := newItem(key, value, expires)
+	b.Lock()
+	defer b.Unlock()
+	existing := b.lookup[key]
+	b.lookup[key] = item
+	return item, existing
+}
+
+func (b *bucket) delete(key string) *Item {
+	b.Lock()
+	defer b.Unlock()
+	item := b.lookup[key]
+	delete(b.lookup, key)
+	return item
+}
+
+func (b *bucket) clear() {
+	b.Lock()
+	defer b.Unlock()
+	b.lookup = make(map[string]*Item)
+}

+ 218 - 0
cmd/vendor/github.com/karlseguin/ccache/cache.go

@@ -0,0 +1,218 @@
+// An LRU cache aimed at high concurrency
+package ccache
+
+import (
+	"container/list"
+	"hash/fnv"
+	"sync/atomic"
+	"time"
+)
+
+type Cache struct {
+	*Configuration
+	list        *list.List
+	size        int64
+	buckets     []*bucket
+	bucketMask  uint32
+	deletables  chan *Item
+	promotables chan *Item
+}
+
+// Create a new cache with the specified configuration
+// See ccache.Configure() for creating a configuration
+func New(config *Configuration) *Cache {
+	c := &Cache{
+		list:          list.New(),
+		Configuration: config,
+		bucketMask:    uint32(config.buckets) - 1,
+		buckets:       make([]*bucket, config.buckets),
+		deletables:    make(chan *Item, config.deleteBuffer),
+		promotables:   make(chan *Item, config.promoteBuffer),
+	}
+	for i := 0; i < int(config.buckets); i++ {
+		c.buckets[i] = &bucket{
+			lookup: make(map[string]*Item),
+		}
+	}
+	go c.worker()
+	return c
+}
+
+// Get an item from the cache. Returns nil if the item wasn't found.
+// This can return an expired item. Use item.Expired() to see if the item
+// is expired and item.TTL() to see how long until the item expires (which
+// will be negative for an already expired item).
+func (c *Cache) Get(key string) *Item {
+	item := c.bucket(key).get(key)
+	if item == nil {
+		return nil
+	}
+	if item.expires > time.Now().UnixNano() {
+		c.promote(item)
+	}
+	return item
+}
+
+// Used when the cache was created with the Track() configuration option.
+// Avoid otherwise
+func (c *Cache) TrackingGet(key string) TrackedItem {
+	item := c.Get(key)
+	if item == nil {
+		return NilTracked
+	}
+	item.track()
+	return item
+}
+
+// Set the value in the cache for the specified duration
+func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
+	c.set(key, value, duration)
+}
+
+// Replace the value if it exists, does not set if it doesn't.
+// Returns true if the item existed and was replaced, false otherwise.
+// Replace does not reset item's TTL
+func (c *Cache) Replace(key string, value interface{}) bool {
+	item := c.bucket(key).get(key)
+	if item == nil {
+		return false
+	}
+	c.Set(key, value, item.TTL())
+	return true
+}
+
+// Attempts to get the value from the cache and calls fetch on a miss (missing
+// or stale item). If fetch returns an error, no value is cached and the error
+// is returned back to the caller.
+func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
+	item := c.Get(key)
+	if item != nil && !item.Expired() {
+		return item, nil
+	}
+	value, err := fetch()
+	if err != nil {
+		return nil, err
+	}
+	return c.set(key, value, duration), nil
+}
+
+// Remove the item from the cache, return true if the item was present, false otherwise.
+func (c *Cache) Delete(key string) bool {
+	item := c.bucket(key).delete(key)
+	if item != nil {
+		c.deletables <- item
+		return true
+	}
+	return false
+}
+
+//this isn't thread safe. It's meant to be called from non-concurrent tests
+func (c *Cache) Clear() {
+	for _, bucket := range c.buckets {
+		bucket.clear()
+	}
+	c.size = 0
+	c.list = list.New()
+}
+
+// Stops the background worker. Operations performed on the cache after Stop
+// is called are likely to panic
+func (c *Cache) Stop() {
+	close(c.promotables)
+}
+
+func (c *Cache) deleteItem(bucket *bucket, item *Item) {
+	bucket.delete(item.key) //stop other GETs from getting it
+	c.deletables <- item
+}
+
+func (c *Cache) set(key string, value interface{}, duration time.Duration) *Item {
+	item, existing := c.bucket(key).set(key, value, duration)
+	if existing != nil {
+		c.deletables <- existing
+	}
+	c.promote(item)
+	return item
+}
+
+func (c *Cache) bucket(key string) *bucket {
+	h := fnv.New32a()
+	h.Write([]byte(key))
+	return c.buckets[h.Sum32()&c.bucketMask]
+}
+
+func (c *Cache) promote(item *Item) {
+	c.promotables <- item
+}
+
+func (c *Cache) worker() {
+	for {
+		select {
+		case item, ok := <-c.promotables:
+			if ok == false {
+				goto drain
+			}
+			if c.doPromote(item) && c.size > c.maxSize {
+				c.gc()
+			}
+		case item := <-c.deletables:
+			c.doDelete(item)
+		}
+	}
+
+drain:
+	for {
+		select {
+		case item := <-c.deletables:
+			c.doDelete(item)
+		default:
+			close(c.deletables)
+			return
+		}
+	}
+}
+
+func (c *Cache) doDelete(item *Item) {
+	if item.element == nil {
+		item.promotions = -2
+	} else {
+		c.size -= item.size
+		c.list.Remove(item.element)
+	}
+}
+
+func (c *Cache) doPromote(item *Item) bool {
+	//already deleted
+	if item.promotions == -2 {
+		return false
+	}
+	if item.element != nil { //not a new item
+		if item.shouldPromote(c.getsPerPromote) {
+			c.list.MoveToFront(item.element)
+			item.promotions = 0
+		}
+		return false
+	}
+
+	c.size += item.size
+	item.element = c.list.PushFront(item)
+	return true
+}
+
+func (c *Cache) gc() {
+	element := c.list.Back()
+	for i := 0; i < c.itemsToPrune; i++ {
+		if element == nil {
+			return
+		}
+		prev := element.Prev()
+		item := element.Value.(*Item)
+		if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
+			c.bucket(item.key).delete(item.key)
+			c.size -= item.size
+			c.list.Remove(element)
+			item.promotions = -2
+		}
+		element = prev
+	}
+}

+ 94 - 0
cmd/vendor/github.com/karlseguin/ccache/configuration.go

@@ -0,0 +1,94 @@
+package ccache
+
+type Configuration struct {
+	maxSize        int64
+	buckets        int
+	itemsToPrune   int
+	deleteBuffer   int
+	promoteBuffer  int
+	getsPerPromote int32
+	tracking       bool
+}
+
+// Creates a configuration object with sensible defaults
+// Use this as the start of the fluent configuration:
+// e.g.: ccache.New(ccache.Configure().MaxSize(10000))
+func Configure() *Configuration {
+	return &Configuration{
+		buckets:        16,
+		itemsToPrune:   500,
+		deleteBuffer:   1024,
+		getsPerPromote: 3,
+		promoteBuffer:  1024,
+		maxSize:        5000,
+		tracking:       false,
+	}
+}
+
+// The max size for the cache
+// [5000]
+func (c *Configuration) MaxSize(max int64) *Configuration {
+	c.maxSize = max
+	return c
+}
+
+// Keys are hashed into % bucket count to provide greater concurrency (every set
+// requires a write lock on the bucket). Must be a power of 2 (1, 2, 4, 8, 16, ...)
+// [16]
+func (c *Configuration) Buckets(count uint32) *Configuration {
+	if count == 0 || ((count&(^count+1)) == count) == false {
+		count = 16
+	}
+	c.buckets = int(count)
+	return c
+}
+
+// The number of items to prune when memory is low
+// [500]
+func (c *Configuration) ItemsToPrune(count uint32) *Configuration {
+	c.itemsToPrune = int(count)
+	return c
+}
+
+// The size of the queue for items which should be promoted. If the queue fills
+// up, promotions are skipped
+// [1024]
+func (c *Configuration) PromoteBuffer(size uint32) *Configuration {
+	c.promoteBuffer = int(size)
+	return c
+}
+
+// The size of the queue for items which should be deleted. If the queue fills
+// up, calls to Delete() will block
+func (c *Configuration) DeleteBuffer(size uint32) *Configuration {
+	c.deleteBuffer = int(size)
+	return c
+}
+
+// Given a large cache with a high read / write ratio, it's usually unnecessary
+// to promote an item on every Get. GetsPerPromote specifies the number of Gets
+// a key must have before being promoted
+// [3]
+func (c *Configuration) GetsPerPromote(count int32) *Configuration {
+	c.getsPerPromote = count
+	return c
+}
+
+// Typically, a cache is agnostic about how cached values are used. This is fine
+// for a typical cache usage, where you fetch an item from the cache, do something
+// (write it out) and nothing else.
+
+// However, if callers are going to keep a reference to a cached item for a long
+// time, things get messy. Specifically, the cache can evict the item, while
+// references still exist. Technically, this isn't an issue. However, if you reload
+// the item back into the cache, you end up with 2 objects representing the same
+// data. This is a waste of space and could lead to weird behavior (the type of
+// problem an identity map is meant to solve).
+
+// By turning tracking on and using the cache's TrackingGet, the cache
+// won't evict items which you haven't called Release() on. It's a simple reference
+// counter.
+func (c *Configuration) Track() *Configuration {
+	c.tracking = true
+	return c
+}

+ 103 - 0
cmd/vendor/github.com/karlseguin/ccache/item.go

@@ -0,0 +1,103 @@
+package ccache
+
+import (
+	"container/list"
+	"sync/atomic"
+	"time"
+)
+
+type Sized interface {
+	Size() int64
+}
+
+type TrackedItem interface {
+	Value() interface{}
+	Release()
+	Expired() bool
+	TTL() time.Duration
+	Expires() time.Time
+	Extend(duration time.Duration)
+}
+
+type nilItem struct{}
+
+func (n *nilItem) Value() interface{} { return nil }
+func (n *nilItem) Release()           {}
+
+func (i *nilItem) Expired() bool {
+	return true
+}
+
+func (i *nilItem) TTL() time.Duration {
+	return time.Minute
+}
+
+func (i *nilItem) Expires() time.Time {
+	return time.Time{}
+}
+
+func (i *nilItem) Extend(duration time.Duration) {
+}
+
+var NilTracked = new(nilItem)
+
+type Item struct {
+	key        string
+	group      string
+	promotions int32
+	refCount   int32
+	expires    int64
+	size       int64
+	value      interface{}
+	element    *list.Element
+}
+
+func newItem(key string, value interface{}, expires int64) *Item {
+	size := int64(1)
+	if sized, ok := value.(Sized); ok {
+		size = sized.Size()
+	}
+	return &Item{
+		key:        key,
+		value:      value,
+		promotions: 0,
+		size:       size,
+		expires:    expires,
+	}
+}
+
+func (i *Item) shouldPromote(getsPerPromote int32) bool {
+	i.promotions += 1
+	return i.promotions == getsPerPromote
+}
+
+func (i *Item) Value() interface{} {
+	return i.value
+}
+
+func (i *Item) track() {
+	atomic.AddInt32(&i.refCount, 1)
+}
+
+func (i *Item) Release() {
+	atomic.AddInt32(&i.refCount, -1)
+}
+
+func (i *Item) Expired() bool {
+	expires := atomic.LoadInt64(&i.expires)
+	return expires < time.Now().UnixNano()
+}
+
+func (i *Item) TTL() time.Duration {
+	expires := atomic.LoadInt64(&i.expires)
+	return time.Nanosecond * time.Duration(expires-time.Now().UnixNano())
+}
+
+func (i *Item) Expires() time.Time {
+	expires := atomic.LoadInt64(&i.expires)
+	return time.Unix(0, expires)
+}
+
+func (i *Item) Extend(duration time.Duration) {
+	atomic.StoreInt64(&i.expires, time.Now().Add(duration).UnixNano())
+}

+ 82 - 0
cmd/vendor/github.com/karlseguin/ccache/layeredbucket.go

@@ -0,0 +1,82 @@
+package ccache
+
+import (
+	"sync"
+	"time"
+)
+
+type layeredBucket struct {
+	sync.RWMutex
+	buckets map[string]*bucket
+}
+
+func (b *layeredBucket) get(primary, secondary string) *Item {
+	bucket := b.getSecondaryBucket(primary)
+	if bucket == nil {
+		return nil
+	}
+	return bucket.get(secondary)
+}
+
+func (b *layeredBucket) getSecondaryBucket(primary string) *bucket {
+	b.RLock()
+	bucket, exists := b.buckets[primary]
+	b.RUnlock()
+	if exists == false {
+		return nil
+	}
+	return bucket
+}
+
+func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, *Item) {
+	b.Lock()
+	bkt, exists := b.buckets[primary]
+	if exists == false {
+		bkt = &bucket{lookup: make(map[string]*Item)}
+		b.buckets[primary] = bkt
+	}
+	b.Unlock()
+	item, existing := bkt.set(secondary, value, duration)
+	item.group = primary
+	return item, existing
+}
+
+func (b *layeredBucket) delete(primary, secondary string) *Item {
+	b.RLock()
+	bucket, exists := b.buckets[primary]
+	b.RUnlock()
+	if exists == false {
+		return nil
+	}
+	return bucket.delete(secondary)
+}
+
+func (b *layeredBucket) deleteAll(primary string, deletables chan *Item) bool {
+	b.RLock()
+	bucket, exists := b.buckets[primary]
+	b.RUnlock()
+	if exists == false {
+		return false
+	}
+
+	bucket.Lock()
+	defer bucket.Unlock()
+
+	if l := len(bucket.lookup); l == 0 {
+		return false
+	}
+	for key, item := range bucket.lookup {
+		delete(bucket.lookup, key)
+		deletables <- item
+	}
+	return true
+}
+
+func (b *layeredBucket) clear() {
+	b.Lock()
+	defer b.Unlock()
+	for _, bucket := range b.buckets {
+		bucket.clear()
+	}
+	b.buckets = make(map[string]*bucket)
+}

+ 222 - 0
cmd/vendor/github.com/karlseguin/ccache/layeredcache.go

@@ -0,0 +1,222 @@
+// An LRU cache aimed at high concurrency
+package ccache
+
+import (
+	"container/list"
+	"hash/fnv"
+	"sync/atomic"
+	"time"
+)
+
+type LayeredCache struct {
+	*Configuration
+	list        *list.List
+	buckets     []*layeredBucket
+	bucketMask  uint32
+	size        int64
+	deletables  chan *Item
+	promotables chan *Item
+}
+
+// Create a new layered cache with the specified configuration.
+// A layered cache uses two keys to identify a value: a primary key
+// and a secondary key. Get, Set and Delete require both a primary and
+// secondary key. However, DeleteAll requires only a primary key, deleting
+// all values that share the same primary key.
+
+// Layered Cache is useful as an HTTP cache, where an HTTP purge might
+// delete multiple variants of the same resource:
+// primary key = "user/44"
+// secondary key 1 = ".json"
+// secondary key 2 = ".xml"
+
+// See ccache.Configure() for creating a configuration
+func Layered(config *Configuration) *LayeredCache {
+	c := &LayeredCache{
+		list:          list.New(),
+		Configuration: config,
+		bucketMask:    uint32(config.buckets) - 1,
+		buckets:       make([]*layeredBucket, config.buckets),
+		deletables:    make(chan *Item, config.deleteBuffer),
+		promotables:   make(chan *Item, config.promoteBuffer),
+	}
+	for i := 0; i < int(config.buckets); i++ {
+		c.buckets[i] = &layeredBucket{
+			buckets: make(map[string]*bucket),
+		}
+	}
+	go c.worker()
+	return c
+}
+
+// Get an item from the cache. Returns nil if the item wasn't found.
+// This can return an expired item. Use item.Expired() to see if the item
+// is expired and item.TTL() to see how long until the item expires (which
+// will be negative for an already expired item).
+func (c *LayeredCache) Get(primary, secondary string) *Item {
+	item := c.bucket(primary).get(primary, secondary)
+	if item == nil {
+		return nil
+	}
+	if item.expires > time.Now().UnixNano() {
+		c.promote(item)
+	}
+	return item
+}
+
+// Get the secondary cache for a given primary key. This operation will
+// never return nil. In the case where the primary key does not exist, a
+// new, underlying, empty bucket will be created and returned.
+func (c *LayeredCache) GetOrCreateSecondaryCache(primary string) *SecondaryCache {
+	primaryBkt := c.bucket(primary)
+	bkt := primaryBkt.getSecondaryBucket(primary)
+	primaryBkt.Lock()
+	if bkt == nil {
+		bkt = &bucket{lookup: make(map[string]*Item)}
+		primaryBkt.buckets[primary] = bkt
+	}
+	primaryBkt.Unlock()
+	return &SecondaryCache{
+		bucket: bkt,
+		pCache: c,
+	}
+}
+
+// Used when the cache was created with the Track() configuration option.
+// Avoid otherwise
+func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
+	item := c.Get(primary, secondary)
+	if item == nil {
+		return NilTracked
+	}
+	item.track()
+	return item
+}
+
+// Set the value in the cache for the specified duration
+func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
+	c.set(primary, secondary, value, duration)
+}
+
+// Replace the value if it exists, does not set if it doesn't.
+// Returns true if the item existed and was replaced, false otherwise.
+// Replace does not reset item's TTL nor does it alter its position in the LRU
+func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool {
+	item := c.bucket(primary).get(primary, secondary)
+	if item == nil {
+		return false
+	}
+	c.Set(primary, secondary, value, item.TTL())
+	return true
+}
+
+// Attempts to get the value from the cache and calls fetch on a miss.
+// If fetch returns an error, no value is cached and the error is returned back
+// to the caller.
+func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
+	item := c.Get(primary, secondary)
+	if item != nil {
+		return item, nil
+	}
+	value, err := fetch()
+	if err != nil {
+		return nil, err
+	}
+	return c.set(primary, secondary, value, duration), nil
+}
+
+// Remove the item from the cache, return true if the item was present, false otherwise.
+func (c *LayeredCache) Delete(primary, secondary string) bool {
+	item := c.bucket(primary).delete(primary, secondary)
+	if item != nil {
+		c.deletables <- item
+		return true
+	}
+	return false
+}
+
+// Deletes all items that share the same primary key
+func (c *LayeredCache) DeleteAll(primary string) bool {
+	return c.bucket(primary).deleteAll(primary, c.deletables)
+}
+
+//this isn't thread safe. It's meant to be called from non-concurrent tests
+func (c *LayeredCache) Clear() {
+	for _, bucket := range c.buckets {
+		bucket.clear()
+	}
+	c.size = 0
+	c.list = list.New()
+}
+
+func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item {
+	item, existing := c.bucket(primary).set(primary, secondary, value, duration)
+	if existing != nil {
+		c.deletables <- existing
+	}
+	c.promote(item)
+	return item
+}
+
+func (c *LayeredCache) bucket(key string) *layeredBucket {
+	h := fnv.New32a()
+	h.Write([]byte(key))
+	return c.buckets[h.Sum32()&c.bucketMask]
+}
+
+func (c *LayeredCache) promote(item *Item) {
+	c.promotables <- item
+}
+
+func (c *LayeredCache) worker() {
+	for {
+		select {
+		case item := <-c.promotables:
+			if c.doPromote(item) && c.size > c.maxSize {
+				c.gc()
+			}
+		case item := <-c.deletables:
+			if item.element == nil {
+				item.promotions = -2
+			} else {
+				c.size -= item.size
+				c.list.Remove(item.element)
+			}
+		}
+	}
+}
+
+func (c *LayeredCache) doPromote(item *Item) bool {
+	// deleted before it ever got promoted
+	if item.promotions == -2 {
+		return false
+	}
+	if item.element != nil { //not a new item
+		if item.shouldPromote(c.getsPerPromote) {
+			c.list.MoveToFront(item.element)
+			item.promotions = 0
+		}
+		return false
+	}
+	c.size += item.size
+	item.element = c.list.PushFront(item)
+	return true
+}
+
+func (c *LayeredCache) gc() {
+	element := c.list.Back()
+	for i := 0; i < c.itemsToPrune; i++ {
+		if element == nil {
+			return
+		}
+		prev := element.Prev()
+		item := element.Value.(*Item)
+		if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
+			c.bucket(item.group).delete(item.group, item.key)
+			c.size -= item.size
+			c.list.Remove(element)
+			item.promotions = -2
+		}
+		element = prev
+	}
+}

+ 19 - 0
cmd/vendor/github.com/karlseguin/ccache/license.txt

@@ -0,0 +1,19 @@
+Copyright (c) 2013 Karl Seguin.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.

+ 72 - 0
cmd/vendor/github.com/karlseguin/ccache/secondarycache.go

@@ -0,0 +1,72 @@
+package ccache
+
+import "time"
+
+type SecondaryCache struct {
+	bucket *bucket
+	pCache *LayeredCache
+}
+
+// Get the secondary key.
+// The semantics are the same as for LayeredCache.Get
+func (s *SecondaryCache) Get(secondary string) *Item {
+	return s.bucket.get(secondary)
+}
+
+// Set the secondary key to a value.
+// The semantics are the same as for LayeredCache.Set
+func (s *SecondaryCache) Set(secondary string, value interface{}, duration time.Duration) *Item {
+	item, existing := s.bucket.set(secondary, value, duration)
+	if existing != nil {
+		s.pCache.deletables <- existing
+	}
+	s.pCache.promote(item)
+	return item
+}
+
+// Fetch or set a secondary key.
+// The semantics are the same as for LayeredCache.Fetch
+func (s *SecondaryCache) Fetch(secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
+	item := s.Get(secondary)
+	if item != nil {
+		return item, nil
+	}
+	value, err := fetch()
+	if err != nil {
+		return nil, err
+	}
+	return s.Set(secondary, value, duration), nil
+}
+
+// Delete a secondary key.
+// The semantics are the same as for LayeredCache.Delete
+func (s *SecondaryCache) Delete(secondary string) bool {
+	item := s.bucket.delete(secondary)
+	if item != nil {
+		s.pCache.deletables <- item
+		return true
+	}
+	return false
+}
+
+// Replace a secondary key.
+// The semantics are the same as for LayeredCache.Replace
+func (s *SecondaryCache) Replace(secondary string, value interface{}) bool {
+	item := s.Get(secondary)
+	if item == nil {
+		return false
+	}
+	s.Set(secondary, value, item.TTL())
+	return true
+}
+
+// Track a secondary key.
+// The semantics are the same as for LayeredCache.TrackingGet
+func (c *SecondaryCache) TrackingGet(secondary string) TrackedItem {
+	item := c.Get(secondary)
+	if item == nil {
+		return NilTracked
+	}
+	item.track()
+	return item
+}

+ 4 - 6
glide.lock

@@ -1,5 +1,5 @@
-hash: e19ee990cd69a4691200ce929e44477f4e5a1a43fb72303a172cdfaddc16cb47
-updated: 2017-01-09T12:10:50.0887037+01:00
+hash: 1318c1dff08a83ee9e334c94d23c165fbef5508a96ef8782ca0529e332521538
+updated: 2017-01-11T16:34:17.552765434-08:00
 imports:
 - name: github.com/beorn7/perks
   version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
@@ -38,10 +38,6 @@ imports:
   version: 909568be09de550ed094403c2bf8a261b5bb730a
   subpackages:
   - proto
-- name: github.com/golang/groupcache
-  version: 02826c3e79038b59d737d3b1c0a1d937f71a4433
-  subpackages:
-  - lru
 - name: github.com/golang/protobuf
   version: 4bd1920723d7b7c925de087aa32e2187708897f7
   subpackages:
@@ -61,6 +57,8 @@ imports:
   version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75
 - name: github.com/jonboulle/clockwork
   version: 2eee05ed794112d45db504eb05aa693efd2b8b09
+- name: github.com/karlseguin/ccache
+  version: a2d62155777b39595c825ed3824279e642a5db3c
 - name: github.com/kr/pty
   version: f7ee69f31298ecbe5d2b349c711e2547a617d398
 - name: github.com/mattn/go-runewidth

+ 2 - 4
glide.yaml

@@ -28,10 +28,6 @@ import:
   version: v0.3
   subpackages:
   - proto
-- package: github.com/golang/groupcache
-  version: 02826c3e79038b59d737d3b1c0a1d937f71a4433
-  subpackages:
-  - lru
 - package: github.com/golang/protobuf
   version: 4bd1920723d7b7c925de087aa32e2187708897f7
   subpackages:
@@ -105,3 +101,5 @@ import:
   version: 976c720a22c8eb4eb6a0b4348ad85ad12491a506
   subpackages:
   - assert
+- package: github.com/karlseguin/ccache
+  version: v2.0.2

+ 19 - 17
proxy/grpcproxy/cache/store.go

@@ -17,11 +17,13 @@ package cache
 import (
 	"errors"
 	"sync"
+	"time"
+
+	"github.com/karlseguin/ccache"
 
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/adt"
-	"github.com/golang/groupcache/lru"
 )
 
 var (
@@ -29,12 +31,14 @@ var (
 	ErrCompacted      = rpctypes.ErrGRPCCompacted
 )
 
+const defaultHistoricTTL = time.Hour
+const defaultCurrentTTL = time.Minute
+
 type Cache interface {
 	Add(req *pb.RangeRequest, resp *pb.RangeResponse)
 	Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
 	Compact(revision int64)
 	Invalidate(key []byte, endkey []byte)
-	Size() int
 }
 
 // keyFunc returns the key of an request, which is used to look up in the cache for it's caching response.
@@ -49,7 +53,7 @@ func keyFunc(req *pb.RangeRequest) string {
 
 func NewCache(maxCacheEntries int) Cache {
 	return &cache{
-		lru:          lru.New(maxCacheEntries),
+		lru:          ccache.New(ccache.Configure().MaxSize(int64(maxCacheEntries))),
 		compactedRev: -1,
 	}
 }
@@ -57,7 +61,7 @@ func NewCache(maxCacheEntries int) Cache {
 // cache implements Cache
 type cache struct {
 	mu  sync.RWMutex
-	lru *lru.Cache
+	lru *ccache.Cache
 
 	// a reverse index for cache invalidation
 	cachedRanges adt.IntervalTree
@@ -73,7 +77,11 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
 	defer c.mu.Unlock()
 
 	if req.Revision > c.compactedRev {
-		c.lru.Add(key, resp)
+		if req.Revision == 0 {
+			c.lru.Set(key, resp, defaultCurrentTTL)
+		} else {
+			c.lru.Set(key, resp, defaultHistoricTTL)
+		}
 	}
 	// we do not need to invalidate a request with a revision specified.
 	// so we do not need to add it into the reverse index.
@@ -105,16 +113,16 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
 func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
 	key := keyFunc(req)
 
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 
 	if req.Revision < c.compactedRev {
-		c.lru.Remove(key)
+		c.lru.Delete(key)
 		return nil, ErrCompacted
 	}
 
-	if resp, ok := c.lru.Get(key); ok {
-		return resp.(*pb.RangeResponse), nil
+	if item := c.lru.Get(key); item != nil {
+		return item.Value().(*pb.RangeResponse), nil
 	}
 	return nil, errors.New("not exist")
 }
@@ -138,7 +146,7 @@ func (c *cache) Invalidate(key, endkey []byte) {
 	for _, iv := range ivs {
 		keys := iv.Val.([]string)
 		for _, key := range keys {
-			c.lru.Remove(key)
+			c.lru.Delete(key)
 		}
 	}
 	// delete after removing all keys since it is destructive to 'ivs'
@@ -155,9 +163,3 @@ func (c *cache) Compact(revision int64) {
 		c.compactedRev = revision
 	}
 }
-
-func (c *cache) Size() int {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	return c.lru.Len()
-}

+ 0 - 7
proxy/grpcproxy/kv.go

@@ -58,14 +58,12 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo
 	req.Serializable = true
 	gresp := (*pb.RangeResponse)(resp.Get())
 	p.cache.Add(&req, gresp)
-	cacheKeys.Set(float64(p.cache.Size()))
 
 	return gresp, nil
 }
 
 func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
 	p.cache.Invalidate(r.Key, nil)
-	cacheKeys.Set(float64(p.cache.Size()))
 
 	resp, err := p.kv.Do(ctx, PutRequestToOp(r))
 	return (*pb.PutResponse)(resp.Put()), err
@@ -73,7 +71,6 @@ func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, e
 
 func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
 	p.cache.Invalidate(r.Key, r.RangeEnd)
-	cacheKeys.Set(float64(p.cache.Size()))
 
 	resp, err := p.kv.Do(ctx, DelRequestToOp(r))
 	return (*pb.DeleteRangeResponse)(resp.Del()), err
@@ -129,8 +126,6 @@ func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e
 		p.txnToCache(r.Failure, resp.Responses)
 	}
 
-	cacheKeys.Set(float64(p.cache.Size()))
-
 	return (*pb.TxnResponse)(resp), nil
 }
 
@@ -145,8 +140,6 @@ func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Com
 		p.cache.Compact(r.Revision)
 	}
 
-	cacheKeys.Set(float64(p.cache.Size()))
-
 	return (*pb.CompactionResponse)(resp), err
 }
 

+ 0 - 7
proxy/grpcproxy/metrics.go

@@ -29,12 +29,6 @@ var (
 		Name:      "events_coalescing_total",
 		Help:      "Total number of events coalescing",
 	})
-	cacheKeys = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: "etcd",
-		Subsystem: "grpc_proxy",
-		Name:      "cache_keys_total",
-		Help:      "Total number of keys/ranges cached",
-	})
 	cacheHits = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "etcd",
 		Subsystem: "grpc_proxy",
@@ -52,7 +46,6 @@ var (
 func init() {
 	prometheus.MustRegister(watchersCoalescing)
 	prometheus.MustRegister(eventsCoalescing)
-	prometheus.MustRegister(cacheKeys)
 	prometheus.MustRegister(cacheHits)
 	prometheus.MustRegister(cachedMisses)
 }