http.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. /*
  2. Copyright 2013 Google Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package groupcache
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "net/url"
  20. "strings"
  21. "sync"
  22. "github.com/golang/groupcache/consistenthash"
  23. pb "github.com/golang/groupcache/groupcachepb"
  24. "github.com/golang/protobuf/proto"
  25. )
  26. const defaultBasePath = "/_groupcache/"
  27. const defaultReplicas = 50
// HTTPPool implements PeerPicker for a pool of HTTP peers.
// It also serves peer requests through its ServeHTTP method.
type HTTPPool struct {
	// Context optionally specifies a context for the server to use when it
	// receives a request. ServeHTTP calls it once per request.
	// If nil, the server uses a nil Context.
	Context func(*http.Request) Context

	// Transport optionally specifies an http.RoundTripper for the client
	// to use when it makes a request.
	// If nil, the client uses http.DefaultTransport.
	// Set copies this field into each peer getter it creates, so assign it
	// before calling Set.
	Transport func(Context) http.RoundTripper

	// self is this peer's base URL, e.g. "https://example.net:8000".
	// PickPeer compares it against the hash owner of a key to detect
	// keys owned by this process.
	self string

	// opts specifies the options, with defaults already filled in by
	// NewHTTPPoolOpts.
	opts HTTPPoolOptions

	mu          sync.Mutex // guards peers and httpGetters
	peers       *consistenthash.Map
	httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}
// HTTPPoolOptions are the configurations of a HTTPPool.
// Zero-valued fields are replaced with defaults by NewHTTPPoolOpts.
type HTTPPoolOptions struct {
	// BasePath specifies the HTTP path that will serve groupcache requests.
	// If empty, it defaults to "/_groupcache/".
	BasePath string

	// Replicas specifies the number of key replicas on the consistent hash.
	// If zero, it defaults to 50.
	Replicas int

	// HashFn specifies the hash function of the consistent hash.
	// It is passed unchanged to consistenthash.New.
	// If nil, it defaults to crc32.ChecksumIEEE.
	HashFn consistenthash.Hash
}
  58. // NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker.
  59. // For convenience, it also registers itself as an http.Handler with http.DefaultServeMux.
  60. // The self argument be a valid base URL that points to the current server,
  61. // for example "http://example.net:8000".
  62. func NewHTTPPool(self string) *HTTPPool {
  63. p := NewHTTPPoolOpts(self, nil)
  64. http.Handle(p.opts.BasePath, p)
  65. return p
  66. }
  67. var httpPoolMade bool
  68. // NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
  69. // Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
  70. // The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
  71. func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
  72. if httpPoolMade {
  73. panic("groupcache: NewHTTPPool must be called only once")
  74. }
  75. httpPoolMade = true
  76. p := &HTTPPool{
  77. self: self,
  78. httpGetters: make(map[string]*httpGetter),
  79. }
  80. if o != nil {
  81. p.opts = *o
  82. }
  83. if p.opts.BasePath == "" {
  84. p.opts.BasePath = defaultBasePath
  85. }
  86. if p.opts.Replicas == 0 {
  87. p.opts.Replicas = defaultReplicas
  88. }
  89. p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
  90. RegisterPeerPicker(func() PeerPicker { return p })
  91. return p
  92. }
  93. // Set updates the pool's list of peers.
  94. // Each peer value should be a valid base URL,
  95. // for example "http://example.net:8000".
  96. func (p *HTTPPool) Set(peers ...string) {
  97. p.mu.Lock()
  98. defer p.mu.Unlock()
  99. p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
  100. p.peers.Add(peers...)
  101. p.httpGetters = make(map[string]*httpGetter, len(peers))
  102. for _, peer := range peers {
  103. p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
  104. }
  105. }
  106. func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
  107. p.mu.Lock()
  108. defer p.mu.Unlock()
  109. if p.peers.IsEmpty() {
  110. return nil, false
  111. }
  112. if peer := p.peers.Get(key); peer != p.self {
  113. return p.httpGetters[peer], true
  114. }
  115. return nil, false
  116. }
  117. func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  118. // Parse request.
  119. if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
  120. panic("HTTPPool serving unexpected path: " + r.URL.Path)
  121. }
  122. parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
  123. if len(parts) != 2 {
  124. http.Error(w, "bad request", http.StatusBadRequest)
  125. return
  126. }
  127. groupName := parts[0]
  128. key := parts[1]
  129. // Fetch the value for this group/key.
  130. group := GetGroup(groupName)
  131. if group == nil {
  132. http.Error(w, "no such group: "+groupName, http.StatusNotFound)
  133. return
  134. }
  135. var ctx Context
  136. if p.Context != nil {
  137. ctx = p.Context(r)
  138. }
  139. group.Stats.ServerRequests.Add(1)
  140. var value []byte
  141. err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
  142. if err != nil {
  143. http.Error(w, err.Error(), http.StatusInternalServerError)
  144. return
  145. }
  146. // Write the value to the response body as a proto message.
  147. body, err := proto.Marshal(&pb.GetResponse{Value: value})
  148. if err != nil {
  149. http.Error(w, err.Error(), http.StatusInternalServerError)
  150. return
  151. }
  152. w.Header().Set("Content-Type", "application/x-protobuf")
  153. w.Write(body)
  154. }
// httpGetter fetches values from a single remote peer over HTTP.
type httpGetter struct {
	// transport optionally supplies the http.RoundTripper for a request.
	// If nil, Get uses http.DefaultTransport.
	transport func(Context) http.RoundTripper
	// baseURL is the peer's base URL followed by the pool's base path;
	// Get appends "<group>/<key>" to it.
	baseURL string
}
  159. var bufferPool = sync.Pool{
  160. New: func() interface{} { return new(bytes.Buffer) },
  161. }
  162. func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
  163. u := fmt.Sprintf(
  164. "%v%v/%v",
  165. h.baseURL,
  166. url.QueryEscape(in.GetGroup()),
  167. url.QueryEscape(in.GetKey()),
  168. )
  169. req, err := http.NewRequest("GET", u, nil)
  170. if err != nil {
  171. return err
  172. }
  173. tr := http.DefaultTransport
  174. if h.transport != nil {
  175. tr = h.transport(context)
  176. }
  177. res, err := tr.RoundTrip(req)
  178. if err != nil {
  179. return err
  180. }
  181. defer res.Body.Close()
  182. if res.StatusCode != http.StatusOK {
  183. return fmt.Errorf("server returned: %v", res.Status)
  184. }
  185. b := bufferPool.Get().(*bytes.Buffer)
  186. b.Reset()
  187. defer bufferPool.Put(b)
  188. _, err = io.Copy(b, res.Body)
  189. if err != nil {
  190. return fmt.Errorf("reading response body: %v", err)
  191. }
  192. err = proto.Unmarshal(b.Bytes(), out)
  193. if err != nil {
  194. return fmt.Errorf("decoding response body: %v", err)
  195. }
  196. return nil
  197. }