浏览代码

Merge pull request #4902 from heyitsanthony/alarm-ctl

etcdctl: alarm command
Anthony Romano 9 年之前
父节点
当前提交
7ce5c2b9ff

+ 8 - 0
alarm/alarms.go

@@ -103,6 +103,14 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
 func (a *AlarmStore) Get(at pb.AlarmType) (ret []*pb.AlarmMember) {
 func (a *AlarmStore) Get(at pb.AlarmType) (ret []*pb.AlarmMember) {
 	a.mu.Lock()
 	a.mu.Lock()
 	defer a.mu.Unlock()
 	defer a.mu.Unlock()
+	if at == pb.AlarmType_NONE {
+		for _, t := range a.types {
+			for _, m := range t {
+				ret = append(ret, m)
+			}
+		}
+		return ret
+	}
 	for _, m := range a.types[at] {
 	for _, m := range a.types[at] {
 		ret = append(ret, m)
 		ret = append(ret, m)
 	}
 	}

+ 1 - 1
clientv3/client.go

@@ -184,7 +184,7 @@ func newClient(cfg *Config) (*Client, error) {
 	client.Lease = NewLease(client)
 	client.Lease = NewLease(client)
 	client.Watcher = NewWatcher(client)
 	client.Watcher = NewWatcher(client)
 	client.Auth = NewAuth(client)
 	client.Auth = NewAuth(client)
-	client.Maintenance = &maintenance{c: client}
+	client.Maintenance = NewMaintenance(client)
 	if cfg.Logger != nil {
 	if cfg.Logger != nil {
 		logger.Set(cfg.Logger)
 		logger.Set(cfg.Logger)
 	} else {
 	} else {

+ 95 - 0
clientv3/maintenance.go

@@ -15,15 +15,26 @@
 package clientv3
 package clientv3
 
 
 import (
 import (
+	"sync"
+
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
+	"google.golang.org/grpc"
 )
 )
 
 
 type (
 type (
 	DefragmentResponse pb.DefragmentResponse
 	DefragmentResponse pb.DefragmentResponse
+	AlarmResponse      pb.AlarmResponse
+	AlarmMember        pb.AlarmMember
 )
 )
 
 
 type Maintenance interface {
 type Maintenance interface {
+	// AlarmList gets all active alarms.
+	AlarmList(ctx context.Context) (*AlarmResponse, error)
+
+	// AlarmDisarm disarms a given alarm.
+	AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
+
 	// Defragment defragments storage backend of the etcd member with given endpoint.
 	// Defragment defragments storage backend of the etcd member with given endpoint.
 	// Defragment is only needed when deleting a large number of keys and want to reclaim
 	// Defragment is only needed when deleting a large number of keys and want to reclaim
 	// the resources.
 	// the resources.
@@ -36,6 +47,72 @@ type Maintenance interface {
 
 
 type maintenance struct {
 type maintenance struct {
 	c *Client
 	c *Client
+
+	mu     sync.Mutex
+	conn   *grpc.ClientConn // conn in-use
+	remote pb.MaintenanceClient
+}
+
+func NewMaintenance(c *Client) Maintenance { // NewMaintenance builds a Maintenance implementation on the connection returned by c.ActiveConnection().
+	conn := c.ActiveConnection()
+	return &maintenance{
+		c:      c,
+		conn:   conn, // kept so switchRemote can pass it to retryConnection
+		remote: pb.NewMaintenanceClient(conn),
+	}
+}
+
+func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { // AlarmList issues a GET alarm request covering all members and all alarm types.
+	req := &pb.AlarmRequest{
+		Action:   pb.AlarmRequest_GET,
+		MemberID: 0,                 // all
+		Alarm:    pb.AlarmType_NONE, // all
+	}
+	for { // keep retrying on the current remote until success or an error that ends the loop
+		resp, err := m.getRemote().Alarm(ctx, req)
+		if err == nil {
+			return (*AlarmResponse)(resp), nil
+		}
+		if isHalted(ctx, err) { // isHalted is defined elsewhere; when it reports true the error is returned without retrying
+			return nil, err
+		}
+		if err = m.switchRemote(err); err != nil { // swap the connection before the next attempt; give up if that fails
+			return nil, err
+		}
+	}
+}
+
+func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) { // AlarmDisarm sends a DEACTIVATE request for am; am is dereferenced unconditionally, so it must be non-nil.
+	req := &pb.AlarmRequest{
+		Action:   pb.AlarmRequest_DEACTIVATE,
+		MemberID: am.MemberID,
+		Alarm:    am.Alarm,
+	}
+
+	if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE { // zero-valued am: disarm every alarm returned by AlarmList
+		ar, err := m.AlarmList(ctx)
+		if err != nil {
+			return nil, err
+		}
+		ret := AlarmResponse{}
+		for _, am := range ar.Alarms { // this am shadows the parameter inside the loop body
+			dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am)) // one recursive call per listed alarm
+			if derr != nil {
+				return nil, derr // the first failure aborts; responses collected so far are dropped
+			}
+			ret.Alarms = append(ret.Alarms, dresp.Alarms...)
+		}
+		return &ret, nil
+	}
+
+	resp, err := m.getRemote().Alarm(ctx, req) // single attempt; unlike AlarmList there is no retry loop here
+	if err == nil {
+		return (*AlarmResponse)(resp), nil
+	}
+	if !isHalted(ctx, err) {
+		go m.switchRemote(err) // switch runs in a separate goroutine and its result is ignored; the caller still receives err
+	}
+	return nil, err
 }
 }
 
 
 func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
 func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
@@ -50,3 +127,21 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
 	}
 	}
 	return (*DefragmentResponse)(resp), nil
 	return (*DefragmentResponse)(resp), nil
 }
 }
+
+func (m *maintenance) getRemote() pb.MaintenanceClient { // getRemote returns the current remote client, read while holding m.mu.
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.remote
+}
+
+func (m *maintenance) switchRemote(prevErr error) error { // switchRemote swaps in the connection returned by retryConnection and rebuilds the remote client on it.
+	m.mu.Lock()
+	defer m.mu.Unlock() // the mutex stays held across the retryConnection call
+	newConn, err := m.c.retryConnection(m.conn, prevErr)
+	if err != nil {
+		return err // conn and remote are left unchanged on failure
+	}
+	m.conn = newConn
+	m.remote = pb.NewMaintenanceClient(m.conn)
+	return nil
+}

+ 81 - 0
etcdctl/ctlv3/command/alarm_command.go

@@ -0,0 +1,81 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package command
+
+import (
+	"fmt"
+
+	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/spf13/cobra"
+)
+
+// NewAlarmCommand returns the cobra command for "alarm" with its "disarm" and "list" subcommands attached.
+func NewAlarmCommand() *cobra.Command {
+	ac := &cobra.Command{
+		Use:   "alarm <subcommand>",
+		Short: "alarm related command",
+	}
+
+	ac.AddCommand(NewAlarmDisarmCommand())
+	ac.AddCommand(NewAlarmListCommand())
+
+	return ac
+}
+
+func NewAlarmDisarmCommand() *cobra.Command { // NewAlarmDisarmCommand returns the cobra command for "alarm disarm".
+	cmd := cobra.Command{
+		Use:   "disarm",
+		Short: "disarm all alarms",
+		Run:   alarmDisarmCommandFunc,
+	}
+	return &cmd
+}
+
+// alarmDisarmCommandFunc executes the "alarm disarm" command: it reports ExitBadArgs when positional arguments are given, calls AlarmDisarm with a zero-valued AlarmMember and prints the response via display.Alarm.
+func alarmDisarmCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 0 {
+		ExitWithError(ExitBadArgs, fmt.Errorf("alarm disarm command accepts no arguments"))
+	}
+	ctx, cancel := commandCtx(cmd)
+	resp, err := mustClientFromCmd(cmd).AlarmDisarm(ctx, &v3.AlarmMember{}) // zero MemberID and alarm type
+	cancel() // cancel the command context once the call has returned
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	display.Alarm(*resp)
+}
+
+func NewAlarmListCommand() *cobra.Command { // NewAlarmListCommand returns the cobra command for "alarm list".
+	cmd := cobra.Command{
+		Use:   "list",
+		Short: "list all alarms",
+		Run:   alarmListCommandFunc,
+	}
+	return &cmd
+}
+
+// alarmListCommandFunc executes the "alarm list" command: it reports ExitBadArgs when positional arguments are given, calls AlarmList and prints the response via display.Alarm.
+func alarmListCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 0 {
+		ExitWithError(ExitBadArgs, fmt.Errorf("alarm list command accepts no arguments")) // message names this command rather than "disarm"
+	}
+	ctx, cancel := commandCtx(cmd)
+	resp, err := mustClientFromCmd(cmd).AlarmList(ctx)
+	cancel() // cancel the command context once the call has returned
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	display.Alarm(*resp)
+}

+ 13 - 0
etcdctl/ctlv3/command/printer.go

@@ -35,6 +35,8 @@ type printer interface {
 	Watch(v3.WatchResponse)
 	Watch(v3.WatchResponse)
 
 
 	MemberList(v3.MemberListResponse)
 	MemberList(v3.MemberListResponse)
+
+	Alarm(v3.AlarmResponse)
 }
 }
 
 
 func NewPrinter(printerType string, isHex bool) printer {
 func NewPrinter(printerType string, isHex bool) printer {
@@ -96,6 +98,12 @@ func (s *simplePrinter) Watch(resp v3.WatchResponse) {
 	}
 	}
 }
 }
 
 
+func (s *simplePrinter) Alarm(resp v3.AlarmResponse) { // Alarm prints each entry of resp.Alarms on its own line.
+	for _, e := range resp.Alarms {
+		fmt.Printf("%+v\n", e) // %+v formatting of each element
+	}
+}
+
 func (s *simplePrinter) MemberList(resp v3.MemberListResponse) {
 func (s *simplePrinter) MemberList(resp v3.MemberListResponse) {
 	table := tablewriter.NewWriter(os.Stdout)
 	table := tablewriter.NewWriter(os.Stdout)
 	table.SetHeader([]string{"ID", "Status", "Name", "Peer Addrs", "Client Addrs", "Is Leader"})
 	table.SetHeader([]string{"ID", "Status", "Name", "Peer Addrs", "Client Addrs", "Is Leader"})
@@ -130,6 +138,7 @@ func (p *jsonPrinter) Get(r v3.GetResponse) {
 func (p *jsonPrinter) Put(r v3.PutResponse)               { printJSON(r) }
 func (p *jsonPrinter) Put(r v3.PutResponse)               { printJSON(r) }
 func (p *jsonPrinter) Txn(r v3.TxnResponse)               { printJSON(r) }
 func (p *jsonPrinter) Txn(r v3.TxnResponse)               { printJSON(r) }
 func (p *jsonPrinter) Watch(r v3.WatchResponse)           { printJSON(r) }
 func (p *jsonPrinter) Watch(r v3.WatchResponse)           { printJSON(r) }
+func (p *jsonPrinter) Alarm(r v3.AlarmResponse)           { printJSON(r) } // Alarm prints r via printJSON.
 func (p *jsonPrinter) MemberList(r v3.MemberListResponse) { printJSON(r) }
 func (p *jsonPrinter) MemberList(r v3.MemberListResponse) { printJSON(r) }
 
 
 func printJSON(v interface{}) {
 func printJSON(v interface{}) {
@@ -169,6 +178,10 @@ func (p *pbPrinter) Watch(r v3.WatchResponse) {
 	}
 	}
 }
 }
 
 
+func (p *pbPrinter) Alarm(r v3.AlarmResponse) { // Alarm converts r to the protobuf response type and hands it to printPB.
+	printPB((*pb.AlarmResponse)(&r))
+}
+
 func (pb *pbPrinter) MemberList(r v3.MemberListResponse) {
 func (pb *pbPrinter) MemberList(r v3.MemberListResponse) {
 	ExitWithError(ExitBadFeature, errors.New("only support simple or json as output format"))
 	ExitWithError(ExitBadFeature, errors.New("only support simple or json as output format"))
 }
 }

+ 1 - 0
etcdctl/ctlv3/ctl.go

@@ -66,6 +66,7 @@ func init() {
 		command.NewDelCommand(),
 		command.NewDelCommand(),
 		command.NewTxnCommand(),
 		command.NewTxnCommand(),
 		command.NewCompactionCommand(),
 		command.NewCompactionCommand(),
+		command.NewAlarmCommand(),
 		command.NewDefragCommand(),
 		command.NewDefragCommand(),
 		command.NewWatchCommand(),
 		command.NewWatchCommand(),
 		command.NewVersionCommand(),
 		command.NewVersionCommand(),

+ 0 - 1
etcdserver/api/v3rpc/maintenance.go

@@ -61,6 +61,5 @@ func (s *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.Ha
 }
 }
 
 
 func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
 func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
-	plog.Warningf("alarming %+v", ar)
 	return ms.a.Alarm(ctx, ar)
 	return ms.a.Alarm(ctx, ar)
 }
 }

+ 1 - 1
etcdserver/api/v3rpc/quota.go

@@ -40,7 +40,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error {
 		return nil
 		return nil
 	}
 	}
 	req := &pb.AlarmRequest{
 	req := &pb.AlarmRequest{
-		MemberID: int64(qa.id),
+		MemberID: uint64(qa.id),
 		Action:   pb.AlarmRequest_ACTIVATE,
 		Action:   pb.AlarmRequest_ACTIVATE,
 		Alarm:    pb.AlarmType_NOSPACE,
 		Alarm:    pb.AlarmType_NOSPACE,
 	}
 	}

+ 20 - 5
etcdserver/apply.go

@@ -395,6 +395,8 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
 
 
 func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
 func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
 	resp := &pb.AlarmResponse{}
 	resp := &pb.AlarmResponse{}
+	oldCount := len(a.s.alarmStore.Get(ar.Alarm))
+
 	switch ar.Action {
 	switch ar.Action {
 	case pb.AlarmRequest_GET:
 	case pb.AlarmRequest_GET:
 		resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
 		resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
@@ -404,13 +406,17 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
 			break
 			break
 		}
 		}
 		resp.Alarms = append(resp.Alarms, m)
 		resp.Alarms = append(resp.Alarms, m)
+		activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1
+		if !activated {
+			break
+		}
+
 		switch m.Alarm {
 		switch m.Alarm {
 		case pb.AlarmType_NOSPACE:
 		case pb.AlarmType_NOSPACE:
-			if len(a.s.alarmStore.Get(m.Alarm)) == 1 {
-				a.s.applyV3 = newApplierV3Capped(a)
-			}
+			plog.Warningf("alarm raised %+v", m)
+			a.s.applyV3 = newApplierV3Capped(a)
 		default:
 		default:
-			plog.Warningf("unimplemented alarm activation (%+v)", m)
+			plog.Errorf("unimplemented alarm activation (%+v)", m)
 		}
 		}
 	case pb.AlarmRequest_DEACTIVATE:
 	case pb.AlarmRequest_DEACTIVATE:
 		m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
 		m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
@@ -418,8 +424,17 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
 			break
 			break
 		}
 		}
 		resp.Alarms = append(resp.Alarms, m)
 		resp.Alarms = append(resp.Alarms, m)
-		if m.Alarm == pb.AlarmType_NOSPACE && len(a.s.alarmStore.Get(ar.Alarm)) == 0 {
+		deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0
+		if !deactivated {
+			break
+		}
+
+		switch m.Alarm {
+		case pb.AlarmType_NOSPACE:
+			plog.Infof("alarm disarmed %+v", ar)
 			a.s.applyV3 = newQuotaApplierV3(a.s, &applierV3backend{a.s})
 			a.s.applyV3 = newQuotaApplierV3(a.s, &applierV3backend{a.s})
+		default:
+			plog.Errorf("unimplemented alarm deactivation (%+v)", m)
 		}
 		}
 	default:
 	default:
 		return nil, nil
 		return nil, nil

+ 2 - 2
etcdserver/etcdserverpb/rpc.pb.go

@@ -1209,7 +1209,7 @@ func (m *DefragmentResponse) GetHeader() *ResponseHeader {
 type AlarmRequest struct {
 type AlarmRequest struct {
 	Action AlarmRequest_AlarmAction `protobuf:"varint,1,opt,name=action,proto3,enum=etcdserverpb.AlarmRequest_AlarmAction" json:"action,omitempty"`
 	Action AlarmRequest_AlarmAction `protobuf:"varint,1,opt,name=action,proto3,enum=etcdserverpb.AlarmRequest_AlarmAction" json:"action,omitempty"`
 	// MemberID is the member raising the alarm request
 	// MemberID is the member raising the alarm request
-	MemberID int64     `protobuf:"varint,2,opt,name=memberID,proto3" json:"memberID,omitempty"`
+	MemberID uint64    `protobuf:"varint,2,opt,name=memberID,proto3" json:"memberID,omitempty"`
 	Alarm    AlarmType `protobuf:"varint,3,opt,name=alarm,proto3,enum=etcdserverpb.AlarmType" json:"alarm,omitempty"`
 	Alarm    AlarmType `protobuf:"varint,3,opt,name=alarm,proto3,enum=etcdserverpb.AlarmType" json:"alarm,omitempty"`
 }
 }
 
 
@@ -10122,7 +10122,7 @@ func (m *AlarmRequest) Unmarshal(data []byte) error {
 				}
 				}
 				b := data[iNdEx]
 				b := data[iNdEx]
 				iNdEx++
 				iNdEx++
-				m.MemberID |= (int64(b) & 0x7F) << shift
+				m.MemberID |= (uint64(b) & 0x7F) << shift
 				if b < 0x80 {
 				if b < 0x80 {
 					break
 					break
 				}
 				}

+ 1 - 1
etcdserver/etcdserverpb/rpc.proto

@@ -457,7 +457,7 @@ message AlarmRequest {
   }
   }
   AlarmAction action = 1;
   AlarmAction action = 1;
   // MemberID is the member raising the alarm request
   // MemberID is the member raising the alarm request
-  int64 memberID = 2;
+  uint64 memberID = 2;
   AlarmType alarm = 3;
   AlarmType alarm = 3;
 }
 }
 
 

+ 1 - 1
etcdserver/server.go

@@ -1034,7 +1034,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 				plog.Errorf("applying raft message exceeded backend quota")
 				plog.Errorf("applying raft message exceeded backend quota")
 				go func() {
 				go func() {
 					a := &pb.AlarmRequest{
 					a := &pb.AlarmRequest{
-						MemberID: int64(s.ID()),
+						MemberID: uint64(s.ID()),
 						Action:   pb.AlarmRequest_ACTIVATE,
 						Action:   pb.AlarmRequest_ACTIVATE,
 						Alarm:    pb.AlarmType_NOSPACE,
 						Alarm:    pb.AlarmType_NOSPACE,
 					}
 					}