Browse Source

download file checked by crc

鸣镝 8 năm trước cách đây
mục cha
commit
140c0dd497
7 tập tin đã thay đổi với 243 bổ sung và 30 xóa
  1. 1 1
      oss/const.go
  2. 79 0
      oss/crc.go
  3. 33 0
      oss/crc_test.go
  4. 87 29
      oss/download.go
  5. 24 0
      oss/download_test.go
  6. 7 0
      oss/error.go
  7. 12 0
      oss/error_test.go

+ 1 - 1
oss/const.go

@@ -128,5 +128,5 @@ const (
 
 	CheckpointFileSuffix = ".cp" // Checkpoint文件后缀
 
-	Version = "1.6.0" // Go sdk版本
+	Version = "1.7.0" // Go sdk版本
 )

+ 79 - 0
oss/crc.go

@@ -42,3 +42,82 @@ func (d *digest) Sum(in []byte) []byte {
 	s := d.Sum64()
 	return append(in, byte(s>>56), byte(s>>48), byte(s>>40), byte(s>>32), byte(s>>24), byte(s>>16), byte(s>>8), byte(s))
 }
+
+// gf2Dim is the dimension of the GF(2) vectors, i.e. the bit length of the CRC.
+const gf2Dim int = 64
+
+// gf2MatrixTimes multiplies the GF(2) matrix mat by the bit vector vec: it
+// XORs together the rows mat[i] for every bit i that is set in vec.
+func gf2MatrixTimes(mat []uint64, vec uint64) uint64 {
+	product := uint64(0)
+	for row := 0; vec != 0; row, vec = row+1, vec>>1 {
+		if vec&1 == 0 {
+			continue
+		}
+		product ^= mat[row]
+	}
+	return product
+}
+
+// gf2MatrixSquare writes the square of the GF(2) matrix mat into square.
+// Rows 0..gf2Dim-1 of both slices are accessed.
+func gf2MatrixSquare(square []uint64, mat []uint64) {
+	for n := 0; n != gf2Dim; n++ {
+		row := mat[n]
+		square[n] = gf2MatrixTimes(mat, row)
+	}
+}
+
+// CRC64Combine returns the CRC-64 (ECMA polynomial) of the concatenation of
+// two byte sequences, given crc1 (the CRC of the first sequence), crc2 (the
+// CRC of the second sequence) and len2 (the length of the second sequence in
+// bytes). When len2 is 0 it returns crc1 unchanged and crc2 is ignored.
+func CRC64Combine(crc1 uint64, crc2 uint64, len2 uint64) uint64 {
+	var even [gf2Dim]uint64 // even-power-of-two zeros operator
+	var odd [gf2Dim]uint64  // odd-power-of-two zeros operator
+
+	// Degenerate case: nothing was appended, so crc1 is already the result.
+	if len2 == 0 {
+		return crc1
+	}
+
+	// Put the operator for one zero bit in odd.
+	odd[0] = crc64.ECMA // CRC64 polynomial
+	var row uint64 = 1
+	for n := 1; n < gf2Dim; n++ {
+		odd[n] = row
+		row <<= 1
+	}
+
+	// Put the operator for two zero bits in even.
+	gf2MatrixSquare(even[:], odd[:])
+
+	// Put the operator for four zero bits in odd.
+	gf2MatrixSquare(odd[:], even[:])
+
+	// Apply len2 zeros to crc1. The first square puts the operator for one
+	// zero byte (eight zero bits) in even.
+	for {
+		// Apply the zeros operator for this bit of len2.
+		gf2MatrixSquare(even[:], odd[:])
+
+		if len2&1 != 0 {
+			crc1 = gf2MatrixTimes(even[:], crc1)
+		}
+
+		len2 >>= 1
+
+		// If no more bits are set, we are done.
+		if len2 == 0 {
+			break
+		}
+
+		// Same step again with the roles of odd and even swapped.
+		gf2MatrixSquare(odd[:], even[:])
+		if len2&1 != 0 {
+			crc1 = gf2MatrixTimes(odd[:], crc1)
+		}
+		len2 >>= 1
+
+		// If no more bits are set, we are done.
+		if len2 == 0 {
+			break
+		}
+	}
+
+	// XOR in crc2 to obtain the combined CRC.
+	crc1 ^= crc2
+	return crc1
+}

+ 33 - 0
oss/crc_test.go

@@ -126,6 +126,39 @@ func (s *OssCrcSuite) TestCRCGolden(c *C) {
 	}
 }
 
+// testCRC64Combine checks that the CRC-64 (ECMA) of str equals crc, and that
+// combining the CRCs of str[:pos] and str[pos:] with CRC64Combine yields the
+// same value.
+func testCRC64Combine(c *C, str string, pos int, crc uint64) {
+	table := crc64.MakeTable(crc64.ECMA)
+	sum := func(s string) uint64 {
+		h := crc64.New(table)
+		io.WriteString(h, s)
+		return h.Sum64()
+	}
+
+	// CRC of the whole string
+	c.Assert(sum(str), Equals, crc)
+
+	// CRCs of the two pieces, combined
+	head, tail := str[:pos], str[pos:]
+	combined := CRC64Combine(sum(head), sum(tail), uint64(len(tail)))
+	c.Assert(combined, Equals, crc)
+}
+
+// TestCRCCombine tests CRC64Combine by splitting known strings in the middle.
+func (s *OssCrcSuite) TestCRCCombine(c *C) {
+	cases := []struct {
+		text string
+		want uint64
+	}{
+		{"123456789", 0x995DC9BBDF1939FA},
+		{"This is a test of the emergency broadcast system.", 0x27DB187FC15BBC72},
+	}
+	for _, tc := range cases {
+		testCRC64Combine(c, tc.text, (len(tc.text)+1)>>1, tc.want)
+	}
+}
+
 // TestEnableCRCAndMD5 开启MD5和CRC校验
 func (s *OssCrcSuite) TestEnableCRCAndMD5(c *C) {
 	objectName := objectNamePrefix + "tecam"

+ 87 - 29
oss/download.go

@@ -5,6 +5,8 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"errors"
+	"hash"
+	"hash/crc64"
 	"io"
 	"io/ioutil"
 	"os"
@@ -23,7 +25,7 @@ import (
 //
 func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
 	if partSize < 1 {
-		return errors.New("oss: part size smaller than 1.")
+		return errors.New("oss: part size smaller than 1")
 	}
 
 	cpConf, err := getCpConfig(options, filePath)
@@ -58,11 +60,12 @@ func getRangeConfig(options []Option) (*unpackedRange, error) {
 
 // downloadWorkerArg holds the arguments passed to each download worker goroutine.
 type downloadWorkerArg struct {
-	bucket   *Bucket
-	key      string
-	filePath string
-	options  []Option
-	hook     downloadPartHook
+	bucket    *Bucket
+	key       string
+	filePath  string
+	options   []Option
+	hook      downloadPartHook
+	enableCRC bool // when true, the worker computes a CRC64 for each part it downloads
 }
 
 // Hook用于测试
@@ -105,6 +108,14 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
 		}
 		defer rd.Close()
 
+		var crcCalc hash.Hash64
+		if arg.enableCRC {
+			crcCalc = crc64.New(crcTable())
+			contentLen := part.End - part.Start + 1
+			rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
+		}
+		defer rd.Close()
+
 		select {
 		case <-die:
 			return
@@ -131,6 +142,10 @@ func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, res
 			break
 		}
 
+		if arg.enableCRC {
+			part.CRC64 = crcCalc.Sum64()
+		}
+
 		fd.Close()
 		results <- part
 	}
@@ -146,23 +161,33 @@ func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
 
 // downloadPart describes one part of a download.
 type downloadPart struct {
-	Index  int   // 片序号,从0开始编号
-	Start  int64 // 片起始位置
-	End    int64 // 片结束位置
-	Offset int64 // 偏移位置
+	Index  int    // part index, numbered from 0
+	Start  int64  // start position of the part
+	End    int64  // end position of the part
+	Offset int64  // offset position in the file
+	CRC64  uint64 // checksum of the part
 }
 
 // 文件分片
-func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, error) {
+func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *unpackedRange) ([]downloadPart, bool, uint64, error) {
 	meta, err := bucket.GetObjectDetailedMeta(objectKey)
 	if err != nil {
-		return nil, err
+		return nil, false, 0, err
 	}
 
 	parts := []downloadPart{}
 	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
 	if err != nil {
-		return nil, err
+		return nil, false, 0, err
+	}
+
+	enableCRC := false
+	crcVal := (uint64)(0)
+	if bucket.getConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
+		if uRange == nil || (!uRange.hasStart && !uRange.hasEnd) {
+			enableCRC = true
+			crcVal, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 0)
+		}
 	}
 
 	part := downloadPart{}
@@ -173,10 +198,11 @@ func getDownloadParts(bucket *Bucket, objectKey string, partSize int64, uRange *
 		part.Start = offset
 		part.End = GetPartEnd(offset, end, partSize)
 		part.Offset = start
+		part.CRC64 = 0
 		parts = append(parts, part)
 		i++
 	}
-	return parts, nil
+	return parts, enableCRC, crcVal, nil
 }
 
 // 文件大小
@@ -188,6 +214,20 @@ func getObjectBytes(parts []downloadPart) int64 {
 	return ob
 }
 
+// combineCRCInParts computes the overall CRC64 of consecutive parts by
+// folding each part's CRC64 into the running value with CRC64Combine, using
+// End-Start+1 as the part length. It returns 0 for a nil or empty slice.
+func combineCRCInParts(dps []downloadPart) uint64 {
+	// len of a nil slice is 0, so one check covers both nil and empty.
+	if len(dps) == 0 {
+		return 0
+	}
+
+	crc := dps[0].CRC64
+	for _, dp := range dps[1:] {
+		crc = CRC64Combine(crc, dp.CRC64, uint64(dp.End-dp.Start+1))
+	}
+
+	return crc
+}
+
 // 并发无断点续传的下载
 func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *unpackedRange) error {
 	tempFilePath := filePath + TempFileSuffix
@@ -201,7 +241,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 	fd.Close()
 
 	// 分割文件
-	parts, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
+	parts, enableCRC, expectedCRC, err := getDownloadParts(&bucket, objectKey, partSize, uRange)
 	if err != nil {
 		return err
 	}
@@ -217,7 +257,7 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 	publishProgress(listener, event)
 
 	// 启动工作协程
-	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
+	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
 	for w := 1; w <= routines; w++ {
 		go downloadWorker(w, arg, jobs, results, failed, die)
 	}
@@ -227,13 +267,12 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 
 	// 等待分片下载完成
 	completed := 0
-	ps := make([]downloadPart, len(parts))
 	for completed < len(parts) {
 		select {
 		case part := <-results:
 			completed++
-			ps[part.Index] = part
 			completedBytes += (part.End - part.Start + 1)
+			parts[part.Index].CRC64 = part.CRC64
 			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes)
 			publishProgress(listener, event)
 		case err := <-failed:
@@ -251,6 +290,14 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes)
 	publishProgress(listener, event)
 
+	if enableCRC {
+		actualCRC := combineCRCInParts(parts)
+		err = checkDownloadCRC(actualCRC, expectedCRC)
+		if err != nil {
+			return err
+		}
+	}
+
 	return os.Rename(tempFilePath, filePath)
 }
 
@@ -259,15 +306,17 @@ func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, op
 const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
 
 type downloadCheckpoint struct {
-	Magic    string         // magic
-	MD5      string         // cp内容的MD5
-	FilePath string         // 本地文件
-	Object   string         // key
-	ObjStat  objectStat     // 文件状态
-	Parts    []downloadPart // 全部分片
-	PartStat []bool         // 分片下载是否完成
-	Start    int64          // 起点
-	End      int64          // 终点
+	Magic     string         // magic
+	MD5       string         // MD5 of the checkpoint content
+	FilePath  string         // local file
+	Object    string         // key
+	ObjStat   objectStat     // object status
+	Parts     []downloadPart // all parts
+	PartStat  []bool         // whether each part has finished downloading
+	Start     int64          // start point
+	End       int64          // end point
+	enableCRC bool           // whether CRC checking applies; NOTE(review): unexported, so encoding/json would not persist it across a resume — confirm how the checkpoint is dumped/loaded
+	CRC       uint64         // CRC checksum value
 }
 
 type objectStat struct {
@@ -398,7 +447,7 @@ func (cp *downloadCheckpoint) prepare(bucket *Bucket, objectKey, filePath string
 	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
 
 	// parts
-	cp.Parts, err = getDownloadParts(bucket, objectKey, partSize, uRange)
+	cp.Parts, cp.enableCRC, cp.CRC, err = getDownloadParts(bucket, objectKey, partSize, uRange)
 	if err != nil {
 		return err
 	}
@@ -455,7 +504,7 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 	publishProgress(listener, event)
 
 	// 启动工作协程
-	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker}
+	arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
 	for w := 1; w <= routines; w++ {
 		go downloadWorker(w, arg, jobs, results, failed, die)
 	}
@@ -470,6 +519,7 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 		case part := <-results:
 			completed++
 			dcp.PartStat[part.Index] = true
+			dcp.Parts[part.Index].CRC64 = part.CRC64
 			dcp.dump(cpFilePath)
 			completedBytes += (part.End - part.Start + 1)
 			event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size)
@@ -489,5 +539,13 @@ func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int
 	event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size)
 	publishProgress(listener, event)
 
+	if dcp.enableCRC {
+		actualCRC := combineCRCInParts(dcp.Parts)
+		err = checkDownloadCRC(actualCRC, dcp.CRC)
+		if err != nil {
+			return err
+		}
+	}
+
 	return dcp.complete(cpFilePath, tempFilePath)
 }

+ 24 - 0
oss/download_test.go

@@ -543,6 +543,30 @@ func (s *OssDownloadSuite) TestDownloadWithCheckoutAndRange(c *C) {
 	c.Assert(err, IsNil)
 }
 
+// TestCombineCRCInDownloadParts tests combineCRCInParts on nil, empty,
+// single-part and two-part inputs. Values are asserted directly (rather than
+// as a boolean comparison) so a failure reports the obtained CRC.
+func (s *OssDownloadSuite) TestCombineCRCInDownloadParts(c *C) {
+	// nil and empty slices combine to 0
+	c.Assert(combineCRCInParts(nil), Equals, uint64(0))
+	c.Assert(combineCRCInParts([]downloadPart{}), Equals, uint64(0))
+
+	// a single part yields its own CRC unchanged
+	parts := []downloadPart{{CRC64: 10278880121275185425}}
+	c.Assert(combineCRCInParts(parts), Equals, uint64(10278880121275185425))
+
+	// two consecutive parts covering positions 0-4 and 5-8
+	parts = []downloadPart{
+		{Start: 0, End: 4, CRC64: 6748440630437108969},
+		{Start: 5, End: 8, CRC64: 10278880121275185425},
+	}
+	c.Assert(combineCRCInParts(parts), Equals, uint64(11051210869376104954))
+}
+
 func getFileSize(fileName string) (int64, error) {
 	file, err := os.Open(fileName)
 	if err != nil {

+ 7 - 0
oss/error.go

@@ -74,6 +74,13 @@ func (e CRCCheckError) Error() string {
 		e.operation, e.clientCRC, e.serverCRC, e.requestID)
 }
 
+// checkDownloadCRC compares the locally computed CRC with the server's CRC.
+// It returns nil when they are equal, otherwise a CRCCheckError built from
+// both values and the literal "DownloadFile".
+func checkDownloadCRC(clientCRC, serverCRC uint64) error {
+	if clientCRC != serverCRC {
+		return CRCCheckError{clientCRC, serverCRC, "DownloadFile", ""}
+	}
+	return nil
+}
+
 func checkCRC(resp *Response, operation string) error {
 	if resp.Headers.Get(HTTPHeaderOssCRC64) == "" || resp.ClientCRC == resp.ServerCRC {
 		return nil

+ 12 - 0
oss/error_test.go

@@ -70,3 +70,15 @@ func (s *OssErrorSuite) TestCheckCRCCNegative(c *C) {
 	c.Assert(err, NotNil)
 	testLogger.Println("error:", err)
 }
+
+// TestCheckDownloadCRC tests checkDownloadCRC with matching and mismatching CRCs.
+func (s *OssErrorSuite) TestCheckDownloadCRC(c *C) {
+	// matching checksums, including the all-zero pair, must pass
+	c.Assert(checkDownloadCRC(0xFBF9D9603A6FA020, 0xFBF9D9603A6FA020), IsNil)
+	c.Assert(checkDownloadCRC(0, 0), IsNil)
+
+	// a mismatch must produce an error
+	mismatch := checkDownloadCRC(0xDB6EFFF26AA94946, 0)
+	c.Assert(mismatch, NotNil)
+	testLogger.Println("error:", mismatch)
+}