Browse Source

etcd: add /v2/admin/config endpoint

Yicheng Qin 11 years ago
parent
commit
f95f53e446
4 changed files with 185 additions and 0 deletions
  1. 12 0
      config/cluster_config.go
  2. 14 0
      etcd/etcd.go
  3. 36 0
      etcd/v2_admin.go
  4. 123 0
      etcd/v2_http_endpoint_test.go

+ 12 - 0
config/cluster_config.go

@@ -23,3 +23,15 @@ func NewClusterConfig() *ClusterConfig {
 		SyncInterval: DefaultSyncInterval,
 	}
 }
+
+func (c *ClusterConfig) Sanitize() {
+	if c.ActiveSize < MinActiveSize {
+		c.ActiveSize = MinActiveSize
+	}
+	if c.RemoveDelay < MinRemoveDelay {
+		c.RemoveDelay = MinRemoveDelay
+	}
+	if c.SyncInterval < MinSyncInterval {
+		c.SyncInterval = MinSyncInterval
+	}
+}

+ 14 - 0
etcd/etcd.go

@@ -27,6 +27,9 @@ const (
 	v2LeaderPrefix     = "/v2/leader"
 	v2StoreStatsPrefix = "/v2/stats/store"
 
+	v2configKVPrefix    = "/_etcd/config"
+	v2adminConfigPrefix = "/v2/admin/config"
+
 	raftPrefix = "/raft"
 )
 
@@ -103,6 +106,7 @@ func New(c *config.Config, id int64) *Server {
 	m.Handle(v2peersPrefix, handlerErr(s.serveMachines))
 	m.Handle(v2LeaderPrefix, handlerErr(s.serveLeader))
 	m.Handle(v2StoreStatsPrefix, handlerErr(s.serveStoreStats))
+	m.Handle(v2adminConfigPrefix, handlerErr(s.serveAdminConfig))
 	s.Handler = m
 	return s
 }
@@ -115,6 +119,16 @@ func (s *Server) RaftHandler() http.Handler {
 	return s.t
 }
 
+func (s *Server) ClusterConfig() *config.ClusterConfig {
+	c := config.NewClusterConfig()
+	// This is used for backward compatibility because it doesn't
+	// set cluster config in older version.
+	if e, err := s.Get(v2configKVPrefix, false, false); err == nil {
+		json.Unmarshal([]byte(*e.Node.Value), c)
+	}
+	return c
+}
+
 func (s *Server) Run() {
 	if len(s.config.Peers) == 0 {
 		s.Bootstrap()

+ 36 - 0
etcd/v2_admin.go

@@ -0,0 +1,36 @@
+package etcd
+
+import (
+	"encoding/json"
+	"net/http"
+
+	"github.com/coreos/etcd/store"
+)
+
+func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error {
+	switch r.Method {
+	case "GET":
+	case "PUT":
+		if !s.node.IsLeader() {
+			return s.redirect(w, r, s.node.Leader())
+		}
+		c := s.ClusterConfig()
+		if err := json.NewDecoder(r.Body).Decode(c); err != nil {
+			return err
+		}
+		c.Sanitize()
+		b, err := json.Marshal(c)
+		if err != nil {
+			return err
+		}
+		if _, err := s.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil {
+			return err
+		}
+	default:
+		return allow(w, "GET", "PUT")
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(s.ClusterConfig())
+	return nil
+}

+ 123 - 0
etcd/v2_http_endpoint_test.go

@@ -1,6 +1,7 @@
 package etcd
 
 import (
+	"bytes"
 	"encoding/json"
 	"io/ioutil"
 	"net/http"
@@ -8,7 +9,9 @@ import (
 	"sort"
 	"strings"
 	"testing"
+	"time"
 
+	"github.com/coreos/etcd/config"
 	"github.com/coreos/etcd/store"
 )
 
@@ -114,3 +117,123 @@ func TestStoreStatsEndPoint(t *testing.T) {
 	}
 	afterTest(t)
 }
+
+func TestGetAdminConfigEndPoint(t *testing.T) {
+	es, hs := buildCluster(3, false)
+	waitCluster(t, es)
+
+	for i := range hs {
+		r, err := http.Get(hs[i].URL + v2adminConfigPrefix)
+		if err != nil {
+			t.Errorf("%v", err)
+			continue
+		}
+		if g := r.StatusCode; g != 200 {
+			t.Errorf("#%d: status = %d, want %d", i, g, 200)
+		}
+		if g := r.Header.Get("Content-Type"); g != "application/json" {
+			t.Errorf("#%d: ContentType = %d, want application/json", i, g)
+		}
+
+		conf := new(config.ClusterConfig)
+		err = json.NewDecoder(r.Body).Decode(conf)
+		r.Body.Close()
+		if err != nil {
+			t.Errorf("%v", err)
+			continue
+		}
+		w := config.NewClusterConfig()
+		if !reflect.DeepEqual(conf, w) {
+			t.Errorf("#%d: config = %+v, want %+v", i, conf, w)
+		}
+	}
+
+	for i := range es {
+		es[len(es)-i-1].Stop()
+	}
+	for i := range hs {
+		hs[len(hs)-i-1].Close()
+	}
+	afterTest(t)
+}
+
+func TestPutAdminConfigEndPoint(t *testing.T) {
+	tests := []struct {
+		c, wc string
+	}{
+		{
+			`{"activeSize":1,"removeDelay":1,"syncInterval":1}`,
+			`{"activeSize":3,"removeDelay":2,"syncInterval":1}`,
+		},
+		{
+			`{"activeSize":5,"removeDelay":20.5,"syncInterval":1.5}`,
+			`{"activeSize":5,"removeDelay":20.5,"syncInterval":1.5}`,
+		},
+		{
+			`{"activeSize":5 ,  "removeDelay":20 ,  "syncInterval": 2 }`,
+			`{"activeSize":5,"removeDelay":20,"syncInterval":2}`,
+		},
+		{
+			`{"activeSize":3, "removeDelay":60}`,
+			`{"activeSize":3,"removeDelay":60,"syncInterval":5}`,
+		},
+	}
+
+	for i, tt := range tests {
+		es, hs := buildCluster(3, false)
+		waitCluster(t, es)
+
+		r, err := NewTestClient().Put(hs[0].URL+v2adminConfigPrefix, "application/json", bytes.NewBufferString(tt.c))
+		if err != nil {
+			t.Fatalf("%v", err)
+		}
+		b, err := ioutil.ReadAll(r.Body)
+		r.Body.Close()
+		if err != nil {
+			t.Fatalf("%v", err)
+		}
+		if wbody := append([]byte(tt.wc), '\n'); !reflect.DeepEqual(b, wbody) {
+			t.Errorf("#%d: put result = %s, want %s", i, b, wbody)
+		}
+
+		barrier(t, 0, es)
+
+		for j := range es {
+			e, err := es[j].Get(v2configKVPrefix, false, false)
+			if err != nil {
+				t.Errorf("%v", err)
+				continue
+			}
+			if g := *e.Node.Value; g != tt.wc {
+				t.Errorf("#%d.%d: %s = %s, want %s", i, j, v2configKVPrefix, g, tt.wc)
+			}
+		}
+
+		for j := range es {
+			es[len(es)-j-1].Stop()
+		}
+		for j := range hs {
+			hs[len(hs)-j-1].Close()
+		}
+		afterTest(t)
+	}
+}
+
+// barrier ensures that all servers have made further progress on applied index
+// compared to the base one.
+func barrier(t *testing.T, base int, es []*Server) {
+	applied := es[base].node.Applied()
+	// time used for goroutine scheduling
+	time.Sleep(5 * time.Millisecond)
+	for i, e := range es {
+		for j := 0; ; j++ {
+			if e.node.Applied() >= applied {
+				break
+			}
+			time.Sleep(defaultHeartbeat * defaultTickDuration)
+			if j == 2 {
+				t.Fatalf("#%d: applied = %d, want >= %d", i, e.node.Applied(), applied)
+			}
+		}
+	}
+}