select_csv_objcet_test.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. package oss
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "strconv"
  6. "strings"
  7. "io"
  8. . "gopkg.in/check.v1"
  9. )
  10. type OssSelectCsvSuite struct {
  11. client *Client
  12. bucket *Bucket
  13. }
  14. var _ = Suite(&OssSelectCsvSuite{})
  15. func (s *OssSelectCsvSuite) SetUpSuite(c *C) {
  16. client, err := New(endpoint, accessID, accessKey)
  17. c.Assert(err, IsNil)
  18. s.client = client
  19. s.client.Config.LogLevel = Error // Debug
  20. // s.client.Config.Timeout = 5
  21. err = s.client.CreateBucket(bucketName)
  22. c.Assert(err, IsNil)
  23. bucket, err := s.client.Bucket(bucketName)
  24. c.Assert(err, IsNil)
  25. s.bucket = bucket
  26. testLogger.Println("test select csv started")
  27. }
  28. func (s *OssSelectCsvSuite) TearDownSuite(c *C) {
  29. // Delete objects
  30. marker := Marker("")
  31. for {
  32. lor, err := s.bucket.ListObjects(marker)
  33. c.Assert(err, IsNil)
  34. for _, object := range lor.Objects {
  35. err = s.bucket.DeleteObject(object.Key)
  36. c.Assert(err, IsNil)
  37. }
  38. marker = Marker(lor.NextMarker)
  39. if !lor.IsTruncated {
  40. break
  41. }
  42. }
  43. err := s.client.DeleteBucket(bucketName)
  44. c.Assert(err, IsNil)
  45. testLogger.Println("test select csv completed")
  46. }
  47. func (s *OssSelectCsvSuite) SetUpTest(c *C) {
  48. testLogger.Println("test func", c.TestName(), "start")
  49. }
  50. func (s *OssSelectCsvSuite) TearDownTest(c *C) {
  51. testLogger.Println("test func", c.TestName(), "succeed")
  52. }
  53. // TestCreateSelectObjectMeta
  54. func (s *OssSelectCsvSuite) TestCreateSelectCsvObjectMeta(c *C) {
  55. key := "sample_data.csv"
  56. localCsvFile := "../sample/sample_data.csv"
  57. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  58. c.Assert(err, IsNil)
  59. csvMeta := CsvMetaRequest{}
  60. var bo bool
  61. csvMeta.OverwriteIfExists = &bo
  62. res, err := s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
  63. c.Assert(err, IsNil)
  64. l, err := readCsvLine(localCsvFile)
  65. c.Assert(err, IsNil)
  66. c.Assert(res.RowsCount, Equals, int64(l))
  67. bo = true
  68. csvMeta.OverwriteIfExists = &bo
  69. csvMeta.InputSerialization.CSV.RecordDelimiter = "\n"
  70. csvMeta.InputSerialization.CSV.FieldDelimiter = ","
  71. csvMeta.InputSerialization.CSV.QuoteCharacter = "\""
  72. res, err = s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
  73. c.Assert(err, IsNil)
  74. c.Assert(res.RowsCount, Equals, int64(l))
  75. err = s.bucket.DeleteObject(key)
  76. c.Assert(err, IsNil)
  77. }
  78. func (s *OssSelectCsvSuite) TestSelectCsvObjectIsEmpty(c *C) {
  79. key := "sample_data.csv"
  80. localCsvFile := "../sample/sample_data.csv"
  81. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  82. c.Assert(err, IsNil)
  83. csvMeta := CsvMetaRequest{}
  84. _, err = s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
  85. c.Assert(err, IsNil)
  86. selReq := SelectRequest{}
  87. selReq.Expression = "select Year, StateAbbr, CityName, PopulationCount from ossobject where CityName != ''"
  88. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  89. body, err := s.bucket.SelectObject(key, selReq)
  90. c.Assert(err, IsNil)
  91. defer body.Close()
  92. p := make([]byte, 512)
  93. n, err := body.Read(p)
  94. c.Assert(err, IsNil)
  95. c.Assert(n, Equals, 512)
  96. p1 := make([]byte, 3)
  97. _, err = body.Read(p1)
  98. c.Assert(err, IsNil)
  99. rets, err := ioutil.ReadAll(body)
  100. c.Assert(err, IsNil)
  101. str, err := readCsvIsEmpty(localCsvFile)
  102. c.Assert(err, IsNil)
  103. c.Assert(string(p)+string(p1)+string(rets), Equals, str)
  104. err = s.bucket.DeleteObject(key)
  105. c.Assert(err, IsNil)
  106. }
  107. func (s *OssSelectCsvSuite) TestSelectObjectIntoFile(c *C) {
  108. var bo bool = true
  109. key := "sample_data.csv"
  110. localCsvFile := "../sample/sample_data.csv"
  111. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  112. c.Assert(err, IsNil)
  113. csvMeta := CsvMetaRequest{
  114. InputSerialization: InputSerialization {
  115. CSV: CSV {
  116. RecordDelimiter: "\n",
  117. FieldDelimiter: ",",
  118. QuoteCharacter: "\"",
  119. },
  120. },
  121. OverwriteIfExists: &bo,
  122. }
  123. res, err := s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
  124. c.Assert(err, IsNil)
  125. l, err := readCsvLine(localCsvFile)
  126. c.Assert(err, IsNil)
  127. c.Assert(res.RowsCount, Equals, int64(l))
  128. selReq := SelectRequest{
  129. Expression:"select * from ossobject",
  130. InputSerializationSelect: InputSerializationSelect {
  131. CsvBodyInput :CSVSelectInput{
  132. FileHeaderInfo: "None",
  133. CommentCharacter: "#",
  134. RecordDelimiter: "\n",
  135. FieldDelimiter: ",",
  136. QuoteCharacter:"\"",
  137. Range:"",
  138. },
  139. },
  140. }
  141. outfile := "sample_data_out.csv"
  142. err = s.bucket.SelectObjectIntoFile(key, outfile, selReq)
  143. c.Assert(err, IsNil)
  144. fd1, err := os.Open(outfile)
  145. c.Assert(err,IsNil)
  146. defer fd1.Close()
  147. fd2, err := os.Open(localCsvFile)
  148. c.Assert(err,IsNil)
  149. defer fd2.Close()
  150. str1, err := ioutil.ReadAll(fd1)
  151. c.Assert(err,IsNil)
  152. str2 ,err := ioutil.ReadAll(fd2)
  153. c.Assert(err,IsNil)
  154. c.Assert(string(str1), Equals, string(str2))
  155. err = os.Remove(outfile)
  156. c.Assert(err, IsNil)
  157. err = s.bucket.DeleteObject(key)
  158. c.Assert(err, IsNil)
  159. }
  160. func(s *OssSelectCsvSuite) TestSelectCsvObjectRange(c *C) {
  161. key := "sample_data.csv"
  162. localCsvFile := "../sample/sample_data.csv"
  163. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  164. c.Assert(err, IsNil)
  165. csvMeta := CsvMetaRequest{}
  166. _,err = s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
  167. c.Assert(err, IsNil)
  168. selReq := SelectRequest{}
  169. selReq.Expression = "select Year,StateAbbr, CityName, Short_Question_Text from ossobject"
  170. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  171. selReq.InputSerializationSelect.CsvBodyInput.Range = "0-2"
  172. body, err := s.bucket.SelectObject(key, selReq)
  173. c.Assert(err, IsNil)
  174. defer body.Close()
  175. rets, err := ioutil.ReadAll(body)
  176. str,err := readCsvRange(localCsvFile, 0, 2)
  177. c.Assert(err, IsNil)
  178. c.Assert(string(rets), Equals, str)
  179. err = s.bucket.DeleteObject(key)
  180. c.Assert(err, IsNil)
  181. }
  182. func(s *OssSelectCsvSuite) TestSelectCsvObjectLike(c *C) {
  183. key := "sample_data.csv"
  184. localCsvFile := "../sample/sample_data.csv"
  185. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  186. c.Assert(err, IsNil)
  187. selReq := SelectRequest{}
  188. selReq.Expression = "select Year, StateAbbr, CityName, Short_Question_Text from ossobject where Measure like '%blood pressure%Years'"
  189. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  190. ret,err := s.bucket.SelectObject(key, selReq)
  191. c.Assert(err, IsNil)
  192. defer ret.Close()
  193. ts, err := ioutil.ReadAll(ret)
  194. c.Assert(err, IsNil)
  195. str, err := readCsvLike(localCsvFile)
  196. c.Assert(err, IsNil)
  197. c.Assert(string(ts), Equals, str)
  198. err = s.bucket.DeleteObject(key)
  199. c.Assert(err, IsNil)
  200. }
  201. func(s *OssSelectCsvSuite) TestSelectCsvObjectIntAggregation(c *C) {
  202. key := "sample_data.csv"
  203. localCsvFile := "../sample/sample_data.csv"
  204. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  205. c.Assert(err, IsNil)
  206. selReq := SelectRequest{}
  207. selReq.Expression = `select avg(cast(year as int)), max(cast(year as int)), min(cast(year as int)) from ossobject where year = 2015`
  208. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  209. ret,err := s.bucket.SelectObject(key, selReq)
  210. c.Assert(err, IsNil)
  211. defer ret.Close()
  212. ts, err := ioutil.ReadAll(ret)
  213. c.Assert(err, IsNil)
  214. c.Assert(string(ts), Equals, "2015,2015,2015\n")
  215. err = s.bucket.DeleteObject(key)
  216. c.Assert(err, IsNil)
  217. }
  218. func(s *OssSelectCsvSuite) TestSelectCsvObjectFloatAggregation(c *C) {
  219. key := "sample_data.csv"
  220. localCsvFile := "../sample/sample_data.csv"
  221. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  222. c.Assert(err, IsNil)
  223. selReq := SelectRequest{}
  224. selReq.Expression = `select avg(cast(data_value as double)), max(cast(data_value as double)), sum(cast(data_value as double)) from ossobject`
  225. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  226. ret,err := s.bucket.SelectObject(key, selReq)
  227. c.Assert(err, IsNil)
  228. defer ret.Close()
  229. ts, err := ioutil.ReadAll(ret)
  230. strR := string(ts)
  231. c.Assert(err, IsNil)
  232. avg, max, sum , err := readCsvFloatAgg(localCsvFile)
  233. c.Assert(err, IsNil)
  234. s1 := strconv.FormatFloat(avg, 'f', 5, 32) + ","
  235. s1 += strconv.FormatFloat(max, 'f', 5, 32) + ","
  236. s1 += strconv.FormatFloat(sum, 'f', 5, 32) + ","
  237. retS := ""
  238. for _, v := range strings.Split(strR[:len(strR)-1], ",") {
  239. vv, err := strconv.ParseFloat(v, 64)
  240. c.Assert(err, IsNil)
  241. retS += strconv.FormatFloat(vv, 'f', 5, 32) + ","
  242. }
  243. c.Assert(s1, Equals, retS)
  244. err = s.bucket.DeleteObject(key)
  245. c.Assert(err, IsNil)
  246. }
  247. func(s *OssSelectCsvSuite) TestSelectCsvObjectConcat(c *C) {
  248. key := "sample_data.csv"
  249. localCsvFile := "../sample/sample_data.csv"
  250. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  251. c.Assert(err, IsNil)
  252. selReq := SelectRequest{}
  253. selReq.Expression = `select Year,StateAbbr, CityName, Short_Question_Text from ossobject where (data_value || data_value_unit) = '14.8%'`
  254. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  255. ret,err := s.bucket.SelectObject(key, selReq)
  256. c.Assert(err, IsNil)
  257. defer ret.Close()
  258. ts, err := ioutil.ReadAll(ret)
  259. c.Assert(err, IsNil)
  260. str, err := readCsvConcat(localCsvFile)
  261. c.Assert(err, IsNil)
  262. c.Assert(string(ts), Equals, str)
  263. err = s.bucket.DeleteObject(key)
  264. c.Assert(err, IsNil)
  265. }
  266. func (s *OssSelectCsvSuite) TestSelectCsvObjectComplicateConcat(c *C) {
  267. key := "sample_data.csv"
  268. localCsvFile := "../sample/sample_data.csv"
  269. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  270. c.Assert(err, IsNil)
  271. selReq := SelectRequest{}
  272. selReq.Expression = `
  273. select
  274. Year,StateAbbr, CityName, Short_Question_Text, data_value,
  275. data_value_unit, category, high_confidence_limit
  276. from
  277. ossobject
  278. where
  279. data_value > 14.8 and
  280. data_value_unit = '%' or
  281. Measure like '%18 Years' and
  282. Category = 'Unhealthy Behaviors' or
  283. high_confidence_limit > 70.0 `
  284. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  285. ret,err := s.bucket.SelectObject(key, selReq)
  286. c.Assert(err, IsNil)
  287. defer ret.Close()
  288. ts, err := ioutil.ReadAll(ret)
  289. c.Assert(err, IsNil)
  290. str, err := readCsvComplicateCondition(localCsvFile)
  291. c.Assert(err, IsNil)
  292. c.Assert(string(ts), Equals, str)
  293. err = s.bucket.DeleteObject(key)
  294. c.Assert(err, IsNil)
  295. }
  296. func (s *OssSelectCsvSuite) TestSelectCsvObjectInvalidSql(c *C) {
  297. key := "sample_data.csv"
  298. localCsvFile := "../sample/sample_data.csv"
  299. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  300. c.Assert(err, IsNil)
  301. selReq := SelectRequest{}
  302. selReq.Expression = `select * from ossobject where avg(cast(year as int)) > 2016`
  303. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  304. _, err = s.bucket.SelectObject(key, selReq)
  305. c.Assert(err, NotNil)
  306. selReq.Expression = ``
  307. _, err = s.bucket.SelectObject(key, selReq)
  308. c.Assert(err, NotNil)
  309. selReq.Expression = `select year || CityName from ossobject`
  310. _, err = s.bucket.SelectObject(key, selReq)
  311. c.Assert(err, NotNil)
  312. selReq.Expression = `select * from ossobject group by CityName`
  313. _, err = s.bucket.SelectObject(key, selReq)
  314. c.Assert(err, NotNil)
  315. selReq.Expression = `select * from ossobject order by _1`
  316. _, err = s.bucket.SelectObject(key, selReq)
  317. c.Assert(err, NotNil)
  318. selReq.Expression = `select * from ossobject oss join s3object s3 on oss.CityName = s3.CityName`
  319. _, err = s.bucket.SelectObject(key, selReq)
  320. c.Assert(err, NotNil)
  321. selReq.Expression = `select _1 from ossobject`
  322. ret, err := s.bucket.SelectObject(key, selReq)
  323. c.Assert(err, IsNil)
  324. defer ret.Close()
  325. _, err = ioutil.ReadAll(ret)
  326. c.Assert(err, IsNil)
  327. err = s.bucket.DeleteObject(key)
  328. c.Assert(err, IsNil)
  329. }
  330. func (s *OssSelectCsvSuite) TestSelectCsvObjectWithOutputDelimiters(c *C) {
  331. key := "sample_data.csv"
  332. content := "abc,def\n"
  333. err := s.bucket.PutObject(key, strings.NewReader(content))
  334. c.Assert(err, IsNil)
  335. selReq := SelectRequest{}
  336. selReq.Expression = `select _1, _2 from ossobject `
  337. selReq.OutputSerializationSelect.CsvBodyOutput.RecordDelimiter = "\r\n"
  338. selReq.OutputSerializationSelect.CsvBodyOutput.FieldDelimiter = "|"
  339. ret,err := s.bucket.SelectObject(key, selReq)
  340. c.Assert(err, IsNil)
  341. defer ret.Close()
  342. ts, err := ioutil.ReadAll(ret)
  343. c.Assert(err, IsNil)
  344. c.Assert(string(ts), Equals, "abc|def\r\n")
  345. err = s.bucket.DeleteObject(key)
  346. c.Assert(err, IsNil)
  347. }
  348. func (s *OssSelectCsvSuite) TestSelectCsvObjectWithCrc(c *C) {
  349. key := "sample_data.csv"
  350. content := "abc,def\n"
  351. err := s.bucket.PutObject(key, strings.NewReader(content))
  352. c.Assert(err, IsNil)
  353. selReq := SelectRequest{}
  354. selReq.Expression = `select * from ossobject`
  355. bo := true
  356. selReq.OutputSerializationSelect.EnablePayloadCrc = &bo
  357. ret,err := s.bucket.SelectObject(key, selReq)
  358. c.Assert(err, IsNil)
  359. defer ret.Close()
  360. ts, err := ioutil.ReadAll(ret)
  361. c.Assert(err, IsNil)
  362. c.Assert(string(ts), Equals, content)
  363. err = s.bucket.DeleteObject(key)
  364. c.Assert(err, IsNil)
  365. }
  366. func (s *OssSelectCsvSuite) TestSelectCsvObjectWithSkipPartialData(c *C) {
  367. key := "sample_data.csv"
  368. content := "abc,def\nefg\n"
  369. err := s.bucket.PutObject(key, strings.NewReader(content))
  370. c.Assert(err, IsNil)
  371. selReq := SelectRequest{}
  372. selReq.Expression = `select _1, _2 from ossobject`
  373. bo := true
  374. selReq.SelectOptions.SkipPartialDataRecord = &bo
  375. ret,err := s.bucket.SelectObject(key, selReq)
  376. c.Assert(err, IsNil)
  377. defer ret.Close()
  378. ts, err := ioutil.ReadAll(ret)
  379. c.Assert(err, IsNil)
  380. c.Assert(string(ts), Equals, "abc,def\n")
  381. err = s.bucket.DeleteObject(key)
  382. c.Assert(err, IsNil)
  383. }
  384. func (s *OssSelectCsvSuite) TestSelectCsvObjectWithOutputRaw(c *C) {
  385. key := "sample_data.csv"
  386. content := "abc,def\n"
  387. err := s.bucket.PutObject(key, strings.NewReader(content))
  388. c.Assert(err, IsNil)
  389. selReq := SelectRequest{}
  390. selReq.Expression = `select _1 from ossobject`
  391. bo := true
  392. selReq.OutputSerializationSelect.OutputRawData = &bo
  393. ret,err := s.bucket.SelectObject(key, selReq)
  394. c.Assert(err, IsNil)
  395. defer ret.Close()
  396. ts, err := ioutil.ReadAll(ret)
  397. c.Assert(err, IsNil)
  398. c.Assert(string(ts), Equals, "abc\n")
  399. err = s.bucket.DeleteObject(key)
  400. c.Assert(err, IsNil)
  401. }
  402. func (s *OssSelectCsvSuite) TestSelectCsvObjectWithKeepColumns(c *C) {
  403. key := "sample_data.csv"
  404. content := "abc,def\n"
  405. err := s.bucket.PutObject(key, strings.NewReader(content))
  406. c.Assert(err, IsNil)
  407. selReq := SelectRequest{}
  408. selReq.Expression = `select _1 from ossobject`
  409. bo := true
  410. selReq.OutputSerializationSelect.KeepAllColumns = &bo
  411. ret,err := s.bucket.SelectObject(key, selReq)
  412. c.Assert(err, IsNil)
  413. defer ret.Close()
  414. ts, err := ioutil.ReadAll(ret)
  415. c.Assert(err, IsNil)
  416. c.Assert(string(ts), Equals, "abc,\n")
  417. err = s.bucket.DeleteObject(key)
  418. c.Assert(err, IsNil)
  419. }
  420. func (s *OssSelectCsvSuite) TestSelectCsvObjectWithOutputHeader(c *C) {
  421. key := "sample_data.csv"
  422. content := "name,job\nabc,def\n"
  423. err := s.bucket.PutObject(key, strings.NewReader(content))
  424. c.Assert(err, IsNil)
  425. selReq := SelectRequest{}
  426. selReq.Expression = `select name from ossobject`
  427. bo := true
  428. selReq.OutputSerializationSelect.OutputHeader = &bo
  429. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  430. ret,err := s.bucket.SelectObject(key, selReq)
  431. c.Assert(err, IsNil)
  432. defer ret.Close()
  433. ts, err := ioutil.ReadAll(ret)
  434. c.Assert(err, IsNil)
  435. c.Assert(string(ts), Equals, "name\nabc\n")
  436. err = s.bucket.DeleteObject(key)
  437. c.Assert(err, IsNil)
  438. }
  439. func (s *OssSelectCsvSuite) TestSelectCsvObjectRead(c *C) {
  440. key := "sample_data.csv"
  441. content := "name,job\nabc,def\n"
  442. err := s.bucket.PutObject(key, strings.NewReader(content))
  443. c.Assert(err, IsNil)
  444. selReq := SelectRequest{}
  445. selReq.Expression = `select name from ossobject`
  446. bo := true
  447. selReq.OutputSerializationSelect.OutputHeader = &bo
  448. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  449. selReq.OutputSerializationSelect.EnablePayloadCrc = &bo
  450. ret,err := s.bucket.SelectObject(key, selReq)
  451. c.Assert(err, IsNil)
  452. defer ret.Close()
  453. // case 1: read length > data length
  454. p := make([]byte, 512)
  455. n, err := ret.Read(p[:20])
  456. if err != nil && err != io.EOF {
  457. c.Assert(err, IsNil)
  458. }
  459. c.Assert(string(p[:n]), Equals, "name\nabc\n")
  460. ts, err := ioutil.ReadAll(ret)
  461. c.Assert(err, IsNil)
  462. c.Assert(string(ts), Equals, "")
  463. // case 2: read length = data length
  464. ret,err = s.bucket.SelectObject(key, selReq)
  465. c.Assert(err, IsNil)
  466. defer ret.Close()
  467. n, err = ret.Read(p[:9])
  468. if err != nil && err != io.EOF {
  469. c.Assert(err, IsNil)
  470. }
  471. c.Assert(string(p[:n]), Equals, "name\nabc\n")
  472. ts, err = ioutil.ReadAll(ret)
  473. c.Assert(err, IsNil)
  474. c.Assert(string(ts), Equals, "")
  475. // case 3: read length > one frame length and read length < two frame, (this data = 2 * frame length)
  476. ret,err = s.bucket.SelectObject(key, selReq)
  477. c.Assert(err, IsNil)
  478. defer ret.Close()
  479. n, err = ret.Read(p[:7])
  480. if err != nil && err != io.EOF {
  481. c.Assert(err, IsNil)
  482. }
  483. c.Assert(string(p[:n]), Equals, "name\nab")
  484. ts, err = ioutil.ReadAll(ret)
  485. c.Assert(err, IsNil)
  486. c.Assert(string(ts), Equals, "c\n")
  487. // case 4: read length = a frame length (this data = 2 * frame length)
  488. ret,err = s.bucket.SelectObject(key, selReq)
  489. c.Assert(err, IsNil)
  490. defer ret.Close()
  491. n, err = ret.Read(p[:5])
  492. if err != nil && err != io.EOF {
  493. c.Assert(err, IsNil)
  494. }
  495. c.Assert(string(p[:n]), Equals, "name\n")
  496. ts, err = ioutil.ReadAll(ret)
  497. c.Assert(err, IsNil)
  498. c.Assert(string(ts), Equals, "abc\n")
  499. // case 5: read length < a frame length (this data = 2 * frame length)
  500. ret,err = s.bucket.SelectObject(key, selReq)
  501. c.Assert(err, IsNil)
  502. defer ret.Close()
  503. n, err = ret.Read(p[:3])
  504. if err != nil && err != io.EOF {
  505. c.Assert(err, IsNil)
  506. }
  507. c.Assert(string(p[:n]), Equals, "nam")
  508. ts, err = ioutil.ReadAll(ret)
  509. c.Assert(err, IsNil)
  510. c.Assert(string(ts), Equals, "e\nabc\n")
  511. err = s.bucket.DeleteObject(key)
  512. c.Assert(err, IsNil)
  513. }
  514. // OssProgressListener is the progress listener
  515. type OssSelectProgressListener struct {
  516. }
  517. // ProgressChanged handles progress event
  518. func (listener *OssSelectProgressListener) ProgressChanged(event *ProgressEvent) {
  519. switch event.EventType {
  520. case TransferStartedEvent:
  521. testLogger.Printf("Transfer Started.\n")
  522. case TransferDataEvent:
  523. testLogger.Printf("Transfer Data, This time consumedBytes: %d \n", event.ConsumedBytes)
  524. case TransferCompletedEvent:
  525. testLogger.Printf("Transfer Completed, This time consumedBytes: %d.\n", event.ConsumedBytes)
  526. case TransferFailedEvent:
  527. testLogger.Printf("Transfer Failed, This time consumedBytes: %d.\n", event.ConsumedBytes)
  528. default:
  529. }
  530. }
  531. func(s *OssSelectCsvSuite) TestSelectCsvObjectConcatProgress(c *C) {
  532. key := "sample_data.csv"
  533. localCsvFile := "../sample/sample_data.csv"
  534. err := s.bucket.PutObjectFromFile(key, localCsvFile)
  535. c.Assert(err, IsNil)
  536. selReq := SelectRequest{}
  537. selReq.Expression = `select Year,StateAbbr, CityName, Short_Question_Text from ossobject where (data_value || data_value_unit) = '14.8%'`
  538. selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
  539. ret,err := s.bucket.SelectObject(key, selReq, Progress(&OssSelectProgressListener{}))
  540. c.Assert(err, IsNil)
  541. defer ret.Close()
  542. ts, err := ioutil.ReadAll(ret)
  543. c.Assert(err, IsNil)
  544. str, err := readCsvConcat(localCsvFile)
  545. c.Assert(err, IsNil)
  546. c.Assert(string(ts), Equals, str)
  547. err = s.bucket.DeleteObject(key)
  548. c.Assert(err, IsNil)
  549. }