|
|
@@ -0,0 +1,165 @@
|
|
|
+package mns
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "github.com/aliyun/alibaba-cloud-sdk-go/sdk"
|
|
|
+ "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth"
|
|
|
+ "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
|
|
|
+ "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/signers"
|
|
|
+ "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
|
|
|
+ "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
|
|
|
+ "github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses"
|
|
|
+ "net/http"
|
|
|
+ "strconv"
|
|
|
+)
|
|
|
+
|
|
|
+type Queue struct {
|
|
|
+ credential *credentials.StsTokenCredential
|
|
|
+ httpClient *http.Client
|
|
|
+ isRunning bool
|
|
|
+ config *sdk.Config
|
|
|
+ signer auth.Signer
|
|
|
+}
|
|
|
+
|
|
|
+func NewClientWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken string) (queue *Queue, err error) {
|
|
|
+ queue = &Queue{}
|
|
|
+ err = queue.InitWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken)
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (queue *Queue) InitWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken string) (err error) {
|
|
|
+ credential := &credentials.StsTokenCredential{
|
|
|
+ AccessKeyId: stsAccessKeyId,
|
|
|
+ AccessKeySecret: stsAccessKeySecret,
|
|
|
+ AccessKeyStsToken: stsToken,
|
|
|
+ }
|
|
|
+ queue.signer = signers.NewStsTokenSigner(credential)
|
|
|
+ config := queue.InitClientConfig()
|
|
|
+ return queue.InitWithOptions(config, credential)
|
|
|
+}
|
|
|
+
|
|
|
+func (queue *Queue) InitClientConfig() (config *sdk.Config) {
|
|
|
+ if queue.config != nil {
|
|
|
+ return queue.config
|
|
|
+ } else {
|
|
|
+ return sdk.NewConfig()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (queue *Queue) InitWithOptions(config *sdk.Config, credential auth.Credential) (err error) {
|
|
|
+ queue.isRunning = true
|
|
|
+ queue.config = config
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ queue.httpClient = &http.Client{}
|
|
|
+
|
|
|
+ if config.HttpTransport != nil {
|
|
|
+ queue.httpClient.Transport = config.HttpTransport
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.Timeout > 0 {
|
|
|
+ queue.httpClient.Timeout = config.Timeout
|
|
|
+ }
|
|
|
+
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (queue *Queue) DoActionWithSigner(request requests.AcsRequest, response responses.AcsResponse) (err error) {
|
|
|
+
|
|
|
+ // add clientVersion
|
|
|
+ request.GetHeaders()["x-sdk-core-version"] = sdk.Version
|
|
|
+
|
|
|
+ if request.GetScheme() == "" {
|
|
|
+ request.SetScheme("HTTP")
|
|
|
+ }
|
|
|
+ // init request params
|
|
|
+ err = requests.InitParams(request)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // signature
|
|
|
+
|
|
|
+
|
|
|
+ httpRequest, err := buildHttpRequest(request, queue.signer)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if queue.config.UserAgent != "" {
|
|
|
+ httpRequest.Header.Set("User-Agent", queue.config.UserAgent)
|
|
|
+ }
|
|
|
+ var httpResponse *http.Response
|
|
|
+ for retryTimes := 0; retryTimes <= queue.config.MaxRetryTime; retryTimes++ {
|
|
|
+ httpResponse, err = queue.httpClient.Do(httpRequest)
|
|
|
+
|
|
|
+ //var timeout bool
|
|
|
+ // receive error
|
|
|
+ if err != nil {
|
|
|
+ if !queue.config.AutoRetry {
|
|
|
+ return
|
|
|
+ //} else if timeout = isTimeout(err); !timeout {
|
|
|
+ // // if not timeout error, return
|
|
|
+ // return
|
|
|
+ } else if retryTimes >= queue.config.MaxRetryTime {
|
|
|
+ // timeout but reached the max retry times, return
|
|
|
+ timeoutErrorMsg := fmt.Sprintf(errors.TimeoutErrorMessage, strconv.Itoa(retryTimes+1), strconv.Itoa(retryTimes+1))
|
|
|
+ err = errors.NewClientError(errors.TimeoutErrorCode, timeoutErrorMsg, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // if status code >= 500 or timeout, will trigger retry
|
|
|
+ if queue.config.AutoRetry && (err != nil || isServerError(httpResponse)) {
|
|
|
+ // rewrite signatureNonce and signature
|
|
|
+ httpRequest, err = buildHttpRequest(request, queue.signer)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ err = responses.Unmarshal(response, httpResponse, request.GetAcceptFormat())
|
|
|
+ // wrap server errors
|
|
|
+ if serverErr, ok := err.(*errors.ServerError); ok {
|
|
|
+ var wrapInfo = map[string]string{}
|
|
|
+ wrapInfo["StringToSign"] = request.GetStringToSign()
|
|
|
+ err = errors.WrapServerError(serverErr, wrapInfo)
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func isServerError(httpResponse *http.Response) bool {
|
|
|
+ return httpResponse.StatusCode >= http.StatusInternalServerError
|
|
|
+}
|
|
|
+
|
|
|
+func buildHttpRequest(request requests.AcsRequest, singer auth.Signer) (httpRequest *http.Request, err error) {
|
|
|
+ err = signMnsRoaRequest(request, singer)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ requestMethod := request.GetMethod()
|
|
|
+ requestUrl := request.BuildUrl()
|
|
|
+ body := request.GetBodyReader()
|
|
|
+ httpRequest, err = http.NewRequest(requestMethod, requestUrl, body)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for key, value := range request.GetHeaders() {
|
|
|
+ httpRequest.Header[key] = []string{value}
|
|
|
+ }
|
|
|
+ // host is a special case
|
|
|
+ if host, containsHost := request.GetHeaders()["Host"]; containsHost {
|
|
|
+ httpRequest.Host = host
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func (queue *Queue) Shutdown() {
|
|
|
+ queue.signer.Shutdown()
|
|
|
+ queue.isRunning = false
|
|
|
+}
|