frame.go 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. )
  11. const (
  12. protoDirectionMask = 0x80
  13. protoVersionMask = 0x7F
  14. protoVersion1 = 0x01
  15. protoVersion2 = 0x02
  16. protoVersion3 = 0x03
  17. )
  18. const (
  19. // header ops
  20. opError byte = 0x00
  21. opStartup = 0x01
  22. opReady = 0x02
  23. opAuthenticate = 0x03
  24. opOptions = 0x05
  25. opSupported = 0x06
  26. opQuery = 0x07
  27. opResult = 0x08
  28. opPrepare = 0x09
  29. opExecute = 0x0A
  30. opRegister = 0x0B
  31. opEvent = 0x0C
  32. opBatch = 0x0D
  33. opAuthChallenge = 0x0E
  34. opAuthResponse = 0x0F
  35. opAuthSuccess = 0x10
  36. // result kind
  37. resultKindVoid = 1
  38. resultKindRows = 2
  39. resultKindKeyspace = 3
  40. resultKindPrepared = 4
  41. resultKindSchemaChanged = 5
  42. // rows flags
  43. flagGlobalTableSpec int = 0x01
  44. flagHasMorePages = 0x02
  45. flagNoMetaData = 0x04
  46. // query flags
  47. flagValues byte = 0x01
  48. flagSkipMetaData = 0x02
  49. flagPageSize = 0x04
  50. flagWithPagingState = 0x08
  51. flagWithSerialConsistency = 0x10
  52. // header flags
  53. flagCompression byte = 0x01
  54. flagTracing = 0x02
  55. flagQueryValues byte = 1
  56. flagCompress = 1
  57. flagTrace = 2
  58. flagPageState = 8
  59. flagHasMore = 2
  60. )
  61. const (
  62. apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
  63. )
  64. var headerProtoSize = [...]int{
  65. protoVersion1: 8,
  66. protoVersion2: 8,
  67. protoVersion3: 9,
  68. }
  69. func writeInt(p []byte, n int) {
  70. p[0] = byte(n >> 24)
  71. p[1] = byte(n >> 16)
  72. p[2] = byte(n >> 8)
  73. p[3] = byte(n)
  74. }
  75. func readInt(p []byte) int {
  76. if len(p) < 4 {
  77. panic("readInt requires 4 bytes")
  78. }
  79. return int(p[0])<<24 | int(p[1])<<16 | int(p[2])<<8 | int(p[3])
  80. }
  81. func writeShort(p []byte, n int) {
  82. p[0] = byte(n >> 8)
  83. p[1] = byte(n)
  84. }
  85. func readShort(p []byte) int16 {
  86. if len(p) < 2 {
  87. panic("readShort requires 2 bytes")
  88. }
  89. return int16(p[0])<<8 | int16(p[1])
  90. }
  91. type frameHeader interface {
  92. Version() byte
  93. Flags() byte
  94. Stream() int
  95. Op() byte
  96. Length() int
  97. HeaderSize() int
  98. io.Writer
  99. io.Reader
  100. }
  101. type frameHeaderV1 struct {
  102. version byte
  103. flags byte
  104. // stream is an int8 on the wire
  105. stream int
  106. op byte
  107. lenth int
  108. }
  109. func (f *frameHeaderV1) HeaderSize() int {
  110. return 8
  111. }
  112. func (f *frameHeaderV1) Header() frameHeader {
  113. return f
  114. }
  115. func (f *frameHeaderV1) appendWrite(p []byte) []byte {
  116. return append(p,
  117. f.version,
  118. f.flags,
  119. f.stream,
  120. f.op,
  121. byte(f.lenth>>24),
  122. byte(f.lenth>>16),
  123. byte(f.lenth>>8),
  124. byte(f.lenth),
  125. )
  126. }
  127. func (f *frameHeaderV1) Op() byte {
  128. return f.op()
  129. }
  130. func (f *frameHeaderV1) Flags() byte {
  131. return f.flags
  132. }
  133. func (f *frameHeaderV1) Stream() int {
  134. return f.stream
  135. }
  136. func (f *frameHeaderV1) Length() int {
  137. return f.lenth
  138. }
  139. type frameHeaderV3 struct {
  140. version byte
  141. flags byte
  142. // stream is an int16 on the wire
  143. stream int
  144. op byte
  145. lenth int
  146. }
  147. func (f *frameHeaderV3) HeaderSize() int {
  148. return 9
  149. }
  150. func (f *frameHeaderV3) Header() frameHeader {
  151. return f
  152. }
  153. func (f *frameHeaderV3) Read(p []byte) (int64, error) {
  154. if len(p) < 9 {
  155. return 0, errors.New("require 9 bytes to read v3 header")
  156. }
  157. f.version = b[0]
  158. f.flags = b[1]
  159. f.stream = int(readShort(b[2:]))
  160. f.op = b[4]
  161. f.lenth = readInt(b[5:])
  162. return 9, nil
  163. }
  164. func (f *frameHeaderV3) Op() byte {
  165. return f.op
  166. }
  167. func (f *frameHeaderV3) Flags() byte {
  168. return f.flags
  169. }
  170. func (f *frameHeaderV3) Stream() int {
  171. return f.stream
  172. }
  173. func (f *frameHeaderV3) Length() int {
  174. return f.lenth
  175. }
  176. func (f *frameHeaderV3) appendWrite(p []byte) []byte {
  177. return append(p,
  178. f.version,
  179. f.flags,
  180. byte(f.stream>>8),
  181. byte(f.stream),
  182. f.op,
  183. byte(f.lenth>>24),
  184. byte(f.lenth>>16),
  185. byte(f.lenth>>8),
  186. byte(f.lenth),
  187. )
  188. }
  189. type frame interface {
  190. Header() frameHeader
  191. }
  192. // a frame is responsible for reading and writing frames on a single stream for
  193. type framer struct {
  194. r io.Reader
  195. w io.Writer
  196. // the size which has been written or read of the body
  197. bodySize int
  198. proto byte
  199. buf []byte
  200. }
// encodeFrame obtains the header from frame, asks frame for its body via
// appendBody, stores the body length in the header, has the header write
// itself into the first HeaderSize() bytes of f.buf and returns f.buf.
// It panics when the header is neither *frameHeaderV1 nor *frameHeaderV3, or
// when header.Write returns an error.
//
// NOTE(review): this function looks unfinished — confirm intent before use:
//   - appendBody is not declared on the frame interface visible in this file;
//   - f.buf is only resliced when cap(f.buf) > hsize, so f.buf[hsize:] below
//     can panic for a smaller buffer;
//   - the slice first assigned to body is overwritten by appendBody's result,
//     and that result is not copied into f.buf here, so the returned slice
//     appears to contain the header only.
  201. func (f *framer) encodeFrame(frame frame) []byte {
  202. header := frame.Header()
  203. hsize := header.HeaderSize()
  204. // TODO: can we reuse an underlying buf slice here instead of allocating a new
  205. // one for writeFrame?
  206. if cap(f.buf) > hsize {
  207. // make sure there is enough room for the header
  208. f.buf = f.buf[0:hsize]
  209. }
  210. body := f.buf[hsize:]
  211. // write body
  212. body = frame.appendBody()
  213. // TODO: can this be done without a type switch and without having a SetSize()
  214. // method on the header?
  215. switch v := header.(type) {
  216. case *frameHeaderV1:
  217. v.lenth = len(body)
  218. case *frameHeaderV3:
  219. v.lenth = len(body)
  220. default:
  221. panic(fmt.Sprintf("encodeFrame: unknown header type: %T", v))
  222. }
// The header is written only after the body length has been recorded above.
  223. _, err := header.Write(f.buf[0:hsize])
  224. if err != nil {
  225. panic(err)
  226. }
  227. return f.buf[:]
  228. }
  229. // these are protocol level binary types
  230. func (f *framer) writeInt(n int) {
  231. f.buf = append(f.buf,
  232. byte(n>>24),
  233. byte(n>>16),
  234. byte(n>>8),
  235. byte(n),
  236. )
  237. }
  238. func (f *framer) writeShort(n int) {
  239. f.buf = append(f.buf,
  240. byte(n>>8),
  241. byte(n),
  242. )
  243. }
  244. func (f *framer) writeString(s string) {
  245. f.writeShort(len(s))
  246. f.buf = append(f.buf, s...)
  247. }
  248. func (f *framer) writeLongString(s string) {
  249. f.writeInt(len(s))
  250. f.buf = append(f.buf, s...)
  251. }
  252. func (f *framer) writeUUID(u *UUID) {
  253. f.buf = append(f.buf, u[:]...)
  254. }
  255. func (f *framer) writeStringList(l []string) {
  256. f.writeShort(len(l))
  257. for _, s := range l {
  258. f.writeString(s)
  259. }
  260. }
  261. func (f *framer) writeBytes(p []byte) {
  262. // TODO: handle null case correctly,
  263. // [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0,
  264. // no byte should follow and the value represented is `null`.
  265. if p == nil {
  266. f.writeInt(-1)
  267. } else {
  268. f.writeInt(len(p))
  269. f.buf = append(f.buf, p...)
  270. }
  271. }
  272. func (f *framer) writeShortBytes(p []byte) {
  273. f.writeShort(len(p))
  274. f.fbuf = append(f.buf, p...)
  275. }
  276. // TODO: add writeOption, though no frame actually writes an option so probably
  277. // just need a read
  278. func (f *framer) writeInet(ip net.IP, port int) {
  279. f.buf = append(f.buf,
  280. byte(len(ip)),
  281. ip...,
  282. )
  283. f.writeInt(port)
  284. }
  285. func (f *framer) writeConsistency(cons Consistency) {
  286. f.writeShort(cons)
  287. }
  288. // TODO: replace with a struct which has a header and a body buffer,
  289. // header just has methods like, set/get the options in its backing array
  290. // then in a writeTo we write the header then the body.
  291. // type frame []byte
  292. func newFrame(version uint8) frame {
  293. // TODO: pool these at the session level incase anyone is using different
  294. // clusters with different versions in the same application.
  295. return make(frame, headerProtoSize[version], defaultFrameSize)
  296. }
  297. func (f *frame) writeInt(v int32) {
  298. p := f.grow(4)
  299. (*f)[p] = byte(v >> 24)
  300. (*f)[p+1] = byte(v >> 16)
  301. (*f)[p+2] = byte(v >> 8)
  302. (*f)[p+3] = byte(v)
  303. }
  304. func (f *frame) writeShort(v uint16) {
  305. p := f.grow(2)
  306. (*f)[p] = byte(v >> 8)
  307. (*f)[p+1] = byte(v)
  308. }
  309. func (f *frame) writeString(v string) {
  310. f.writeShort(uint16(len(v)))
  311. p := f.grow(len(v))
  312. copy((*f)[p:], v)
  313. }
  314. func (f *frame) writeLongString(v string) {
  315. f.writeInt(int32(len(v)))
  316. p := f.grow(len(v))
  317. copy((*f)[p:], v)
  318. }
  319. func (f *frame) writeUUID() {
  320. }
  321. func (f *frame) writeStringList(v []string) {
  322. f.writeShort(uint16(len(v)))
  323. for i := range v {
  324. f.writeString(v[i])
  325. }
  326. }
  327. func (f *frame) writeByte(v byte) {
  328. p := f.grow(1)
  329. (*f)[p] = v
  330. }
  331. func (f *frame) writeBytes(v []byte) {
  332. if v == nil {
  333. f.writeInt(-1)
  334. return
  335. }
  336. f.writeInt(int32(len(v)))
  337. p := f.grow(len(v))
  338. copy((*f)[p:], v)
  339. }
  340. func (f *frame) writeShortBytes(v []byte) {
  341. f.writeShort(uint16(len(v)))
  342. p := f.grow(len(v))
  343. copy((*f)[p:], v)
  344. }
  345. func (f *frame) writeInet(ip net.IP, port int) {
  346. p := f.grow(1 + len(ip))
  347. (*f)[p] = byte(len(ip))
  348. copy((*f)[p+1:], ip)
  349. f.writeInt(int32(port))
  350. }
  351. func (f *frame) writeStringMap(v map[string]string) {
  352. f.writeShort(uint16(len(v)))
  353. for key, value := range v {
  354. f.writeString(key)
  355. f.writeString(value)
  356. }
  357. }
  358. func (f *frame) writeStringMultimap(v map[string][]string) {
  359. f.writeShort(uint16(len(v)))
  360. for key, values := range v {
  361. f.writeString(key)
  362. f.writeStringList(values)
  363. }
  364. }
  365. func (f *frame) setHeader(version, flags uint8, stream int, opcode uint8) {
  366. (*f)[0] = version
  367. (*f)[1] = flags
  368. p := 2
  369. if version&maskVersion > protoVersion2 {
  370. (*f)[2] = byte(stream >> 8)
  371. (*f)[3] = byte(stream)
  372. p += 2
  373. } else {
  374. (*f)[2] = byte(stream & 0xFF)
  375. p++
  376. }
  377. (*f)[p] = opcode
  378. }
  379. func (f *frame) setStream(stream int, version uint8) {
  380. if version > protoVersion2 {
  381. (*f)[2] = byte(stream >> 8)
  382. (*f)[3] = byte(stream)
  383. } else {
  384. (*f)[2] = byte(stream)
  385. }
  386. }
  387. func (f *frame) Stream(version uint8) (n int) {
  388. if version > protoVersion2 {
  389. n = int((*f)[2])<<8 | int((*f)[3])
  390. } else {
  391. n = int((*f)[2])
  392. }
  393. return
  394. }
  395. func (f *frame) setLength(length int, version uint8) {
  396. p := 4
  397. if version > protoVersion2 {
  398. p = 5
  399. }
  400. (*f)[p] = byte(length >> 24)
  401. (*f)[p+1] = byte(length >> 16)
  402. (*f)[p+2] = byte(length >> 8)
  403. (*f)[p+3] = byte(length)
  404. }
  405. func (f *frame) Op(version uint8) byte {
  406. if version > protoVersion2 {
  407. return (*f)[4]
  408. } else {
  409. return (*f)[3]
  410. }
  411. }
  412. func (f *frame) Length(version uint8) int {
  413. p := 4
  414. if version > protoVersion2 {
  415. p = 5
  416. }
  417. return int((*f)[p])<<24 | int((*f)[p+1])<<16 | int((*f)[p+2])<<8 | int((*f)[p+3])
  418. }
  419. func (f *frame) grow(n int) int {
  420. if len(*f)+n >= cap(*f) {
  421. buf := make(frame, len(*f), len(*f)*2+n)
  422. copy(buf, *f)
  423. *f = buf
  424. }
  425. p := len(*f)
  426. *f = (*f)[:p+n]
  427. return p
  428. }
  429. func (f *frame) skipHeader(version uint8) {
  430. *f = (*f)[headerProtoSize[version]:]
  431. }
  432. func (f *frame) readInt() int {
  433. if len(*f) < 4 {
  434. panic(NewErrProtocol("Trying to read an int while <4 bytes in the buffer"))
  435. }
  436. v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
  437. *f = (*f)[4:]
  438. return int(int32(v))
  439. }
  440. func (f *frame) readShort() uint16 {
  441. if len(*f) < 2 {
  442. panic(NewErrProtocol("Trying to read a short while <2 bytes in the buffer"))
  443. }
  444. v := uint16((*f)[0])<<8 | uint16((*f)[1])
  445. *f = (*f)[2:]
  446. return v
  447. }
  448. func (f *frame) readString() string {
  449. n := int(f.readShort())
  450. if len(*f) < n {
  451. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  452. }
  453. v := string((*f)[:n])
  454. *f = (*f)[n:]
  455. return v
  456. }
  457. func (f *frame) readLongString() string {
  458. n := f.readInt()
  459. if len(*f) < n {
  460. panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
  461. }
  462. v := string((*f)[:n])
  463. *f = (*f)[n:]
  464. return v
  465. }
  466. func (f *frame) readBytes() []byte {
  467. n := f.readInt()
  468. if n < 0 {
  469. return nil
  470. }
  471. if len(*f) < n {
  472. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  473. }
  474. v := (*f)[:n]
  475. *f = (*f)[n:]
  476. return v
  477. }
  478. func (f *frame) readShortBytes() []byte {
  479. n := int(f.readShort())
  480. if len(*f) < n {
  481. panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
  482. }
  483. v := (*f)[:n]
  484. *f = (*f)[n:]
  485. return v
  486. }
  487. func (f *frame) readTypeInfo(version uint8) *TypeInfo {
  488. x := f.readShort()
  489. typ := &TypeInfo{
  490. Proto: version,
  491. Type: Type(x),
  492. }
  493. switch typ.Type {
  494. case TypeCustom:
  495. typ.Custom = f.readString()
  496. if cassType := getApacheCassandraType(typ.Custom); cassType != TypeCustom {
  497. typ = &TypeInfo{Type: cassType}
  498. switch typ.Type {
  499. case TypeMap:
  500. typ.Key = f.readTypeInfo(version)
  501. fallthrough
  502. case TypeList, TypeSet:
  503. typ.Elem = f.readTypeInfo(version)
  504. }
  505. }
  506. case TypeMap:
  507. typ.Key = f.readTypeInfo(version)
  508. fallthrough
  509. case TypeList, TypeSet:
  510. typ.Elem = f.readTypeInfo(version)
  511. }
  512. return typ
  513. }
  514. func (f *frame) readMetaData(version uint8) ([]ColumnInfo, []byte) {
  515. flags := f.readInt()
  516. numColumns := f.readInt()
  517. var pageState []byte
  518. if flags&2 != 0 {
  519. pageState = f.readBytes()
  520. }
  521. globalKeyspace := ""
  522. globalTable := ""
  523. if flags&1 != 0 {
  524. globalKeyspace = f.readString()
  525. globalTable = f.readString()
  526. }
  527. columns := make([]ColumnInfo, numColumns)
  528. for i := 0; i < numColumns; i++ {
  529. columns[i].Keyspace = globalKeyspace
  530. columns[i].Table = globalTable
  531. if flags&1 == 0 {
  532. columns[i].Keyspace = f.readString()
  533. columns[i].Table = f.readString()
  534. }
  535. columns[i].Name = f.readString()
  536. columns[i].TypeInfo = f.readTypeInfo(version)
  537. }
  538. return columns, pageState
  539. }
  540. func (f *frame) readError() RequestError {
  541. code := f.readInt()
  542. msg := f.readString()
  543. errD := errorFrame{code, msg}
  544. switch code {
  545. case errUnavailable:
  546. cl := Consistency(f.readShort())
  547. required := f.readInt()
  548. alive := f.readInt()
  549. return RequestErrUnavailable{errorFrame: errD,
  550. Consistency: cl,
  551. Required: required,
  552. Alive: alive}
  553. case errWriteTimeout:
  554. cl := Consistency(f.readShort())
  555. received := f.readInt()
  556. blockfor := f.readInt()
  557. writeType := f.readString()
  558. return RequestErrWriteTimeout{errorFrame: errD,
  559. Consistency: cl,
  560. Received: received,
  561. BlockFor: blockfor,
  562. WriteType: writeType,
  563. }
  564. case errReadTimeout:
  565. cl := Consistency(f.readShort())
  566. received := f.readInt()
  567. blockfor := f.readInt()
  568. dataPresent := (*f)[0]
  569. *f = (*f)[1:]
  570. return RequestErrReadTimeout{errorFrame: errD,
  571. Consistency: cl,
  572. Received: received,
  573. BlockFor: blockfor,
  574. DataPresent: dataPresent,
  575. }
  576. case errAlreadyExists:
  577. ks := f.readString()
  578. table := f.readString()
  579. return RequestErrAlreadyExists{errorFrame: errD,
  580. Keyspace: ks,
  581. Table: table,
  582. }
  583. case errUnprepared:
  584. stmtId := f.readShortBytes()
  585. return RequestErrUnprepared{errorFrame: errD,
  586. StatementId: stmtId,
  587. }
  588. default:
  589. return errD
  590. }
  591. }
  592. func (f *frame) writeConsistency(c Consistency) {
  593. f.writeShort(consistencyCodes[c])
  594. }
  595. func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
  596. return f, nil
  597. }
  598. var consistencyCodes = []uint16{
  599. Any: 0x0000,
  600. One: 0x0001,
  601. Two: 0x0002,
  602. Three: 0x0003,
  603. Quorum: 0x0004,
  604. All: 0x0005,
  605. LocalQuorum: 0x0006,
  606. EachQuorum: 0x0007,
  607. Serial: 0x0008,
  608. LocalSerial: 0x0009,
  609. LocalOne: 0x000A,
  610. }
  611. type readyFrame struct{}
  612. type supportedFrame struct{}
  613. type resultVoidFrame struct{}
  614. type resultRowsFrame struct {
  615. Columns []ColumnInfo
  616. Rows [][][]byte
  617. PagingState []byte
  618. }
  619. type resultKeyspaceFrame struct {
  620. Keyspace string
  621. }
  622. type resultPreparedFrame struct {
  623. PreparedId []byte
  624. Arguments []ColumnInfo
  625. ReturnValues []ColumnInfo
  626. }
  627. type operation interface {
  628. encodeFrame(version uint8, dst frame) (frame, error)
  629. }
  630. type startupFrame struct {
  631. CQLVersion string
  632. Compression string
  633. }
  634. func (op *startupFrame) String() string {
  635. return fmt.Sprintf("[startup cqlversion=%q compression=%q]", op.CQLVersion, op.Compression)
  636. }
  637. func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
  638. if f == nil {
  639. f = newFrame(version)
  640. }
  641. f.setHeader(version, 0, 0, opStartup)
  642. // TODO: fix this, this is actually a StringMap
  643. var size uint16 = 1
  644. if op.Compression != "" {
  645. size++
  646. }
  647. f.writeShort(size)
  648. f.writeString("CQL_VERSION")
  649. f.writeString(op.CQLVersion)
  650. if op.Compression != "" {
  651. f.writeString("COMPRESSION")
  652. f.writeString(op.Compression)
  653. }
  654. return f, nil
  655. }
  656. type queryFrame struct {
  657. Stmt string
  658. Prepared []byte
  659. Cons Consistency
  660. Values [][]byte
  661. PageSize int
  662. PageState []byte
  663. }
  664. func (op *queryFrame) String() string {
  665. return fmt.Sprintf("[query statement=%q prepared=%x cons=%v ...]", op.Stmt, op.Prepared, op.Cons)
  666. }
  667. func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
  668. if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
  669. (len(op.Values) > 0 && len(op.Prepared) == 0)) {
  670. return nil, ErrUnsupported
  671. }
  672. if f == nil {
  673. f = newFrame(version)
  674. }
  675. if len(op.Prepared) > 0 {
  676. f.setHeader(version, 0, 0, opExecute)
  677. f.writeShortBytes(op.Prepared)
  678. } else {
  679. f.setHeader(version, 0, 0, opQuery)
  680. f.writeLongString(op.Stmt)
  681. }
  682. if version >= 2 {
  683. f.writeConsistency(op.Cons)
  684. flagPos := len(f)
  685. f.writeByte(0)
  686. if len(op.Values) > 0 {
  687. f[flagPos] |= flagQueryValues
  688. f.writeShort(uint16(len(op.Values)))
  689. for _, value := range op.Values {
  690. f.writeBytes(value)
  691. }
  692. }
  693. if op.PageSize > 0 {
  694. f[flagPos] |= flagPageSize
  695. f.writeInt(int32(op.PageSize))
  696. }
  697. if len(op.PageState) > 0 {
  698. f[flagPos] |= flagPageState
  699. f.writeBytes(op.PageState)
  700. }
  701. } else if version == 1 {
  702. if len(op.Prepared) > 0 {
  703. f.writeShort(uint16(len(op.Values)))
  704. for _, value := range op.Values {
  705. f.writeBytes(value)
  706. }
  707. }
  708. f.writeConsistency(op.Cons)
  709. }
  710. return f, nil
  711. }
  712. type prepareFrame struct {
  713. Stmt string
  714. }
  715. func (op *prepareFrame) String() string {
  716. return fmt.Sprintf("[prepare statement=%q]", op.Stmt)
  717. }
  718. func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
  719. if f == nil {
  720. f = newFrame(version)
  721. }
  722. f.setHeader(version, 0, 0, opPrepare)
  723. f.writeLongString(op.Stmt)
  724. return f, nil
  725. }
  726. type optionsFrame struct{}
  727. func (op *optionsFrame) String() string {
  728. return "[options]"
  729. }
  730. func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
  731. if f == nil {
  732. f = newFrame(version)
  733. }
  734. f.setHeader(version, 0, 0, opOptions)
  735. return f, nil
  736. }
  737. type authenticateFrame struct {
  738. Authenticator string
  739. }
  740. func (op *authenticateFrame) String() string {
  741. return fmt.Sprintf("[authenticate authenticator=%q]", op.Authenticator)
  742. }
  743. type authResponseFrame struct {
  744. Data []byte
  745. }
  746. func (op *authResponseFrame) String() string {
  747. return fmt.Sprintf("[auth_response data=%q]", op.Data)
  748. }
  749. func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
  750. if f == nil {
  751. f = newFrame(version)
  752. }
  753. f.setHeader(version, 0, 0, opAuthResponse)
  754. f.writeBytes(op.Data)
  755. return f, nil
  756. }
  757. type authSuccessFrame struct {
  758. Data []byte
  759. }
  760. func (op *authSuccessFrame) String() string {
  761. return fmt.Sprintf("[auth_success data=%q]", op.Data)
  762. }
  763. type authChallengeFrame struct {
  764. Data []byte
  765. }
  766. func (op *authChallengeFrame) String() string {
  767. return fmt.Sprintf("[auth_challenge data=%q]", op.Data)
  768. }