|
@@ -15,6 +15,7 @@
|
|
|
package sdk
|
|
package sdk
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
|
+ "fmt"
|
|
|
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth"
|
|
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth"
|
|
|
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
|
|
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
|
|
|
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/endpoints"
|
|
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/endpoints"
|
|
@@ -23,8 +24,8 @@ import (
|
|
|
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses"
|
|
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses"
|
|
|
"net"
|
|
"net"
|
|
|
"net/http"
|
|
"net/http"
|
|
|
- "fmt"
|
|
|
|
|
"strconv"
|
|
"strconv"
|
|
|
|
|
+ "sync"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
// this value will be replaced while build: -ldflags="-X sdk.version=x.x.x"
|
|
// this value will be replaced while build: -ldflags="-X sdk.version=x.x.x"
|
|
@@ -39,6 +40,8 @@ type Client struct {
|
|
|
|
|
|
|
|
debug bool
|
|
debug bool
|
|
|
isRunning bool
|
|
isRunning bool
|
|
|
|
|
+ // void "panic(write to close channel)" cause of addAsync() after Shutdown()
|
|
|
|
|
+ asyncChanLock *sync.RWMutex
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (client *Client) Init() (err error) {
|
|
func (client *Client) Init() (err error) {
|
|
@@ -47,6 +50,7 @@ func (client *Client) Init() (err error) {
|
|
|
|
|
|
|
|
func (client *Client) InitWithOptions(regionId string, config *Config, credential auth.Credential) (err error) {
|
|
func (client *Client) InitWithOptions(regionId string, config *Config, credential auth.Credential) (err error) {
|
|
|
client.isRunning = true
|
|
client.isRunning = true
|
|
|
|
|
+ client.asyncChanLock = new(sync.RWMutex)
|
|
|
client.regionId = regionId
|
|
client.regionId = regionId
|
|
|
client.config = config
|
|
client.config = config
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -77,8 +81,10 @@ func (client *Client) EnableAsync(routinePoolSize, maxTaskQueueSize int) {
|
|
|
go func() {
|
|
go func() {
|
|
|
for client.isRunning {
|
|
for client.isRunning {
|
|
|
select {
|
|
select {
|
|
|
- case task := <-client.asyncTaskQueue:
|
|
|
|
|
- task()
|
|
|
|
|
|
|
+ case task, notClosed := <-client.asyncTaskQueue:
|
|
|
|
|
+ if notClosed {
|
|
|
|
|
+ task()
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
@@ -136,7 +142,7 @@ func (client *Client) InitWithStsRoleNameOnEcs(regionId, roleName string) (err e
|
|
|
func (client *Client) InitClientConfig() (config *Config) {
|
|
func (client *Client) InitClientConfig() (config *Config) {
|
|
|
if client.config != nil {
|
|
if client.config != nil {
|
|
|
return client.config
|
|
return client.config
|
|
|
- }else{
|
|
|
|
|
|
|
+ } else {
|
|
|
return NewConfig()
|
|
return NewConfig()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -177,26 +183,16 @@ func (client *Client) DoActionWithSigner(request requests.AcsRequest, response r
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// signature
|
|
// signature
|
|
|
|
|
+ var finalSigner auth.Signer
|
|
|
if signer != nil {
|
|
if signer != nil {
|
|
|
- err = auth.Sign(request, signer, regionId)
|
|
|
|
|
|
|
+ finalSigner = signer
|
|
|
} else {
|
|
} else {
|
|
|
- err = auth.Sign(request, client.signer, regionId)
|
|
|
|
|
|
|
+ finalSigner = client.signer
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+ httpRequest, err := buildHttpRequest(request, finalSigner, regionId)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- requestMethod := request.GetMethod()
|
|
|
|
|
- requestUrl := request.BuildUrl()
|
|
|
|
|
- body := request.GetBodyReader()
|
|
|
|
|
- httpRequest, err := http.NewRequest(requestMethod, requestUrl, body)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- for key, value := range request.GetHeaders() {
|
|
|
|
|
- httpRequest.Header[key] = []string{value}
|
|
|
|
|
- }
|
|
|
|
|
var httpResponse *http.Response
|
|
var httpResponse *http.Response
|
|
|
for retryTimes := 0; retryTimes <= client.config.MaxRetryTime; retryTimes++ {
|
|
for retryTimes := 0; retryTimes <= client.config.MaxRetryTime; retryTimes++ {
|
|
|
httpResponse, err = client.httpClient.Do(httpRequest)
|
|
httpResponse, err = client.httpClient.Do(httpRequest)
|
|
@@ -209,7 +205,7 @@ func (client *Client) DoActionWithSigner(request requests.AcsRequest, response r
|
|
|
return
|
|
return
|
|
|
} else if retryTimes >= client.config.MaxRetryTime {
|
|
} else if retryTimes >= client.config.MaxRetryTime {
|
|
|
// timeout but reached the max retry times, return
|
|
// timeout but reached the max retry times, return
|
|
|
- timeoutErrorMsg := fmt.Sprintf(errors.TimeoutErrorMessage, strconv.Itoa(retryTimes + 1), strconv.Itoa(retryTimes + 1))
|
|
|
|
|
|
|
+ timeoutErrorMsg := fmt.Sprintf(errors.TimeoutErrorMessage, strconv.Itoa(retryTimes+1), strconv.Itoa(retryTimes+1))
|
|
|
err = errors.NewClientError(errors.TimeoutErrorCode, timeoutErrorMsg, err)
|
|
err = errors.NewClientError(errors.TimeoutErrorCode, timeoutErrorMsg, err)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -217,10 +213,9 @@ func (client *Client) DoActionWithSigner(request requests.AcsRequest, response r
|
|
|
// if status code >= 500 or timeout, will trigger retry
|
|
// if status code >= 500 or timeout, will trigger retry
|
|
|
if client.config.AutoRetry && (timeout || isServerError(httpResponse)) {
|
|
if client.config.AutoRetry && (timeout || isServerError(httpResponse)) {
|
|
|
// rewrite signatureNonce and signature
|
|
// rewrite signatureNonce and signature
|
|
|
- if signer != nil {
|
|
|
|
|
- err = auth.Sign(request, signer, regionId)
|
|
|
|
|
- } else {
|
|
|
|
|
- err = auth.Sign(request, client.signer, regionId)
|
|
|
|
|
|
|
+ httpRequest, err = buildHttpRequest(request, finalSigner, regionId)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
@@ -230,6 +225,28 @@ func (client *Client) DoActionWithSigner(request requests.AcsRequest, response r
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func buildHttpRequest(request requests.AcsRequest, singer auth.Signer, regionId string) (httpRequest *http.Request, err error) {
|
|
|
|
|
+ err = auth.Sign(request, singer, regionId)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ requestMethod := request.GetMethod()
|
|
|
|
|
+ requestUrl := request.BuildUrl()
|
|
|
|
|
+ body := request.GetBodyReader()
|
|
|
|
|
+ httpRequest, err = http.NewRequest(requestMethod, requestUrl, body)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ for key, value := range request.GetHeaders() {
|
|
|
|
|
+ httpRequest.Header[key] = []string{value}
|
|
|
|
|
+ }
|
|
|
|
|
+ // host is a special case
|
|
|
|
|
+ if host, containsHost := request.GetHeaders()["Host"]; containsHost {
|
|
|
|
|
+ httpRequest.Host = host
|
|
|
|
|
+ }
|
|
|
|
|
+ return
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func isTimeout(err error) bool {
|
|
func isTimeout(err error) bool {
|
|
|
if err == nil {
|
|
if err == nil {
|
|
|
return false
|
|
return false
|
|
@@ -242,9 +259,18 @@ func isServerError(httpResponse *http.Response) bool {
|
|
|
return httpResponse.StatusCode >= http.StatusInternalServerError
|
|
return httpResponse.StatusCode >= http.StatusInternalServerError
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+only block when any one of the following occurs:
|
|
|
|
|
+1. the asyncTaskQueue is full, increase the queue size to avoid this
|
|
|
|
|
+2. Shutdown() in progressing, the client is being closed
|
|
|
|
|
+**/
|
|
|
func (client *Client) AddAsyncTask(task func()) (err error) {
|
|
func (client *Client) AddAsyncTask(task func()) (err error) {
|
|
|
if client.asyncTaskQueue != nil {
|
|
if client.asyncTaskQueue != nil {
|
|
|
- client.asyncTaskQueue <- task
|
|
|
|
|
|
|
+ client.asyncChanLock.RLock()
|
|
|
|
|
+ defer client.asyncChanLock.RUnlock()
|
|
|
|
|
+ if client.isRunning {
|
|
|
|
|
+ client.asyncTaskQueue <- task
|
|
|
|
|
+ }
|
|
|
} else {
|
|
} else {
|
|
|
err = errors.NewClientError(errors.AsyncFunctionNotEnabledCode, errors.AsyncFunctionNotEnabledMessage, nil)
|
|
err = errors.NewClientError(errors.AsyncFunctionNotEnabledCode, errors.AsyncFunctionNotEnabledMessage, nil)
|
|
|
}
|
|
}
|
|
@@ -313,6 +339,9 @@ func (client *Client) ProcessCommonRequestWithSigner(request *requests.CommonReq
|
|
|
|
|
|
|
|
func (client *Client) Shutdown() {
|
|
func (client *Client) Shutdown() {
|
|
|
client.signer.Shutdown()
|
|
client.signer.Shutdown()
|
|
|
- close(client.asyncTaskQueue)
|
|
|
|
|
|
|
+ // lock the addAsync()
|
|
|
|
|
+ client.asyncChanLock.Lock()
|
|
|
|
|
+ defer client.asyncChanLock.Unlock()
|
|
|
client.isRunning = false
|
|
client.isRunning = false
|
|
|
|
|
+ close(client.asyncTaskQueue)
|
|
|
}
|
|
}
|