|
|
@@ -393,6 +393,11 @@ func (clus *Cluster) broadcast(op rpcpb.Operation) error {
|
|
|
}
|
|
|
|
|
|
func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
|
|
|
+ _, err := clus.sendOpWithResp(idx, op)
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Response, error) {
|
|
|
// maintain the initial member object
|
|
|
// throughout the test time
|
|
|
clus.agentRequests[idx] = &rpcpb.Request{
|
|
|
@@ -409,7 +414,7 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
|
|
|
zap.Error(err),
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
resp, err := clus.agentStreams[idx].Recv()
|
|
|
@@ -431,18 +436,18 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
|
|
|
)
|
|
|
}
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
if !resp.Success {
|
|
|
- return errors.New(resp.Status)
|
|
|
+ return nil, errors.New(resp.Status)
|
|
|
}
|
|
|
|
|
|
m, secure := clus.Members[idx], false
|
|
|
for _, cu := range m.Etcd.AdvertiseClientURLs {
|
|
|
u, err := url.Parse(cu)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
if u.Scheme == "https" { // TODO: handle unix
|
|
|
secure = true
|
|
|
@@ -458,16 +463,16 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
|
|
|
"client",
|
|
|
)
|
|
|
if err = fileutil.TouchDirAll(dirClient); err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
clientCertData := []byte(resp.Member.ClientCertData)
|
|
|
if len(clientCertData) == 0 {
|
|
|
- return fmt.Errorf("got empty client cert from %q", m.EtcdClientEndpoint)
|
|
|
+ return nil, fmt.Errorf("got empty client cert from %q", m.EtcdClientEndpoint)
|
|
|
}
|
|
|
clientCertPath := filepath.Join(dirClient, "cert.pem")
|
|
|
if err = ioutil.WriteFile(clientCertPath, clientCertData, 0644); err != nil { // overwrite if exists
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
resp.Member.ClientCertPath = clientCertPath
|
|
|
clus.lg.Info(
|
|
|
@@ -477,11 +482,11 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
|
|
|
|
|
|
clientKeyData := []byte(resp.Member.ClientKeyData)
|
|
|
if len(clientKeyData) == 0 {
|
|
|
- return fmt.Errorf("got empty client key from %q", m.EtcdClientEndpoint)
|
|
|
+ return nil, fmt.Errorf("got empty client key from %q", m.EtcdClientEndpoint)
|
|
|
}
|
|
|
clientKeyPath := filepath.Join(dirClient, "key.pem")
|
|
|
if err = ioutil.WriteFile(clientKeyPath, clientKeyData, 0644); err != nil { // overwrite if exists
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
resp.Member.ClientKeyPath = clientKeyPath
|
|
|
clus.lg.Info(
|
|
|
@@ -494,7 +499,7 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
|
|
|
// TODO: disable this when auto TLS is deprecated
|
|
|
clientTrustedCAPath := filepath.Join(dirClient, "ca.pem")
|
|
|
if err = ioutil.WriteFile(clientTrustedCAPath, clientTrustedCAData, 0644); err != nil { // overwrite if exists
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
resp.Member.ClientTrustedCAPath = clientTrustedCAPath
|
|
|
clus.lg.Info(
|
|
|
@@ -507,7 +512,8 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
|
|
|
|
|
|
clus.Members[idx] = resp.Member
|
|
|
}
|
|
|
- return nil
|
|
|
+
|
|
|
+ return resp, nil
|
|
|
}
|
|
|
|
|
|
// Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT terminates all tester connections to agents and etcd servers.
|