token.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806
  1. package mssql
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "strconv"
  10. "strings"
  11. )
  12. //go:generate stringer -type token
  13. type token byte
  14. // token ids
  15. const (
  16. tokenReturnStatus token = 121 // 0x79
  17. tokenColMetadata token = 129 // 0x81
  18. tokenOrder token = 169 // 0xA9
  19. tokenError token = 170 // 0xAA
  20. tokenInfo token = 171 // 0xAB
  21. tokenReturnValue token = 0xAC
  22. tokenLoginAck token = 173 // 0xad
  23. tokenRow token = 209 // 0xd1
  24. tokenNbcRow token = 210 // 0xd2
  25. tokenEnvChange token = 227 // 0xE3
  26. tokenSSPI token = 237 // 0xED
  27. tokenDone token = 253 // 0xFD
  28. tokenDoneProc token = 254
  29. tokenDoneInProc token = 255
  30. )
  31. // done flags
  32. // https://msdn.microsoft.com/en-us/library/dd340421.aspx
  33. const (
  34. doneFinal = 0
  35. doneMore = 1
  36. doneError = 2
  37. doneInxact = 4
  38. doneCount = 0x10
  39. doneAttn = 0x20
  40. doneSrvError = 0x100
  41. )
  42. // ENVCHANGE types
  43. // http://msdn.microsoft.com/en-us/library/dd303449.aspx
  44. const (
  45. envTypDatabase = 1
  46. envTypLanguage = 2
  47. envTypCharset = 3
  48. envTypPacketSize = 4
  49. envSortId = 5
  50. envSortFlags = 6
  51. envSqlCollation = 7
  52. envTypBeginTran = 8
  53. envTypCommitTran = 9
  54. envTypRollbackTran = 10
  55. envEnlistDTC = 11
  56. envDefectTran = 12
  57. envDatabaseMirrorPartner = 13
  58. envPromoteTran = 15
  59. envTranMgrAddr = 16
  60. envTranEnded = 17
  61. envResetConnAck = 18
  62. envStartedInstanceName = 19
  63. envRouting = 20
  64. )
  65. // COLMETADATA flags
  66. // https://msdn.microsoft.com/en-us/library/dd357363.aspx
  67. const (
  68. colFlagNullable = 1
  69. // TODO implement more flags
  70. )
  71. // interface for all tokens
  72. type tokenStruct interface{}
  73. type orderStruct struct {
  74. ColIds []uint16
  75. }
  76. type doneStruct struct {
  77. Status uint16
  78. CurCmd uint16
  79. RowCount uint64
  80. errors []Error
  81. }
  82. func (d doneStruct) isError() bool {
  83. return d.Status&doneError != 0 || len(d.errors) > 0
  84. }
  85. func (d doneStruct) getError() Error {
  86. if len(d.errors) > 0 {
  87. return d.errors[len(d.errors)-1]
  88. } else {
  89. return Error{Message: "Request failed but didn't provide reason"}
  90. }
  91. }
  92. type doneInProcStruct doneStruct
  93. var doneFlags2str = map[uint16]string{
  94. doneFinal: "final",
  95. doneMore: "more",
  96. doneError: "error",
  97. doneInxact: "inxact",
  98. doneCount: "count",
  99. doneAttn: "attn",
  100. doneSrvError: "srverror",
  101. }
  102. func doneFlags2Str(flags uint16) string {
  103. strs := make([]string, 0, len(doneFlags2str))
  104. for flag, tag := range doneFlags2str {
  105. if flags&flag != 0 {
  106. strs = append(strs, tag)
  107. }
  108. }
  109. return strings.Join(strs, "|")
  110. }
  111. // ENVCHANGE stream
  112. // http://msdn.microsoft.com/en-us/library/dd303449.aspx
  113. func processEnvChg(sess *tdsSession) {
  114. size := sess.buf.uint16()
  115. r := &io.LimitedReader{R: sess.buf, N: int64(size)}
  116. for {
  117. var err error
  118. var envtype uint8
  119. err = binary.Read(r, binary.LittleEndian, &envtype)
  120. if err == io.EOF {
  121. return
  122. }
  123. if err != nil {
  124. badStreamPanic(err)
  125. }
  126. switch envtype {
  127. case envTypDatabase:
  128. sess.database, err = readBVarChar(r)
  129. if err != nil {
  130. badStreamPanic(err)
  131. }
  132. _, err = readBVarChar(r)
  133. if err != nil {
  134. badStreamPanic(err)
  135. }
  136. case envTypLanguage:
  137. // currently ignored
  138. // new value
  139. if _, err = readBVarChar(r); err != nil {
  140. badStreamPanic(err)
  141. }
  142. // old value
  143. if _, err = readBVarChar(r); err != nil {
  144. badStreamPanic(err)
  145. }
  146. case envTypCharset:
  147. // currently ignored
  148. // new value
  149. if _, err = readBVarChar(r); err != nil {
  150. badStreamPanic(err)
  151. }
  152. // old value
  153. if _, err = readBVarChar(r); err != nil {
  154. badStreamPanic(err)
  155. }
  156. case envTypPacketSize:
  157. packetsize, err := readBVarChar(r)
  158. if err != nil {
  159. badStreamPanic(err)
  160. }
  161. _, err = readBVarChar(r)
  162. if err != nil {
  163. badStreamPanic(err)
  164. }
  165. packetsizei, err := strconv.Atoi(packetsize)
  166. if err != nil {
  167. badStreamPanicf("Invalid Packet size value returned from server (%s): %s", packetsize, err.Error())
  168. }
  169. sess.buf.ResizeBuffer(packetsizei)
  170. case envSortId:
  171. // currently ignored
  172. // new value
  173. if _, err = readBVarChar(r); err != nil {
  174. badStreamPanic(err)
  175. }
  176. // old value, should be 0
  177. if _, err = readBVarChar(r); err != nil {
  178. badStreamPanic(err)
  179. }
  180. case envSortFlags:
  181. // currently ignored
  182. // new value
  183. if _, err = readBVarChar(r); err != nil {
  184. badStreamPanic(err)
  185. }
  186. // old value, should be 0
  187. if _, err = readBVarChar(r); err != nil {
  188. badStreamPanic(err)
  189. }
  190. case envSqlCollation:
  191. // currently ignored
  192. var collationSize uint8
  193. err = binary.Read(r, binary.LittleEndian, &collationSize)
  194. if err != nil {
  195. badStreamPanic(err)
  196. }
  197. // SQL Collation data should contain 5 bytes in length
  198. if collationSize != 5 {
  199. badStreamPanicf("Invalid SQL Collation size value returned from server: %s", collationSize)
  200. }
  201. // 4 bytes, contains: LCID ColFlags Version
  202. var info uint32
  203. err = binary.Read(r, binary.LittleEndian, &info)
  204. if err != nil {
  205. badStreamPanic(err)
  206. }
  207. // 1 byte, contains: sortID
  208. var sortID uint8
  209. err = binary.Read(r, binary.LittleEndian, &sortID)
  210. if err != nil {
  211. badStreamPanic(err)
  212. }
  213. // old value, should be 0
  214. if _, err = readBVarChar(r); err != nil {
  215. badStreamPanic(err)
  216. }
  217. case envTypBeginTran:
  218. tranid, err := readBVarByte(r)
  219. if len(tranid) != 8 {
  220. badStreamPanicf("invalid size of transaction identifier: %d", len(tranid))
  221. }
  222. sess.tranid = binary.LittleEndian.Uint64(tranid)
  223. if err != nil {
  224. badStreamPanic(err)
  225. }
  226. if sess.logFlags&logTransaction != 0 {
  227. sess.log.Printf("BEGIN TRANSACTION %x\n", sess.tranid)
  228. }
  229. _, err = readBVarByte(r)
  230. if err != nil {
  231. badStreamPanic(err)
  232. }
  233. case envTypCommitTran, envTypRollbackTran:
  234. _, err = readBVarByte(r)
  235. if err != nil {
  236. badStreamPanic(err)
  237. }
  238. _, err = readBVarByte(r)
  239. if err != nil {
  240. badStreamPanic(err)
  241. }
  242. if sess.logFlags&logTransaction != 0 {
  243. if envtype == envTypCommitTran {
  244. sess.log.Printf("COMMIT TRANSACTION %x\n", sess.tranid)
  245. } else {
  246. sess.log.Printf("ROLLBACK TRANSACTION %x\n", sess.tranid)
  247. }
  248. }
  249. sess.tranid = 0
  250. case envEnlistDTC:
  251. // currently ignored
  252. // new value, should be 0
  253. if _, err = readBVarChar(r); err != nil {
  254. badStreamPanic(err)
  255. }
  256. // old value
  257. if _, err = readBVarChar(r); err != nil {
  258. badStreamPanic(err)
  259. }
  260. case envDefectTran:
  261. // currently ignored
  262. // new value
  263. if _, err = readBVarChar(r); err != nil {
  264. badStreamPanic(err)
  265. }
  266. // old value, should be 0
  267. if _, err = readBVarChar(r); err != nil {
  268. badStreamPanic(err)
  269. }
  270. case envDatabaseMirrorPartner:
  271. sess.partner, err = readBVarChar(r)
  272. if err != nil {
  273. badStreamPanic(err)
  274. }
  275. _, err = readBVarChar(r)
  276. if err != nil {
  277. badStreamPanic(err)
  278. }
  279. case envPromoteTran:
  280. // currently ignored
  281. // old value, should be 0
  282. if _, err = readBVarChar(r); err != nil {
  283. badStreamPanic(err)
  284. }
  285. // dtc token
  286. // spec says it should be L_VARBYTE, so this code might be wrong
  287. if _, err = readBVarChar(r); err != nil {
  288. badStreamPanic(err)
  289. }
  290. case envTranMgrAddr:
  291. // currently ignored
  292. // old value, should be 0
  293. if _, err = readBVarChar(r); err != nil {
  294. badStreamPanic(err)
  295. }
  296. // XACT_MANAGER_ADDRESS = B_VARBYTE
  297. if _, err = readBVarChar(r); err != nil {
  298. badStreamPanic(err)
  299. }
  300. case envTranEnded:
  301. // currently ignored
  302. // old value, B_VARBYTE
  303. if _, err = readBVarChar(r); err != nil {
  304. badStreamPanic(err)
  305. }
  306. // should be 0
  307. if _, err = readBVarChar(r); err != nil {
  308. badStreamPanic(err)
  309. }
  310. case envResetConnAck:
  311. // currently ignored
  312. // old value, should be 0
  313. if _, err = readBVarChar(r); err != nil {
  314. badStreamPanic(err)
  315. }
  316. // should be 0
  317. if _, err = readBVarChar(r); err != nil {
  318. badStreamPanic(err)
  319. }
  320. case envStartedInstanceName:
  321. // currently ignored
  322. // old value, should be 0
  323. if _, err = readBVarChar(r); err != nil {
  324. badStreamPanic(err)
  325. }
  326. // instance name
  327. if _, err = readBVarChar(r); err != nil {
  328. badStreamPanic(err)
  329. }
  330. case envRouting:
  331. // RoutingData message is:
  332. // ValueLength USHORT
  333. // Protocol (TCP = 0) BYTE
  334. // ProtocolProperty (new port) USHORT
  335. // AlternateServer US_VARCHAR
  336. _, err := readUshort(r)
  337. if err != nil {
  338. badStreamPanic(err)
  339. }
  340. protocol, err := readByte(r)
  341. if err != nil || protocol != 0 {
  342. badStreamPanic(err)
  343. }
  344. newPort, err := readUshort(r)
  345. if err != nil {
  346. badStreamPanic(err)
  347. }
  348. newServer, err := readUsVarChar(r)
  349. if err != nil {
  350. badStreamPanic(err)
  351. }
  352. // consume the OLDVALUE = %x00 %x00
  353. _, err = readUshort(r)
  354. if err != nil {
  355. badStreamPanic(err)
  356. }
  357. sess.routedServer = newServer
  358. sess.routedPort = newPort
  359. default:
  360. // ignore rest of records because we don't know how to skip those
  361. sess.log.Printf("WARN: Unknown ENVCHANGE record detected with type id = %d\n", envtype)
  362. break
  363. }
  364. }
  365. }
  366. type returnStatus int32
  367. // http://msdn.microsoft.com/en-us/library/dd358180.aspx
  368. func parseReturnStatus(r *tdsBuffer) returnStatus {
  369. return returnStatus(r.int32())
  370. }
  371. func parseOrder(r *tdsBuffer) (res orderStruct) {
  372. len := int(r.uint16())
  373. res.ColIds = make([]uint16, len/2)
  374. for i := 0; i < len/2; i++ {
  375. res.ColIds[i] = r.uint16()
  376. }
  377. return res
  378. }
  379. // https://msdn.microsoft.com/en-us/library/dd340421.aspx
  380. func parseDone(r *tdsBuffer) (res doneStruct) {
  381. res.Status = r.uint16()
  382. res.CurCmd = r.uint16()
  383. res.RowCount = r.uint64()
  384. return res
  385. }
  386. // https://msdn.microsoft.com/en-us/library/dd340553.aspx
  387. func parseDoneInProc(r *tdsBuffer) (res doneInProcStruct) {
  388. res.Status = r.uint16()
  389. res.CurCmd = r.uint16()
  390. res.RowCount = r.uint64()
  391. return res
  392. }
  393. type sspiMsg []byte
  394. func parseSSPIMsg(r *tdsBuffer) sspiMsg {
  395. size := r.uint16()
  396. buf := make([]byte, size)
  397. r.ReadFull(buf)
  398. return sspiMsg(buf)
  399. }
  400. type loginAckStruct struct {
  401. Interface uint8
  402. TDSVersion uint32
  403. ProgName string
  404. ProgVer uint32
  405. }
  406. func parseLoginAck(r *tdsBuffer) loginAckStruct {
  407. size := r.uint16()
  408. buf := make([]byte, size)
  409. r.ReadFull(buf)
  410. var res loginAckStruct
  411. res.Interface = buf[0]
  412. res.TDSVersion = binary.BigEndian.Uint32(buf[1:])
  413. prognamelen := buf[1+4]
  414. var err error
  415. if res.ProgName, err = ucs22str(buf[1+4+1 : 1+4+1+prognamelen*2]); err != nil {
  416. badStreamPanic(err)
  417. }
  418. res.ProgVer = binary.BigEndian.Uint32(buf[size-4:])
  419. return res
  420. }
  421. // http://msdn.microsoft.com/en-us/library/dd357363.aspx
  422. func parseColMetadata72(r *tdsBuffer) (columns []columnStruct) {
  423. count := r.uint16()
  424. if count == 0xffff {
  425. // no metadata is sent
  426. return nil
  427. }
  428. columns = make([]columnStruct, count)
  429. for i := range columns {
  430. column := &columns[i]
  431. column.UserType = r.uint32()
  432. column.Flags = r.uint16()
  433. // parsing TYPE_INFO structure
  434. column.ti = readTypeInfo(r)
  435. column.ColName = r.BVarChar()
  436. }
  437. return columns
  438. }
  439. // http://msdn.microsoft.com/en-us/library/dd357254.aspx
  440. func parseRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {
  441. for i, column := range columns {
  442. row[i] = column.ti.Reader(&column.ti, r)
  443. }
  444. }
  445. // http://msdn.microsoft.com/en-us/library/dd304783.aspx
  446. func parseNbcRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {
  447. bitlen := (len(columns) + 7) / 8
  448. pres := make([]byte, bitlen)
  449. r.ReadFull(pres)
  450. for i, col := range columns {
  451. if pres[i/8]&(1<<(uint(i)%8)) != 0 {
  452. row[i] = nil
  453. continue
  454. }
  455. row[i] = col.ti.Reader(&col.ti, r)
  456. }
  457. }
  458. // http://msdn.microsoft.com/en-us/library/dd304156.aspx
  459. func parseError72(r *tdsBuffer) (res Error) {
  460. length := r.uint16()
  461. _ = length // ignore length
  462. res.Number = r.int32()
  463. res.State = r.byte()
  464. res.Class = r.byte()
  465. res.Message = r.UsVarChar()
  466. res.ServerName = r.BVarChar()
  467. res.ProcName = r.BVarChar()
  468. res.LineNo = r.int32()
  469. return
  470. }
  471. // http://msdn.microsoft.com/en-us/library/dd304156.aspx
  472. func parseInfo(r *tdsBuffer) (res Error) {
  473. length := r.uint16()
  474. _ = length // ignore length
  475. res.Number = r.int32()
  476. res.State = r.byte()
  477. res.Class = r.byte()
  478. res.Message = r.UsVarChar()
  479. res.ServerName = r.BVarChar()
  480. res.ProcName = r.BVarChar()
  481. res.LineNo = r.int32()
  482. return
  483. }
  484. // https://msdn.microsoft.com/en-us/library/dd303881.aspx
  485. func parseReturnValue(r *tdsBuffer) (nv namedValue) {
  486. /*
  487. ParamOrdinal
  488. ParamName
  489. Status
  490. UserType
  491. Flags
  492. TypeInfo
  493. CryptoMetadata
  494. Value
  495. */
  496. r.uint16()
  497. nv.Name = r.BVarChar()
  498. r.byte()
  499. r.uint32() // UserType (uint16 prior to 7.2)
  500. r.uint16()
  501. ti := readTypeInfo(r)
  502. nv.Value = ti.Reader(&ti, r)
  503. return
  504. }
  505. func processSingleResponse(sess *tdsSession, ch chan tokenStruct, outs map[string]interface{}) {
  506. defer func() {
  507. if err := recover(); err != nil {
  508. if sess.logFlags&logErrors != 0 {
  509. sess.log.Printf("ERROR: Intercepted panic %v", err)
  510. }
  511. ch <- err
  512. }
  513. close(ch)
  514. }()
  515. packet_type, err := sess.buf.BeginRead()
  516. if err != nil {
  517. if sess.logFlags&logErrors != 0 {
  518. sess.log.Printf("ERROR: BeginRead failed %v", err)
  519. }
  520. ch <- err
  521. return
  522. }
  523. if packet_type != packReply {
  524. badStreamPanic(fmt.Errorf("unexpected packet type in reply: got %v, expected %v", packet_type, packReply))
  525. }
  526. var columns []columnStruct
  527. errs := make([]Error, 0, 5)
  528. for {
  529. token := token(sess.buf.byte())
  530. if sess.logFlags&logDebug != 0 {
  531. sess.log.Printf("got token %v", token)
  532. }
  533. switch token {
  534. case tokenSSPI:
  535. ch <- parseSSPIMsg(sess.buf)
  536. return
  537. case tokenReturnStatus:
  538. returnStatus := parseReturnStatus(sess.buf)
  539. ch <- returnStatus
  540. case tokenLoginAck:
  541. loginAck := parseLoginAck(sess.buf)
  542. ch <- loginAck
  543. case tokenOrder:
  544. order := parseOrder(sess.buf)
  545. ch <- order
  546. case tokenDoneInProc:
  547. done := parseDoneInProc(sess.buf)
  548. if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {
  549. sess.log.Printf("(%d row(s) affected)\n", done.RowCount)
  550. }
  551. ch <- done
  552. case tokenDone, tokenDoneProc:
  553. done := parseDone(sess.buf)
  554. done.errors = errs
  555. if sess.logFlags&logDebug != 0 {
  556. sess.log.Printf("got DONE or DONEPROC status=%d", done.Status)
  557. }
  558. if done.Status&doneSrvError != 0 {
  559. ch <- errors.New("SQL Server had internal error")
  560. return
  561. }
  562. if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {
  563. sess.log.Printf("(%d row(s) affected)\n", done.RowCount)
  564. }
  565. ch <- done
  566. if done.Status&doneMore == 0 {
  567. return
  568. }
  569. case tokenColMetadata:
  570. columns = parseColMetadata72(sess.buf)
  571. ch <- columns
  572. case tokenRow:
  573. row := make([]interface{}, len(columns))
  574. parseRow(sess.buf, columns, row)
  575. ch <- row
  576. case tokenNbcRow:
  577. row := make([]interface{}, len(columns))
  578. parseNbcRow(sess.buf, columns, row)
  579. ch <- row
  580. case tokenEnvChange:
  581. processEnvChg(sess)
  582. case tokenError:
  583. err := parseError72(sess.buf)
  584. if sess.logFlags&logDebug != 0 {
  585. sess.log.Printf("got ERROR %d %s", err.Number, err.Message)
  586. }
  587. errs = append(errs, err)
  588. if sess.logFlags&logErrors != 0 {
  589. sess.log.Println(err.Message)
  590. }
  591. case tokenInfo:
  592. info := parseInfo(sess.buf)
  593. if sess.logFlags&logDebug != 0 {
  594. sess.log.Printf("got INFO %d %s", info.Number, info.Message)
  595. }
  596. if sess.logFlags&logMessages != 0 {
  597. sess.log.Println(info.Message)
  598. }
  599. case tokenReturnValue:
  600. nv := parseReturnValue(sess.buf)
  601. if len(nv.Name) > 0 {
  602. name := nv.Name[1:] // Remove the leading "@".
  603. if ov, has := outs[name]; has {
  604. err = scanIntoOut(name, nv.Value, ov)
  605. if err != nil {
  606. fmt.Println("scan error", err)
  607. ch <- err
  608. }
  609. }
  610. }
  611. default:
  612. badStreamPanic(fmt.Errorf("unknown token type returned: %v", token))
  613. }
  614. }
  615. }
  616. type parseRespIter byte
  617. const (
  618. parseRespIterContinue parseRespIter = iota // Continue parsing current token.
  619. parseRespIterNext // Fetch the next token.
  620. parseRespIterDone // Done with parsing the response.
  621. )
  622. type parseRespState byte
  623. const (
  624. parseRespStateNormal parseRespState = iota // Normal response state.
  625. parseRespStateCancel // Query is canceled, wait for server to confirm.
  626. parseRespStateClosing // Waiting for tokens to come through.
  627. )
  628. type parseResp struct {
  629. sess *tdsSession
  630. ctxDone <-chan struct{}
  631. state parseRespState
  632. cancelError error
  633. }
  634. func (ts *parseResp) sendAttention(ch chan tokenStruct) parseRespIter {
  635. if err := sendAttention(ts.sess.buf); err != nil {
  636. ts.dlogf("failed to send attention signal %v", err)
  637. ch <- err
  638. return parseRespIterDone
  639. }
  640. ts.state = parseRespStateCancel
  641. return parseRespIterContinue
  642. }
  643. func (ts *parseResp) dlog(msg string) {
  644. if ts.sess.logFlags&logDebug != 0 {
  645. ts.sess.log.Println(msg)
  646. }
  647. }
  648. func (ts *parseResp) dlogf(f string, v ...interface{}) {
  649. if ts.sess.logFlags&logDebug != 0 {
  650. ts.sess.log.Printf(f, v...)
  651. }
  652. }
  653. func (ts *parseResp) iter(ctx context.Context, ch chan tokenStruct, tokChan chan tokenStruct) parseRespIter {
  654. switch ts.state {
  655. default:
  656. panic("unknown state")
  657. case parseRespStateNormal:
  658. select {
  659. case tok, ok := <-tokChan:
  660. if !ok {
  661. ts.dlog("response finished")
  662. return parseRespIterDone
  663. }
  664. if err, ok := tok.(net.Error); ok && err.Timeout() {
  665. ts.cancelError = err
  666. ts.dlog("got timeout error, sending attention signal to server")
  667. return ts.sendAttention(ch)
  668. }
  669. // Pass the token along.
  670. ch <- tok
  671. return parseRespIterContinue
  672. case <-ts.ctxDone:
  673. ts.ctxDone = nil
  674. ts.dlog("got cancel message, sending attention signal to server")
  675. return ts.sendAttention(ch)
  676. }
  677. case parseRespStateCancel: // Read all responses until a DONE or error is received.Auth
  678. select {
  679. case tok, ok := <-tokChan:
  680. if !ok {
  681. ts.dlog("response finished but waiting for attention ack")
  682. return parseRespIterNext
  683. }
  684. switch tok := tok.(type) {
  685. default:
  686. // Ignore all other tokens while waiting.
  687. // The TDS spec says other tokens may arrive after an attention
  688. // signal is sent. Ignore these tokens and continue looking for
  689. // a DONE with attention confirm mark.
  690. case doneStruct:
  691. if tok.Status&doneAttn != 0 {
  692. ts.dlog("got cancellation confirmation from server")
  693. if ts.cancelError != nil {
  694. ch <- ts.cancelError
  695. ts.cancelError = nil
  696. } else {
  697. ch <- ctx.Err()
  698. }
  699. return parseRespIterDone
  700. }
  701. // If an error happens during cancel, pass it along and just stop.
  702. // We are uncertain to receive more tokens.
  703. case error:
  704. ch <- tok
  705. ts.state = parseRespStateClosing
  706. }
  707. return parseRespIterContinue
  708. case <-ts.ctxDone:
  709. ts.ctxDone = nil
  710. ts.state = parseRespStateClosing
  711. return parseRespIterContinue
  712. }
  713. case parseRespStateClosing: // Wait for current token chan to close.
  714. if _, ok := <-tokChan; !ok {
  715. ts.dlog("response finished")
  716. return parseRespIterDone
  717. }
  718. return parseRespIterContinue
  719. }
  720. }
  721. func processResponse(ctx context.Context, sess *tdsSession, ch chan tokenStruct, outs map[string]interface{}) {
  722. ts := &parseResp{
  723. sess: sess,
  724. ctxDone: ctx.Done(),
  725. }
  726. defer func() {
  727. // Ensure any remaining error is piped through
  728. // or the query may look like it executed when it actually failed.
  729. if ts.cancelError != nil {
  730. ch <- ts.cancelError
  731. ts.cancelError = nil
  732. }
  733. close(ch)
  734. }()
  735. // Loop over multiple responses.
  736. for {
  737. ts.dlog("initiating response reading")
  738. tokChan := make(chan tokenStruct)
  739. go processSingleResponse(sess, tokChan, outs)
  740. // Loop over multiple tokens in response.
  741. tokensLoop:
  742. for {
  743. switch ts.iter(ctx, ch, tokChan) {
  744. case parseRespIterContinue:
  745. // Nothing, continue to next token.
  746. case parseRespIterNext:
  747. break tokensLoop
  748. case parseRespIterDone:
  749. return
  750. }
  751. }
  752. }
  753. }