| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- /*
- Copyright 2013 Google Inc.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package groupcache
- import (
- "bytes"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "strings"
- "sync"
- "github.com/golang/groupcache/consistenthash"
- pb "github.com/golang/groupcache/groupcachepb"
- "github.com/golang/protobuf/proto"
- )
// Defaults applied when the corresponding HTTPPoolOptions field is left at
// its zero value.
const (
	// defaultBasePath is the URL path prefix that serves groupcache requests.
	defaultBasePath = "/_groupcache/"

	// defaultReplicas is the number of key replicas on the consistent hash.
	defaultReplicas = 50
)
- // HTTPPool implements PeerPicker for a pool of HTTP peers.
- type HTTPPool struct {
- // Context optionally specifies a context for the server to use when it
- // receives a request.
- // If nil, the server uses a nil Context.
- Context func(*http.Request) Context
- // Transport optionally specifies an http.RoundTripper for the client
- // to use when it makes a request.
- // If nil, the client uses http.DefaultTransport.
- Transport func(Context) http.RoundTripper
- // this peer's base URL, e.g. "https://example.net:8000"
- self string
- // opts specifies the options.
- opts HTTPPoolOptions
- mu sync.Mutex // guards peers and httpGetters
- peers *consistenthash.Map
- httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
- }
- // HTTPPoolOptions are the configurations of a HTTPPool.
- type HTTPPoolOptions struct {
- // BasePath specifies the HTTP path that will serve groupcache requests.
- // If blank, it defaults to "/_groupcache/".
- BasePath string
- // Replicas specifies the number of key replicas on the consistent hash.
- // If blank, it defaults to 50.
- Replicas int
- // HashFn specifies the hash function of the consistent hash.
- // If blank, it defaults to crc32.ChecksumIEEE.
- HashFn consistenthash.Hash
- }
- // NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker.
- // For convenience, it also registers itself as an http.Handler with http.DefaultServeMux.
- // The self argument be a valid base URL that points to the current server,
- // for example "http://example.net:8000".
- func NewHTTPPool(self string) *HTTPPool {
- p := NewHTTPPoolOpts(self, nil)
- http.Handle(p.opts.BasePath, p)
- return p
- }
- var httpPoolMade bool
- // NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
- // Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
- // The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
- func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
- if httpPoolMade {
- panic("groupcache: NewHTTPPool must be called only once")
- }
- httpPoolMade = true
- p := &HTTPPool{
- self: self,
- httpGetters: make(map[string]*httpGetter),
- }
- if o != nil {
- p.opts = *o
- }
- if p.opts.BasePath == "" {
- p.opts.BasePath = defaultBasePath
- }
- if p.opts.Replicas == 0 {
- p.opts.Replicas = defaultReplicas
- }
- p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
- RegisterPeerPicker(func() PeerPicker { return p })
- return p
- }
- // Set updates the pool's list of peers.
- // Each peer value should be a valid base URL,
- // for example "http://example.net:8000".
- func (p *HTTPPool) Set(peers ...string) {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
- p.peers.Add(peers...)
- p.httpGetters = make(map[string]*httpGetter, len(peers))
- for _, peer := range peers {
- p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
- }
- }
- func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.peers.IsEmpty() {
- return nil, false
- }
- if peer := p.peers.Get(key); peer != p.self {
- return p.httpGetters[peer], true
- }
- return nil, false
- }
- func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- // Parse request.
- if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
- panic("HTTPPool serving unexpected path: " + r.URL.Path)
- }
- parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
- if len(parts) != 2 {
- http.Error(w, "bad request", http.StatusBadRequest)
- return
- }
- groupName := parts[0]
- key := parts[1]
- // Fetch the value for this group/key.
- group := GetGroup(groupName)
- if group == nil {
- http.Error(w, "no such group: "+groupName, http.StatusNotFound)
- return
- }
- var ctx Context
- if p.Context != nil {
- ctx = p.Context(r)
- }
- group.Stats.ServerRequests.Add(1)
- var value []byte
- err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- // Write the value to the response body as a proto message.
- body, err := proto.Marshal(&pb.GetResponse{Value: value})
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "application/x-protobuf")
- w.Write(body)
- }
- type httpGetter struct {
- transport func(Context) http.RoundTripper
- baseURL string
- }
// bufferPool recycles the scratch buffers used to read peer response bodies.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return &bytes.Buffer{}
	},
}
- func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
- u := fmt.Sprintf(
- "%v%v/%v",
- h.baseURL,
- url.QueryEscape(in.GetGroup()),
- url.QueryEscape(in.GetKey()),
- )
- req, err := http.NewRequest("GET", u, nil)
- if err != nil {
- return err
- }
- tr := http.DefaultTransport
- if h.transport != nil {
- tr = h.transport(context)
- }
- res, err := tr.RoundTrip(req)
- if err != nil {
- return err
- }
- defer res.Body.Close()
- if res.StatusCode != http.StatusOK {
- return fmt.Errorf("server returned: %v", res.Status)
- }
- b := bufferPool.Get().(*bytes.Buffer)
- b.Reset()
- defer bufferPool.Put(b)
- _, err = io.Copy(b, res.Body)
- if err != nil {
- return fmt.Errorf("reading response body: %v", err)
- }
- err = proto.Unmarshal(b.Bytes(), out)
- if err != nil {
- return fmt.Errorf("decoding response body: %v", err)
- }
- return nil
- }
|