| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package mns
- import (
- "fmt"
- "github.com/aliyun/alibaba-cloud-sdk-go/sdk"
- "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth"
- "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
- "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/signers"
- "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
- "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
- "github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses"
- "net/http"
- "strconv"
- )
- type Queue struct {
- credential *credentials.StsTokenCredential
- httpClient *http.Client
- isRunning bool
- config *sdk.Config
- signer auth.Signer
- }
- func NewClientWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken string) (queue *Queue, err error) {
- queue = &Queue{}
- err = queue.InitWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken)
- return
- }
- func (queue *Queue) InitWithStsToken(regionId, stsAccessKeyId, stsAccessKeySecret, stsToken string) (err error) {
- credential := &credentials.StsTokenCredential{
- AccessKeyId: stsAccessKeyId,
- AccessKeySecret: stsAccessKeySecret,
- AccessKeyStsToken: stsToken,
- }
- queue.signer = signers.NewStsTokenSigner(credential)
- config := queue.InitClientConfig()
- return queue.InitWithOptions(config, credential)
- }
- func (queue *Queue) InitClientConfig() (config *sdk.Config) {
- if queue.config != nil {
- return queue.config
- } else {
- return sdk.NewConfig()
- }
- }
- func (queue *Queue) InitWithOptions(config *sdk.Config, credential auth.Credential) (err error) {
- queue.isRunning = true
- queue.config = config
- if err != nil {
- return
- }
- queue.httpClient = &http.Client{}
- if config.HttpTransport != nil {
- queue.httpClient.Transport = config.HttpTransport
- }
- if config.Timeout > 0 {
- queue.httpClient.Timeout = config.Timeout
- }
- return
- }
- func (queue *Queue) DoActionWithSigner(request requests.AcsRequest, response responses.AcsResponse) (err error) {
- // add clientVersion
- request.GetHeaders()["x-sdk-core-version"] = sdk.Version
- if request.GetScheme() == "" {
- request.SetScheme("HTTP")
- }
- // init request params
- err = requests.InitParams(request)
- if err != nil {
- return
- }
- // signature
- httpRequest, err := buildHttpRequest(request, queue.signer)
- if err != nil {
- return
- }
- if queue.config.UserAgent != "" {
- httpRequest.Header.Set("User-Agent", queue.config.UserAgent)
- }
- var httpResponse *http.Response
- for retryTimes := 0; retryTimes <= queue.config.MaxRetryTime; retryTimes++ {
- httpResponse, err = queue.httpClient.Do(httpRequest)
- //var timeout bool
- // receive error
- if err != nil {
- if !queue.config.AutoRetry {
- return
- //} else if timeout = isTimeout(err); !timeout {
- // // if not timeout error, return
- // return
- } else if retryTimes >= queue.config.MaxRetryTime {
- // timeout but reached the max retry times, return
- timeoutErrorMsg := fmt.Sprintf(errors.TimeoutErrorMessage, strconv.Itoa(retryTimes+1), strconv.Itoa(retryTimes+1))
- err = errors.NewClientError(errors.TimeoutErrorCode, timeoutErrorMsg, err)
- return
- }
- }
- // if status code >= 500 or timeout, will trigger retry
- if queue.config.AutoRetry && (err != nil || isServerError(httpResponse)) {
- // rewrite signatureNonce and signature
- httpRequest, err = buildHttpRequest(request, queue.signer)
- if err != nil {
- return
- }
- continue
- }
- break
- }
- err = responses.Unmarshal(response, httpResponse, request.GetAcceptFormat())
- // wrap server errors
- if serverErr, ok := err.(*errors.ServerError); ok {
- var wrapInfo = map[string]string{}
- wrapInfo["StringToSign"] = request.GetStringToSign()
- err = errors.WrapServerError(serverErr, wrapInfo)
- }
- return
- }
- func isServerError(httpResponse *http.Response) bool {
- return httpResponse.StatusCode >= http.StatusInternalServerError
- }
- func buildHttpRequest(request requests.AcsRequest, singer auth.Signer) (httpRequest *http.Request, err error) {
- err = signMnsRoaRequest(request, singer)
- if err != nil {
- return
- }
- requestMethod := request.GetMethod()
- requestUrl := request.BuildUrl()
- body := request.GetBodyReader()
- httpRequest, err = http.NewRequest(requestMethod, requestUrl, body)
- if err != nil {
- return
- }
- for key, value := range request.GetHeaders() {
- httpRequest.Header[key] = []string{value}
- }
- // host is a special case
- if host, containsHost := request.GetHeaders()["Host"]; containsHost {
- httpRequest.Host = host
- }
- return
- }
- func (queue *Queue) Shutdown() {
- queue.isRunning = false
- }
|