conn.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. package oss
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/base64"
  6. "encoding/xml"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "os"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
// Conn is the connection object used to send requests to OSS.
type Conn struct {
	config *Config   // client settings read when a request is built (timeouts, proxy, endpoint, user agent, security token, MD5 switch)
	url    *urlMaker // builds the request URL and the canonicalized resource for each call
}
// Response is the HTTP response returned from OSS.
type Response struct {
	statusCode int           // HTTP status code of the reply
	headers    http.Header   // headers of the reply
	body       io.ReadCloser // body of the reply
}
  29. // Do 处理请求,返回响应结果。
  30. func (conn Conn) Do(method, bucketName, objectName, urlParams, subResource string,
  31. headers map[string]string, data io.Reader) (*Response, error) {
  32. uri := conn.url.getURL(bucketName, objectName, urlParams)
  33. resource := conn.url.getResource(bucketName, objectName, subResource)
  34. return conn.doRequest(method, uri, resource, headers, data)
  35. }
  36. func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource string,
  37. headers map[string]string, data io.Reader) (*Response, error) {
  38. httpTimeOut := conn.config.HTTPTimeout
  39. method = strings.ToUpper(method)
  40. if !conn.config.IsUseProxy {
  41. uri.Opaque = uri.Path
  42. }
  43. req := &http.Request{
  44. Method: method,
  45. URL: uri,
  46. Proto: "HTTP/1.1",
  47. ProtoMajor: 1,
  48. ProtoMinor: 1,
  49. Header: make(http.Header),
  50. Host: uri.Host,
  51. }
  52. conn.handleBody(req, data)
  53. date := time.Now().UTC().Format(http.TimeFormat)
  54. req.Header.Set(HTTPHeaderDate, date)
  55. req.Header.Set(HTTPHeaderHost, conn.config.Endpoint)
  56. req.Header.Set(HTTPHeaderUserAgent, conn.config.UserAgent)
  57. if conn.config.SecurityToken != "" {
  58. req.Header.Set(HTTPHeaderOssSecurityToken, conn.config.SecurityToken)
  59. }
  60. if headers != nil {
  61. for k, v := range headers {
  62. req.Header.Set(k, v)
  63. }
  64. }
  65. conn.signHeader(req, canonicalizedResource)
  66. var transport *http.Transport
  67. if conn.config.IsUseProxy {
  68. // proxy
  69. proxyURL, err := url.Parse(conn.config.ProxyHost)
  70. if err != nil {
  71. return nil, err
  72. }
  73. transport = &http.Transport{
  74. Proxy: http.ProxyURL(proxyURL),
  75. Dial: func(netw, addr string) (net.Conn, error) {
  76. conn, err := net.DialTimeout(netw, addr, httpTimeOut.ConnectTimeout)
  77. if err != nil {
  78. return nil, err
  79. }
  80. return newTimeoutConn(conn, httpTimeOut.ReadWriteTimeout, httpTimeOut.LongTimeout), nil
  81. },
  82. ResponseHeaderTimeout: httpTimeOut.HeaderTimeout,
  83. MaxIdleConnsPerHost: 2000,
  84. }
  85. if conn.config.IsAuthProxy {
  86. auth := conn.config.ProxyUser + ":" + conn.config.ProxyPassword
  87. basic := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
  88. req.Header.Set("Proxy-Authorization", basic)
  89. }
  90. } else {
  91. // no proxy
  92. transport = &http.Transport{
  93. Dial: func(netw, addr string) (net.Conn, error) {
  94. conn, err := net.DialTimeout(netw, addr, httpTimeOut.ConnectTimeout)
  95. if err != nil {
  96. return nil, err
  97. }
  98. return newTimeoutConn(conn, httpTimeOut.ReadWriteTimeout, httpTimeOut.LongTimeout), nil
  99. },
  100. ResponseHeaderTimeout: httpTimeOut.HeaderTimeout,
  101. MaxIdleConnsPerHost: 2000,
  102. }
  103. }
  104. timeoutClient := &http.Client{Transport: transport}
  105. resp, err := timeoutClient.Do(req)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return conn.handleResponse(resp)
  110. }
  111. // handle request body
  112. func (conn Conn) handleBody(req *http.Request, body io.Reader) {
  113. rc, ok := body.(io.ReadCloser)
  114. if !ok && body != nil {
  115. rc = ioutil.NopCloser(body)
  116. }
  117. req.Body = rc
  118. switch v := body.(type) {
  119. case *bytes.Buffer:
  120. req.ContentLength = int64(v.Len())
  121. case *bytes.Reader:
  122. req.ContentLength = int64(v.Len())
  123. case *strings.Reader:
  124. req.ContentLength = int64(v.Len())
  125. case *os.File:
  126. req.ContentLength = tryGetFileSize(v)
  127. }
  128. req.Header.Set(HTTPHeaderContentLength, strconv.FormatInt(req.ContentLength, 10))
  129. // md5
  130. if req.Body != nil && conn.config.IsEnableMD5 {
  131. buf, _ := ioutil.ReadAll(req.Body)
  132. req.Body = ioutil.NopCloser(bytes.NewReader(buf))
  133. sum := md5.Sum(buf)
  134. b64 := base64.StdEncoding.EncodeToString(sum[:])
  135. req.Header.Set(HTTPHeaderContentMD5, b64)
  136. }
  137. }
  138. func tryGetFileSize(f *os.File) int64 {
  139. fInfo, _ := f.Stat()
  140. return fInfo.Size()
  141. }
  142. // handle response
  143. func (conn Conn) handleResponse(resp *http.Response) (*Response, error) {
  144. statusCode := resp.StatusCode
  145. if statusCode >= 400 && statusCode <= 505 {
  146. // 4xx and 5xx indicate that the operation has error occurred
  147. var respBody []byte
  148. respBody, err := readResponseBody(resp)
  149. if err != nil {
  150. return nil, err
  151. }
  152. if len(respBody) == 0 {
  153. // no error in response body
  154. err = fmt.Errorf("oss: service returned without a response body (%s)", resp.Status)
  155. } else {
  156. // response contains storage service error object, unmarshal
  157. srvErr, errIn := serviceErrFromXML(respBody, resp.StatusCode,
  158. resp.Header.Get(HTTPHeaderOssRequestID))
  159. if err != nil { // error unmarshaling the error response
  160. err = errIn
  161. }
  162. err = srvErr
  163. }
  164. return &Response{
  165. statusCode: resp.StatusCode,
  166. headers: resp.Header,
  167. body: ioutil.NopCloser(bytes.NewReader(respBody)), // restore the body//
  168. }, err
  169. } else if statusCode >= 300 && statusCode <= 307 {
  170. // oss use 3xx, but response has no body
  171. err := fmt.Errorf("oss: service returned %d,%s", resp.StatusCode, resp.Status)
  172. return &Response{
  173. statusCode: resp.StatusCode,
  174. headers: resp.Header,
  175. body: resp.Body,
  176. }, err
  177. }
  178. // 2xx, successful
  179. return &Response{
  180. statusCode: resp.StatusCode,
  181. headers: resp.Header,
  182. body: resp.Body,
  183. }, nil
  184. }
  185. func readResponseBody(resp *http.Response) ([]byte, error) {
  186. defer resp.Body.Close()
  187. out, err := ioutil.ReadAll(resp.Body)
  188. if err == io.EOF {
  189. err = nil
  190. }
  191. return out, err
  192. }
  193. func serviceErrFromXML(body []byte, statusCode int, requestID string) (ServiceError, error) {
  194. var storageErr ServiceError
  195. if err := xml.Unmarshal(body, &storageErr); err != nil {
  196. return storageErr, err
  197. }
  198. storageErr.StatusCode = statusCode
  199. storageErr.RequestID = requestID
  200. storageErr.RawMessage = string(body)
  201. return storageErr, nil
  202. }
  203. func xmlUnmarshal(body io.Reader, v interface{}) error {
  204. data, err := ioutil.ReadAll(body)
  205. if err != nil {
  206. return err
  207. }
  208. return xml.Unmarshal(data, v)
  209. }
  210. // Handle http timeout
  211. type timeoutConn struct {
  212. conn net.Conn
  213. timeout time.Duration
  214. longTimeout time.Duration
  215. }
  216. func newTimeoutConn(conn net.Conn, timeout time.Duration, longTimeout time.Duration) *timeoutConn {
  217. conn.SetReadDeadline(time.Now().Add(longTimeout))
  218. return &timeoutConn{
  219. conn: conn,
  220. timeout: timeout,
  221. longTimeout: longTimeout,
  222. }
  223. }
  224. func (c *timeoutConn) Read(b []byte) (n int, err error) {
  225. c.SetReadDeadline(time.Now().Add(c.timeout))
  226. n, err = c.conn.Read(b)
  227. c.SetReadDeadline(time.Now().Add(c.longTimeout))
  228. return n, err
  229. }
  230. func (c *timeoutConn) Write(b []byte) (n int, err error) {
  231. c.SetWriteDeadline(time.Now().Add(c.timeout))
  232. n, err = c.conn.Write(b)
  233. c.SetReadDeadline(time.Now().Add(c.longTimeout))
  234. return n, err
  235. }
  236. func (c *timeoutConn) Close() error {
  237. return c.conn.Close()
  238. }
  239. func (c *timeoutConn) LocalAddr() net.Addr {
  240. return c.conn.LocalAddr()
  241. }
  242. func (c *timeoutConn) RemoteAddr() net.Addr {
  243. return c.conn.RemoteAddr()
  244. }
  245. func (c *timeoutConn) SetDeadline(t time.Time) error {
  246. return c.conn.SetDeadline(t)
  247. }
  248. func (c *timeoutConn) SetReadDeadline(t time.Time) error {
  249. return c.conn.SetReadDeadline(t)
  250. }
  251. func (c *timeoutConn) SetWriteDeadline(t time.Time) error {
  252. return c.conn.SetWriteDeadline(t)
  253. }
  254. // UrlMaker - build url and resource
  255. const (
  256. urlTypeCname = 1
  257. urlTypeIP = 2
  258. urlTypeAliyun = 3
  259. )
  260. type urlMaker struct {
  261. Scheme string // http or https
  262. NetLoc string // host or ip
  263. Type int // 1 CNAME 2 IP 3 ALIYUN
  264. IsProxy bool // proxy
  265. }
  266. // Parse endpoint
  267. func (um *urlMaker) Init(endpoint string, isCname bool, isProxy bool) {
  268. if strings.HasPrefix(endpoint, "http://") {
  269. um.Scheme = "http"
  270. um.NetLoc = endpoint[len("http://"):]
  271. } else if strings.HasPrefix(endpoint, "https://") {
  272. um.Scheme = "https"
  273. um.NetLoc = endpoint[len("https://"):]
  274. } else {
  275. um.Scheme = "http"
  276. um.NetLoc = endpoint
  277. }
  278. host, _, err := net.SplitHostPort(um.NetLoc)
  279. if err != nil {
  280. host = um.NetLoc
  281. }
  282. ip := net.ParseIP(host)
  283. if ip != nil {
  284. um.Type = urlTypeIP
  285. } else if isCname {
  286. um.Type = urlTypeCname
  287. } else {
  288. um.Type = urlTypeAliyun
  289. }
  290. um.IsProxy = isProxy
  291. }
  292. // Build URL
  293. func (um urlMaker) getURL(bucket, object, params string) *url.URL {
  294. var host = ""
  295. var path = ""
  296. if !um.IsProxy {
  297. object = url.QueryEscape(object)
  298. }
  299. if um.Type == urlTypeCname {
  300. host = um.NetLoc
  301. path = "/" + object
  302. } else if um.Type == urlTypeIP {
  303. if bucket == "" {
  304. host = um.NetLoc
  305. path = "/"
  306. } else {
  307. host = um.NetLoc
  308. path = fmt.Sprintf("/%s/%s", bucket, object)
  309. }
  310. } else {
  311. if bucket == "" {
  312. host = um.NetLoc
  313. path = "/"
  314. } else {
  315. host = bucket + "." + um.NetLoc
  316. path = "/" + object
  317. }
  318. }
  319. uri := &url.URL{
  320. Scheme: um.Scheme,
  321. Host: host,
  322. Path: path,
  323. RawQuery: params,
  324. }
  325. return uri
  326. }
  327. // Canonicalized Resource
  328. func (um urlMaker) getResource(bucketName, objectName, subResource string) string {
  329. if subResource != "" {
  330. subResource = "?" + subResource
  331. }
  332. if bucketName == "" {
  333. return fmt.Sprintf("/%s%s", bucketName, subResource)
  334. }
  335. return fmt.Sprintf("/%s/%s%s", bucketName, objectName, subResource)
  336. }