فهرست منبع

optimization for weak net

taowei.wtw 5 سال پیش
والد
کامیت
e1bdab76d4
5فایلهای تغییر یافته به همراه32 افزوده شده و 9 حذف شده
  1. 1 1
      oss/bucket.go
  2. 13 4
      oss/download.go
  3. 5 1
      oss/transport_1_6.go
  4. 5 1
      oss/transport_1_7.go
  5. 8 2
      oss/upload.go

+ 1 - 1
oss/bucket.go

@@ -1190,7 +1190,7 @@ func (bucket Bucket) do(method, objectName string, params map[string]interface{}
 
 	// get response header
 	respHeader, _ := FindOption(options, responseHeader, nil)
-	if respHeader != nil {
+	if respHeader != nil && resp != nil {
 		pRespHeader := respHeader.(*http.Header)
 		*pRespHeader = resp.Headers
 	}

+ 13 - 4
oss/download.go

@@ -14,6 +14,7 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
+	"time"
 )
 
 // DownloadFile downloads files with multipart download.
@@ -102,10 +103,12 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
 		// Resolve options
 		r := Range(part.Start, part.End)
 		p := Progress(&defaultDownloadProgressListener{})
-		opts := make([]Option, len(arg.options)+2)
+
+		var respHeader http.Header
+		opts := make([]Option, len(arg.options)+3)
 		// Append orderly, can not be reversed!
 		opts = append(opts, arg.options...)
-		opts = append(opts, r, p)
+		opts = append(opts, r, p, GetResponseHeader(&respHeader))
 
 		rd, err := arg.bucket.GetObject(arg.key, opts...)
 		if err != nil {
@@ -141,8 +144,11 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
 			break
 		}
 
+		startT := time.Now().UnixNano() / 1000 / 1000 / 1000
 		_, err = io.Copy(fd, rd)
+		endT := time.Now().UnixNano() / 1000 / 1000 / 1000
 		if err != nil {
+			arg.bucket.Client.Config.WriteLog(Debug, "download part error,cost:%d second,part number:%d,request id:%s,error:%s.\n", endT-startT, part.Index, GetRequestId(respHeader), err.Error())
 			fd.Close()
 			failed <- err
 			break
@@ -458,8 +464,11 @@ func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKe
 }
 
 func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
-	os.Remove(cpFilePath)
-	return os.Rename(downFilepath, cp.FilePath)
+	err := os.Rename(downFilepath, cp.FilePath)
+	if err != nil {
+		return err
+	}
+	return os.Remove(cpFilePath)
 }
 
 // downloadFileWithCp downloads files with checkpoint.

+ 5 - 1
oss/transport_1_6.go

@@ -5,6 +5,7 @@ package oss
 import (
 	"net"
 	"net/http"
+	"time"
 )
 
 func newTransport(conn *Conn, config *Config) *http.Transport {
@@ -13,7 +14,10 @@ func newTransport(conn *Conn, config *Config) *http.Transport {
 	// New Transport
 	transport := &http.Transport{
 		Dial: func(netw, addr string) (net.Conn, error) {
-			d := net.Dialer{Timeout: httpTimeOut.ConnectTimeout}
+			d := net.Dialer{
+				Timeout:   httpTimeOut.ConnectTimeout,
+				KeepAlive: 30 * time.Second,
+			}
 			if config.LocalAddr != nil {
 				d.LocalAddr = config.LocalAddr
 			}

+ 5 - 1
oss/transport_1_7.go

@@ -5,6 +5,7 @@ package oss
 import (
 	"net"
 	"net/http"
+	"time"
 )
 
 func newTransport(conn *Conn, config *Config) *http.Transport {
@@ -13,7 +14,10 @@ func newTransport(conn *Conn, config *Config) *http.Transport {
 	// New Transport
 	transport := &http.Transport{
 		Dial: func(netw, addr string) (net.Conn, error) {
-			d := net.Dialer{Timeout: httpTimeOut.ConnectTimeout}
+			d := net.Dialer{
+				Timeout:   httpTimeOut.ConnectTimeout,
+				KeepAlive: 30 * time.Second,
+			}
 			if config.LocalAddr != nil {
 				d.LocalAddr = config.LocalAddr
 			}

+ 8 - 2
oss/upload.go

@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
+	"net/http"
 	"os"
 	"path/filepath"
 	"time"
@@ -149,14 +150,19 @@ func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadP
 			failed <- err
 			break
 		}
+		var respHeader http.Header
 		p := Progress(&defaultUploadProgressListener{})
-		opts := make([]Option, len(arg.options)+1)
+		opts := make([]Option, len(arg.options)+2)
 		opts = append(opts, arg.options...)
 
 		// use defaultUploadProgressListener
-		opts = append(opts, p)
+		opts = append(opts, p, GetResponseHeader(&respHeader))
+
+		startT := time.Now().UnixNano() / 1000 / 1000 / 1000
 		part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, opts...)
+		endT := time.Now().UnixNano() / 1000 / 1000 / 1000
 		if err != nil {
+			arg.bucket.Client.Config.WriteLog(Debug, "upload part error,cost:%d second,part number:%d,request id:%s,error:%s\n", endT-startT, chunk.Number, GetRequestId(respHeader), err.Error())
 			failed <- err
 			break
 		}