conn.go 7.9 KB


  1. package oss
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/base64"
  6. "encoding/xml"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "os"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. // Conn oss conn
  19. type Conn struct {
  20. config *Config
  21. url *urlMaker
  22. }
  23. // Response Http response from oss
  24. type Response struct {
  25. statusCode int
  26. headers http.Header
  27. body io.ReadCloser
  28. }
  29. // Do 处理请求,返回响应结果。
  30. func (conn Conn) Do(method, bucketName, objectName, urlParams, subResource string,
  31. headers map[string]string, data io.Reader) (*Response, error) {
  32. uri := conn.url.getURL(bucketName, objectName, urlParams)
  33. resource := conn.url.getResource(bucketName, objectName, subResource)
  34. return conn.doRequest(method, uri, resource, headers, data)
  35. }
  36. func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource string,
  37. headers map[string]string, data io.Reader) (*Response, error) {
  38. httpTimeOut := conn.config.HTTPTimeout
  39. method = strings.ToUpper(method)
  40. uri.Opaque = uri.Path
  41. req := &http.Request{
  42. Method: method,
  43. URL: uri,
  44. Proto: "HTTP/1.1",
  45. ProtoMajor: 1,
  46. ProtoMinor: 1,
  47. Header: make(http.Header),
  48. Host: uri.Host,
  49. }
  50. conn.handleBody(req, data)
  51. date := time.Now().UTC().Format(http.TimeFormat)
  52. req.Header.Set(HTTPHeaderDate, date)
  53. req.Header.Set(HTTPHeaderHost, conn.config.Endpoint)
  54. req.Header.Set(HTTPHeaderUserAgent, conn.config.UserAgent)
  55. if conn.config.SecurityToken != "" {
  56. req.Header.Set(HTTPHeaderOssSecurityToken, conn.config.SecurityToken)
  57. }
  58. if headers != nil {
  59. for k, v := range headers {
  60. req.Header.Set(k, v)
  61. }
  62. }
  63. conn.signHeader(req, canonicalizedResource)
  64. timeoutClient := &http.Client{Transport: &http.Transport{
  65. Dial: func(netw, addr string) (net.Conn, error) {
  66. conn, err := net.DialTimeout(netw, addr, httpTimeOut.ConnectTimeout)
  67. if err != nil {
  68. return nil, err
  69. }
  70. return newTimeoutConn(conn, httpTimeOut.ReadWriteTimeout, httpTimeOut.LongTimeout), nil
  71. },
  72. ResponseHeaderTimeout: httpTimeOut.HeaderTimeout,
  73. MaxIdleConnsPerHost: 2000,
  74. }}
  75. resp, err := timeoutClient.Do(req)
  76. if err != nil {
  77. return nil, err
  78. }
  79. return conn.handleResponse(resp)
  80. }
  81. // handle request body
  82. func (conn Conn) handleBody(req *http.Request, body io.Reader) {
  83. rc, ok := body.(io.ReadCloser)
  84. if !ok && body != nil {
  85. rc = ioutil.NopCloser(body)
  86. }
  87. req.Body = rc
  88. switch v := body.(type) {
  89. case *bytes.Buffer:
  90. req.ContentLength = int64(v.Len())
  91. case *bytes.Reader:
  92. req.ContentLength = int64(v.Len())
  93. case *strings.Reader:
  94. req.ContentLength = int64(v.Len())
  95. case *os.File:
  96. req.ContentLength = tryGetFileSize(v)
  97. }
  98. req.Header.Set(HTTPHeaderContentLength, strconv.FormatInt(req.ContentLength, 10))
  99. // md5
  100. if req.Body != nil {
  101. buf, _ := ioutil.ReadAll(req.Body)
  102. req.Body = ioutil.NopCloser(bytes.NewReader(buf))
  103. sum := md5.Sum(buf)
  104. b64 := base64.StdEncoding.EncodeToString(sum[:])
  105. req.Header.Set(HTTPHeaderContentMD5, b64)
  106. }
  107. }
  108. func tryGetFileSize(f *os.File) int64 {
  109. fInfo, _ := f.Stat()
  110. return fInfo.Size()
  111. }
  112. // handle response
  113. func (conn Conn) handleResponse(resp *http.Response) (*Response, error) {
  114. statusCode := resp.StatusCode
  115. if statusCode >= 400 && statusCode <= 505 {
  116. // 4xx and 5xx indicate that the operation has error occurred
  117. var respBody []byte
  118. respBody, err := readResponseBody(resp)
  119. if err != nil {
  120. return nil, err
  121. }
  122. if len(respBody) == 0 {
  123. // no error in response body
  124. err = fmt.Errorf("oss: service returned without a response body (%s)", resp.Status)
  125. } else {
  126. // response contains storage service error object, unmarshal
  127. srvErr, errIn := serviceErrFromXML(respBody, resp.StatusCode,
  128. resp.Header.Get(HTTPHeaderOssRequestID))
  129. if err != nil { // error unmarshaling the error response
  130. err = errIn
  131. }
  132. err = srvErr
  133. }
  134. return &Response{
  135. statusCode: resp.StatusCode,
  136. headers: resp.Header,
  137. body: ioutil.NopCloser(bytes.NewReader(respBody)), // restore the body//
  138. }, err
  139. } else if statusCode >= 300 && statusCode <= 307 {
  140. // oss use 3xx, but response has no body
  141. err := fmt.Errorf("oss: service returned %d,%s", resp.StatusCode, resp.Status)
  142. return &Response{
  143. statusCode: resp.StatusCode,
  144. headers: resp.Header,
  145. body: resp.Body,
  146. }, err
  147. }
  148. // 2xx, successful
  149. return &Response{
  150. statusCode: resp.StatusCode,
  151. headers: resp.Header,
  152. body: resp.Body,
  153. }, nil
  154. }
  155. func readResponseBody(resp *http.Response) ([]byte, error) {
  156. defer resp.Body.Close()
  157. out, err := ioutil.ReadAll(resp.Body)
  158. if err == io.EOF {
  159. err = nil
  160. }
  161. return out, err
  162. }
  163. func serviceErrFromXML(body []byte, statusCode int, requestID string) (ServiceError, error) {
  164. var storageErr ServiceError
  165. if err := xml.Unmarshal(body, &storageErr); err != nil {
  166. return storageErr, err
  167. }
  168. storageErr.StatusCode = statusCode
  169. storageErr.RequestID = requestID
  170. return storageErr, nil
  171. }
  172. func xmlUnmarshal(body io.Reader, v interface{}) error {
  173. data, err := ioutil.ReadAll(body)
  174. if err != nil {
  175. return err
  176. }
  177. return xml.Unmarshal(data, v)
  178. }
  179. // Handle http timeout
  180. type timeoutConn struct {
  181. conn net.Conn
  182. timeout time.Duration
  183. longTimeout time.Duration
  184. }
  185. func newTimeoutConn(conn net.Conn, timeout time.Duration, longTimeout time.Duration) *timeoutConn {
  186. conn.SetReadDeadline(time.Now().Add(longTimeout))
  187. return &timeoutConn{
  188. conn: conn,
  189. timeout: timeout,
  190. longTimeout: longTimeout,
  191. }
  192. }
  193. func (c *timeoutConn) Read(b []byte) (n int, err error) {
  194. c.SetReadDeadline(time.Now().Add(c.timeout))
  195. n, err = c.conn.Read(b)
  196. c.SetReadDeadline(time.Now().Add(c.longTimeout))
  197. return n, err
  198. }
  199. func (c *timeoutConn) Write(b []byte) (n int, err error) {
  200. c.SetWriteDeadline(time.Now().Add(c.timeout))
  201. n, err = c.conn.Write(b)
  202. c.SetReadDeadline(time.Now().Add(c.longTimeout))
  203. return n, err
  204. }
  205. func (c *timeoutConn) Close() error {
  206. return c.conn.Close()
  207. }
  208. func (c *timeoutConn) LocalAddr() net.Addr {
  209. return c.conn.LocalAddr()
  210. }
  211. func (c *timeoutConn) RemoteAddr() net.Addr {
  212. return c.conn.RemoteAddr()
  213. }
  214. func (c *timeoutConn) SetDeadline(t time.Time) error {
  215. return c.conn.SetDeadline(t)
  216. }
  217. func (c *timeoutConn) SetReadDeadline(t time.Time) error {
  218. return c.conn.SetReadDeadline(t)
  219. }
  220. func (c *timeoutConn) SetWriteDeadline(t time.Time) error {
  221. return c.conn.SetWriteDeadline(t)
  222. }
  223. // UrlMaker - build url and resource
  224. const (
  225. urlTypeCname = 1
  226. urlTypeIP = 2
  227. urlTypeAliyun = 3
  228. )
  229. type urlMaker struct {
  230. Scheme string // http or https
  231. NetLoc string // host or ip
  232. Type int // 1 CNAME 2 IP 3 ALIYUN
  233. }
  234. // Parse endpoint
  235. func (um *urlMaker) Init(endpoint string, isCname bool) {
  236. if strings.HasPrefix(endpoint, "http://") {
  237. um.Scheme = "http"
  238. um.NetLoc = endpoint[len("http://"):]
  239. } else if strings.HasPrefix(endpoint, "https://") {
  240. um.Scheme = "https"
  241. um.NetLoc = endpoint[len("https://"):]
  242. } else {
  243. um.Scheme = "http"
  244. um.NetLoc = endpoint
  245. }
  246. host, _, err := net.SplitHostPort(um.NetLoc)
  247. if err != nil {
  248. host = um.NetLoc
  249. }
  250. ip := net.ParseIP(host)
  251. if ip != nil {
  252. um.Type = urlTypeIP
  253. } else if isCname {
  254. um.Type = urlTypeCname
  255. } else {
  256. um.Type = urlTypeAliyun
  257. }
  258. }
  259. // Build URL
  260. func (um urlMaker) getURL(bucket, object, params string) *url.URL {
  261. var host = ""
  262. var path = ""
  263. object = url.QueryEscape(object)
  264. if um.Type == urlTypeCname {
  265. host = um.NetLoc
  266. path = "/" + object
  267. } else if um.Type == urlTypeIP {
  268. if bucket == "" {
  269. host = um.NetLoc
  270. path = "/"
  271. } else {
  272. host = um.NetLoc
  273. path = fmt.Sprintf("/%s/%s", bucket, object)
  274. }
  275. } else {
  276. if bucket == "" {
  277. host = um.NetLoc
  278. path = "/"
  279. } else {
  280. host = bucket + "." + um.NetLoc
  281. path = "/" + object
  282. }
  283. }
  284. uri := &url.URL{
  285. Scheme: um.Scheme,
  286. Host: host,
  287. Path: path,
  288. RawQuery: params,
  289. }
  290. return uri
  291. }
  292. // Canonicalized Resource
  293. func (um urlMaker) getResource(bucketName, objectName, subResource string) string {
  294. if subResource != "" {
  295. subResource = "?" + subResource
  296. }
  297. if bucketName == "" {
  298. return fmt.Sprintf("/%s%s", bucketName, subResource)
  299. }
  300. return fmt.Sprintf("/%s/%s%s", bucketName, objectName, subResource)
  301. }