select_object_type.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package oss
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "hash"
  7. "hash/crc32"
  8. "io"
  9. "net/http"
  10. "time"
  11. )
// The adapter class for Select object's response.
// The response consists of frames. Each frame has the following format:
//
//	Type      | Payload Length | Header Checksum | Payload | Payload Checksum
//	<4 bytes> | <4 bytes>      | <4 bytes>       | <n/a>   | <4 bytes>
//
// And we have three kinds of frames.
//
// Data Frame:
//	Type:    8388609
//	Payload: Offset (8 bytes) | Data
//
// Continuous Frame:
//	Type:    8388612
//	Payload: Offset (8 bytes)
//
// End Frame:
//	Type:    8388613
//	Payload: Offset (8 bytes) | total scanned bytes (8 bytes) | http status code (4 bytes) | error message (variable)
  28. // SelectObjectResponse defines HTTP response from OSS SelectObject
  29. type SelectObjectResponse struct {
  30. StatusCode int
  31. Headers http.Header
  32. Body io.ReadCloser
  33. Frame SelectObjectResult
  34. ReadTimeOut uint
  35. ClientCRC32 uint32
  36. ServerCRC32 uint32
  37. WriterForCheckCrc32 hash.Hash32
  38. Finish bool
  39. }
  40. func (sr *SelectObjectResponse) Read(p []byte) (n int, err error) {
  41. n, err = sr.readFrames(p)
  42. return
  43. }
  44. // Close http reponse body
  45. func (sr *SelectObjectResponse) Close() error {
  46. return sr.Body.Close()
  47. }
  48. // PostSelectResult is the request of SelectObject
  49. type PostSelectResult struct {
  50. Response *SelectObjectResponse
  51. }
  52. // readFrames is read Frame
  53. func (sr *SelectObjectResponse) readFrames(p []byte) (int, error) {
  54. var nn int
  55. var err error
  56. var checkValid bool
  57. if sr.Frame.OutputRawData == true {
  58. nn, err = sr.Body.Read(p)
  59. return nn, err
  60. }
  61. if sr.Finish {
  62. return 0, io.EOF
  63. }
  64. for {
  65. // if this Frame is Readed, then not reading Header
  66. if sr.Frame.OpenLine != true {
  67. err = sr.analysisHeader()
  68. if err != nil {
  69. return nn, err
  70. }
  71. }
  72. if sr.Frame.FrameType == DataFrameType {
  73. n, err := sr.analysisData(p[nn:])
  74. if err != nil {
  75. return nn, err
  76. }
  77. nn += n
  78. // if this Frame is readed all data, then empty the Frame to read it with next frame
  79. if sr.Frame.ConsumedBytesLength == sr.Frame.PayloadLength-8 {
  80. checkValid, err = sr.checkPayloadSum()
  81. if err != nil || !checkValid {
  82. return nn, fmt.Errorf("%s", err.Error())
  83. }
  84. sr.emptyFrame()
  85. }
  86. if nn == len(p) {
  87. return nn, nil
  88. }
  89. } else if sr.Frame.FrameType == ContinuousFrameType {
  90. checkValid, err = sr.checkPayloadSum()
  91. if err != nil || !checkValid {
  92. return nn, fmt.Errorf("%s", err.Error())
  93. }
  94. } else if sr.Frame.FrameType == EndFrameType {
  95. err = sr.analysisEndFrame()
  96. if err != nil {
  97. return nn, err
  98. }
  99. checkValid, err = sr.checkPayloadSum()
  100. if checkValid {
  101. sr.Finish = true
  102. }
  103. return nn, err
  104. } else if sr.Frame.FrameType == MetaEndFrameCSVType {
  105. err = sr.analysisMetaEndFrameCSV()
  106. if err != nil {
  107. return nn, err
  108. }
  109. checkValid, err = sr.checkPayloadSum()
  110. if checkValid {
  111. sr.Finish = true
  112. }
  113. return nn, err
  114. } else if sr.Frame.FrameType == MetaEndFrameJSONType {
  115. err = sr.analysisMetaEndFrameJSON()
  116. if err != nil {
  117. return nn, err
  118. }
  119. checkValid, err = sr.checkPayloadSum()
  120. if checkValid {
  121. sr.Finish = true
  122. }
  123. return nn, err
  124. }
  125. }
  126. return nn, nil
  127. }
  128. type chanReadIO struct {
  129. readLen int
  130. err error
  131. }
  132. func (sr *SelectObjectResponse) readLen(p []byte, timeOut time.Duration) (int, error) {
  133. r := sr.Body
  134. ch := make(chan chanReadIO, 1)
  135. defer close(ch)
  136. go func(p []byte) {
  137. var needReadLength int
  138. readChan := chanReadIO{}
  139. needReadLength = len(p)
  140. for {
  141. n, err := r.Read(p[readChan.readLen:needReadLength])
  142. readChan.readLen += n
  143. if err != nil {
  144. readChan.err = err
  145. ch <- readChan
  146. return
  147. }
  148. if readChan.readLen == needReadLength {
  149. break
  150. }
  151. }
  152. ch <- readChan
  153. }(p)
  154. select {
  155. case <-time.After(time.Second * timeOut):
  156. return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", sr.Headers.Get(HTTPHeaderOssRequestID), timeOut, len(p))
  157. case result := <-ch:
  158. return result.readLen, result.err
  159. }
  160. }
  161. // analysisHeader is reading selectObject response body's header
  162. func (sr *SelectObjectResponse) analysisHeader() error {
  163. headFrameByte := make([]byte, 20)
  164. _, err := sr.readLen(headFrameByte, time.Duration(sr.ReadTimeOut))
  165. if err != nil {
  166. return fmt.Errorf("requestId: %s, Read response frame header failure,err:%s", sr.Headers.Get(HTTPHeaderOssRequestID), err.Error())
  167. }
  168. frameTypeByte := headFrameByte[0:4]
  169. sr.Frame.Version = frameTypeByte[0]
  170. frameTypeByte[0] = 0
  171. bytesToInt(frameTypeByte, &sr.Frame.FrameType)
  172. if sr.Frame.FrameType != DataFrameType && sr.Frame.FrameType != ContinuousFrameType &&
  173. sr.Frame.FrameType != EndFrameType && sr.Frame.FrameType != MetaEndFrameCSVType && sr.Frame.FrameType != MetaEndFrameJSONType {
  174. return fmt.Errorf("requestId: %s, Unexpected frame type: %d", sr.Headers.Get(HTTPHeaderOssRequestID), sr.Frame.FrameType)
  175. }
  176. payloadLengthByte := headFrameByte[4:8]
  177. bytesToInt(payloadLengthByte, &sr.Frame.PayloadLength)
  178. headCheckSumByte := headFrameByte[8:12]
  179. bytesToInt(headCheckSumByte, &sr.Frame.HeaderCheckSum)
  180. byteOffset := headFrameByte[12:20]
  181. bytesToInt(byteOffset, &sr.Frame.Offset)
  182. sr.Frame.OpenLine = true
  183. err = sr.writerCheckCrc32(byteOffset)
  184. return err
  185. }
  186. // analysisData is reading the DataFrameType data of selectObject response body
  187. func (sr *SelectObjectResponse) analysisData(p []byte) (int, error) {
  188. var needReadLength int32
  189. lenP := int32(len(p))
  190. restByteLength := sr.Frame.PayloadLength - 8 - sr.Frame.ConsumedBytesLength
  191. if lenP <= restByteLength {
  192. needReadLength = lenP
  193. } else {
  194. needReadLength = restByteLength
  195. }
  196. n, err := sr.readLen(p[:needReadLength], time.Duration(sr.ReadTimeOut))
  197. if err != nil {
  198. return n, fmt.Errorf("read frame data error,%s", err.Error())
  199. }
  200. sr.Frame.ConsumedBytesLength += int32(n)
  201. err = sr.writerCheckCrc32(p[:n])
  202. return n, err
  203. }
  204. // analysisEndFrame is reading the EndFrameType data of selectObject response body
  205. func (sr *SelectObjectResponse) analysisEndFrame() error {
  206. var eF EndFrame
  207. payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
  208. _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
  209. if err != nil {
  210. return fmt.Errorf("read end frame error:%s", err.Error())
  211. }
  212. bytesToInt(payLoadBytes[0:8], &eF.TotalScanned)
  213. bytesToInt(payLoadBytes[8:12], &eF.HTTPStatusCode)
  214. errMsgLength := sr.Frame.PayloadLength - 20
  215. eF.ErrorMsg = string(payLoadBytes[12 : errMsgLength+12])
  216. sr.Frame.EndFrame.TotalScanned = eF.TotalScanned
  217. sr.Frame.EndFrame.HTTPStatusCode = eF.HTTPStatusCode
  218. sr.Frame.EndFrame.ErrorMsg = eF.ErrorMsg
  219. err = sr.writerCheckCrc32(payLoadBytes)
  220. return err
  221. }
  222. // analysisMetaEndFrameCSV is reading the MetaEndFrameCSVType data of selectObject response body
  223. func (sr *SelectObjectResponse) analysisMetaEndFrameCSV() error {
  224. var mCF MetaEndFrameCSV
  225. payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
  226. _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
  227. if err != nil {
  228. return fmt.Errorf("read meta end csv frame error:%s", err.Error())
  229. }
  230. bytesToInt(payLoadBytes[0:8], &mCF.TotalScanned)
  231. bytesToInt(payLoadBytes[8:12], &mCF.Status)
  232. bytesToInt(payLoadBytes[12:16], &mCF.SplitsCount)
  233. bytesToInt(payLoadBytes[16:24], &mCF.RowsCount)
  234. bytesToInt(payLoadBytes[24:28], &mCF.ColumnsCount)
  235. errMsgLength := sr.Frame.PayloadLength - 36
  236. mCF.ErrorMsg = string(payLoadBytes[28 : errMsgLength+28])
  237. sr.Frame.MetaEndFrameCSV.ErrorMsg = mCF.ErrorMsg
  238. sr.Frame.MetaEndFrameCSV.TotalScanned = mCF.TotalScanned
  239. sr.Frame.MetaEndFrameCSV.Status = mCF.Status
  240. sr.Frame.MetaEndFrameCSV.SplitsCount = mCF.SplitsCount
  241. sr.Frame.MetaEndFrameCSV.RowsCount = mCF.RowsCount
  242. sr.Frame.MetaEndFrameCSV.ColumnsCount = mCF.ColumnsCount
  243. err = sr.writerCheckCrc32(payLoadBytes)
  244. return err
  245. }
  246. // analysisMetaEndFrameJSON is reading the MetaEndFrameJSONType data of selectObject response body
  247. func (sr *SelectObjectResponse) analysisMetaEndFrameJSON() error {
  248. var mJF MetaEndFrameJSON
  249. payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
  250. _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
  251. if err != nil {
  252. return fmt.Errorf("read meta end json frame error:%s", err.Error())
  253. }
  254. bytesToInt(payLoadBytes[0:8], &mJF.TotalScanned)
  255. bytesToInt(payLoadBytes[8:12], &mJF.Status)
  256. bytesToInt(payLoadBytes[12:16], &mJF.SplitsCount)
  257. bytesToInt(payLoadBytes[16:24], &mJF.RowsCount)
  258. errMsgLength := sr.Frame.PayloadLength - 32
  259. mJF.ErrorMsg = string(payLoadBytes[24 : errMsgLength+24])
  260. sr.Frame.MetaEndFrameJSON.ErrorMsg = mJF.ErrorMsg
  261. sr.Frame.MetaEndFrameJSON.TotalScanned = mJF.TotalScanned
  262. sr.Frame.MetaEndFrameJSON.Status = mJF.Status
  263. sr.Frame.MetaEndFrameJSON.SplitsCount = mJF.SplitsCount
  264. sr.Frame.MetaEndFrameJSON.RowsCount = mJF.RowsCount
  265. err = sr.writerCheckCrc32(payLoadBytes)
  266. return err
  267. }
  268. func (sr *SelectObjectResponse) checkPayloadSum() (bool, error) {
  269. payLoadChecksumByte := make([]byte, 4)
  270. n, err := sr.readLen(payLoadChecksumByte, time.Duration(sr.ReadTimeOut))
  271. if n == 4 {
  272. bytesToInt(payLoadChecksumByte, &sr.Frame.PayloadChecksum)
  273. sr.ServerCRC32 = sr.Frame.PayloadChecksum
  274. sr.ClientCRC32 = sr.WriterForCheckCrc32.Sum32()
  275. if sr.Frame.EnablePayloadCrc == true && sr.ServerCRC32 != 0 && sr.ServerCRC32 != sr.ClientCRC32 {
  276. return false, fmt.Errorf("RequestId: %s, Unexpected frame type: %d, client %d but server %d",
  277. sr.Headers.Get(HTTPHeaderOssRequestID), sr.Frame.FrameType, sr.ClientCRC32, sr.ServerCRC32)
  278. }
  279. return true, err
  280. }
  281. return false, fmt.Errorf("RequestId:%s, read checksum error:%s", sr.Headers.Get(HTTPHeaderOssRequestID), err.Error())
  282. }
  283. func (sr *SelectObjectResponse) writerCheckCrc32(p []byte) (err error) {
  284. err = nil
  285. if sr.Frame.EnablePayloadCrc == true {
  286. _, err = sr.WriterForCheckCrc32.Write(p)
  287. }
  288. return err
  289. }
  290. // emptyFrame is emptying SelectObjectResponse Frame information
  291. func (sr *SelectObjectResponse) emptyFrame() {
  292. crcCalc := crc32.NewIEEE()
  293. sr.WriterForCheckCrc32 = crcCalc
  294. sr.Finish = false
  295. sr.Frame.ConsumedBytesLength = 0
  296. sr.Frame.OpenLine = false
  297. sr.Frame.Version = byte(0)
  298. sr.Frame.FrameType = 0
  299. sr.Frame.PayloadLength = 0
  300. sr.Frame.HeaderCheckSum = 0
  301. sr.Frame.Offset = 0
  302. sr.Frame.Data = ""
  303. sr.Frame.EndFrame.TotalScanned = 0
  304. sr.Frame.EndFrame.HTTPStatusCode = 0
  305. sr.Frame.EndFrame.ErrorMsg = ""
  306. sr.Frame.MetaEndFrameCSV.TotalScanned = 0
  307. sr.Frame.MetaEndFrameCSV.Status = 0
  308. sr.Frame.MetaEndFrameCSV.SplitsCount = 0
  309. sr.Frame.MetaEndFrameCSV.RowsCount = 0
  310. sr.Frame.MetaEndFrameCSV.ColumnsCount = 0
  311. sr.Frame.MetaEndFrameCSV.ErrorMsg = ""
  312. sr.Frame.MetaEndFrameJSON.TotalScanned = 0
  313. sr.Frame.MetaEndFrameJSON.Status = 0
  314. sr.Frame.MetaEndFrameJSON.SplitsCount = 0
  315. sr.Frame.MetaEndFrameJSON.RowsCount = 0
  316. sr.Frame.MetaEndFrameJSON.ErrorMsg = ""
  317. sr.Frame.PayloadChecksum = 0
  318. }
  319. // bytesToInt byte's array trans to int
  320. func bytesToInt(b []byte, ret interface{}) {
  321. binBuf := bytes.NewBuffer(b)
  322. binary.Read(binBuf, binary.BigEndian, ret)
  323. }