queue.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package mns
  2. import (
  3. "fmt"
  4. "github.com/aliyun/alibaba-cloud-sdk-go/sdk"
  5. "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth"
  6. "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
  7. "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/signers"
  8. "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
  9. "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
  10. "github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses"
  11. "net/http"
  12. "strconv"
  13. )
  14. type Queue struct {
  15. credential *credentials.StsTokenCredential
  16. httpClient *http.Client
  17. isRunning bool
  18. config *sdk.Config
  19. signer auth.Signer
  20. }
  21. func NewClientWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken string) (queue *Queue, err error) {
  22. queue = &Queue{}
  23. err = queue.InitWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken)
  24. return
  25. }
  26. func (queue *Queue) InitWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken string) (err error) {
  27. credential := &credentials.StsTokenCredential{
  28. AccessKeyId: stsAccessKeyId,
  29. AccessKeySecret: stsAccessKeySecret,
  30. AccessKeyStsToken: stsToken,
  31. }
  32. queue.signer = signers.NewStsTokenSigner(credential)
  33. config := queue.InitClientConfig()
  34. return queue.InitWithOptions(config, credential)
  35. }
  36. func (queue *Queue) InitClientConfig() (config *sdk.Config) {
  37. if queue.config != nil {
  38. return queue.config
  39. } else {
  40. return sdk.NewConfig()
  41. }
  42. }
  43. func (queue *Queue) InitWithOptions(config *sdk.Config, credential auth.Credential) (err error) {
  44. queue.isRunning = true
  45. queue.config = config
  46. if err != nil {
  47. return
  48. }
  49. queue.httpClient = &http.Client{}
  50. if config.HttpTransport != nil {
  51. queue.httpClient.Transport = config.HttpTransport
  52. }
  53. if config.Timeout > 0 {
  54. queue.httpClient.Timeout = config.Timeout
  55. }
  56. return
  57. }
  58. func (queue *Queue) DoActionWithSigner(request requests.AcsRequest, response responses.AcsResponse) (err error) {
  59. // add clientVersion
  60. request.GetHeaders()["x-sdk-core-version"] = sdk.Version
  61. if request.GetScheme() == "" {
  62. request.SetScheme("HTTP")
  63. }
  64. // init request params
  65. err = requests.InitParams(request)
  66. if err != nil {
  67. return
  68. }
  69. // signature
  70. httpRequest, err := buildHttpRequest(request, queue.signer)
  71. if err != nil {
  72. return
  73. }
  74. if queue.config.UserAgent != "" {
  75. httpRequest.Header.Set("User-Agent", queue.config.UserAgent)
  76. }
  77. var httpResponse *http.Response
  78. for retryTimes := 0; retryTimes <= queue.config.MaxRetryTime; retryTimes++ {
  79. httpResponse, err = queue.httpClient.Do(httpRequest)
  80. //var timeout bool
  81. // receive error
  82. if err != nil {
  83. if !queue.config.AutoRetry {
  84. return
  85. //} else if timeout = isTimeout(err); !timeout {
  86. // // if not timeout error, return
  87. // return
  88. } else if retryTimes >= queue.config.MaxRetryTime {
  89. // timeout but reached the max retry times, return
  90. timeoutErrorMsg := fmt.Sprintf(errors.TimeoutErrorMessage, strconv.Itoa(retryTimes+1), strconv.Itoa(retryTimes+1))
  91. err = errors.NewClientError(errors.TimeoutErrorCode, timeoutErrorMsg, err)
  92. return
  93. }
  94. }
  95. // if status code >= 500 or timeout, will trigger retry
  96. if queue.config.AutoRetry && (err != nil || isServerError(httpResponse)) {
  97. // rewrite signatureNonce and signature
  98. httpRequest, err = buildHttpRequest(request, queue.signer)
  99. if err != nil {
  100. return
  101. }
  102. continue
  103. }
  104. break
  105. }
  106. err = responses.Unmarshal(response, httpResponse, request.GetAcceptFormat())
  107. // wrap server errors
  108. if serverErr, ok := err.(*errors.ServerError); ok {
  109. var wrapInfo = map[string]string{}
  110. wrapInfo["StringToSign"] = request.GetStringToSign()
  111. err = errors.WrapServerError(serverErr, wrapInfo)
  112. }
  113. return
  114. }
  // isServerError reports whether the response status code is 500 or higher.
  // httpResponse must not be nil.
  func isServerError(httpResponse *http.Response) bool {
  	status := httpResponse.StatusCode
  	return status >= http.StatusInternalServerError
  }
  118. func buildHttpRequest(request requests.AcsRequest, singer auth.Signer) (httpRequest *http.Request, err error) {
  119. err = signMnsRoaRequest(request, singer)
  120. if err != nil {
  121. return
  122. }
  123. requestMethod := request.GetMethod()
  124. requestUrl := request.BuildUrl()
  125. body := request.GetBodyReader()
  126. httpRequest, err = http.NewRequest(requestMethod, requestUrl, body)
  127. if err != nil {
  128. return
  129. }
  130. for key, value := range request.GetHeaders() {
  131. httpRequest.Header[key] = []string{value}
  132. }
  133. // host is a special case
  134. if host, containsHost := request.GetHeaders()["Host"]; containsHost {
  135. httpRequest.Host = host
  136. }
  137. return
  138. }
  139. func (queue *Queue) Shutdown() {
  140. queue.isRunning = false
  141. }