conn.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "crypto/tls"
  8. "crypto/x509"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. const defaultFrameSize = 4096
  18. const flagResponse = 0x80
  19. const maskVersion = 0x7F
  20. type Authenticator interface {
  21. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  22. Success(data []byte) error
  23. }
  24. type PasswordAuthenticator struct {
  25. Username string
  26. Password string
  27. }
  28. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  29. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  30. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  31. }
  32. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  33. resp[0] = 0
  34. copy(resp[1:], p.Username)
  35. resp[len(p.Username)+1] = 0
  36. copy(resp[2+len(p.Username):], p.Password)
  37. return resp, nil, nil
  38. }
  39. func (p PasswordAuthenticator) Success(data []byte) error {
  40. return nil
  41. }
  42. type SslOptions struct {
  43. CertPath string
  44. KeyPath string
  45. CaPath string //optional depending on server config
  46. // If you want to verify the hostname and server cert (like a wildcard for cass cluster) then you should turn this on
  47. // This option is basically the inverse of InSecureSkipVerify
  48. // See InSecureSkipVerify in http://golang.org/pkg/crypto/tls/ for more info
  49. EnableHostVerification bool
  50. }
  51. type ConnConfig struct {
  52. ProtoVersion int
  53. CQLVersion string
  54. Timeout time.Duration
  55. NumStreams int
  56. Compressor Compressor
  57. Authenticator Authenticator
  58. Keepalive time.Duration
  59. SslOpts *SslOptions
  60. }
  61. // Conn is a single connection to a Cassandra node. It can be used to execute
  62. // queries, but users are usually advised to use a more reliable, higher
  63. // level API.
  64. type Conn struct {
  65. conn net.Conn
  66. r *bufio.Reader
  67. timeout time.Duration
  68. uniq chan uint8
  69. calls []callReq
  70. nwait int32
  71. pool ConnectionPool
  72. compressor Compressor
  73. auth Authenticator
  74. addr string
  75. version uint8
  76. currentKeyspace string
  77. closedMu sync.RWMutex
  78. isClosed bool
  79. }
  80. // Connect establishes a connection to a Cassandra node.
  81. // You must also call the Serve method before you can execute any queries.
  82. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  83. var (
  84. err error
  85. conn net.Conn
  86. )
  87. if cfg.SslOpts != nil {
  88. certPool := x509.NewCertPool()
  89. //ca cert is optional
  90. if cfg.SslOpts.CaPath != "" {
  91. pem, err := ioutil.ReadFile(cfg.SslOpts.CaPath)
  92. if err != nil {
  93. return nil, err
  94. }
  95. if !certPool.AppendCertsFromPEM(pem) {
  96. return nil, errors.New("Failed parsing or appending certs")
  97. }
  98. }
  99. mycert, err := tls.LoadX509KeyPair(cfg.SslOpts.CertPath, cfg.SslOpts.KeyPath)
  100. if err != nil {
  101. return nil, err
  102. }
  103. config := tls.Config{
  104. Certificates: []tls.Certificate{mycert},
  105. RootCAs: certPool,
  106. }
  107. config.InsecureSkipVerify = !cfg.SslOpts.EnableHostVerification
  108. if conn, err = tls.Dial("tcp", addr, &config); err != nil {
  109. return nil, err
  110. }
  111. } else if conn, err = net.DialTimeout("tcp", addr, cfg.Timeout); err != nil {
  112. return nil, err
  113. }
  114. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  115. cfg.NumStreams = 128
  116. }
  117. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  118. cfg.ProtoVersion = 2
  119. }
  120. c := &Conn{
  121. conn: conn,
  122. r: bufio.NewReader(conn),
  123. uniq: make(chan uint8, cfg.NumStreams),
  124. calls: make([]callReq, cfg.NumStreams),
  125. timeout: cfg.Timeout,
  126. version: uint8(cfg.ProtoVersion),
  127. addr: conn.RemoteAddr().String(),
  128. pool: pool,
  129. compressor: cfg.Compressor,
  130. auth: cfg.Authenticator,
  131. }
  132. if cfg.Keepalive > 0 {
  133. c.setKeepalive(cfg.Keepalive)
  134. }
  135. for i := 0; i < cap(c.uniq); i++ {
  136. c.uniq <- uint8(i)
  137. }
  138. if err := c.startup(&cfg); err != nil {
  139. conn.Close()
  140. return nil, err
  141. }
  142. go c.serve()
  143. return c, nil
  144. }
  145. func (c *Conn) startup(cfg *ConnConfig) error {
  146. compression := ""
  147. if c.compressor != nil {
  148. compression = c.compressor.Name()
  149. }
  150. var req operation = &startupFrame{
  151. CQLVersion: cfg.CQLVersion,
  152. Compression: compression,
  153. }
  154. var challenger Authenticator
  155. for {
  156. resp, err := c.execSimple(req)
  157. if err != nil {
  158. return err
  159. }
  160. switch x := resp.(type) {
  161. case readyFrame:
  162. return nil
  163. case error:
  164. return x
  165. case authenticateFrame:
  166. if c.auth == nil {
  167. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  168. }
  169. var resp []byte
  170. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  171. if err != nil {
  172. return err
  173. }
  174. req = &authResponseFrame{resp}
  175. case authChallengeFrame:
  176. if challenger == nil {
  177. return fmt.Errorf("authentication error (invalid challenge)")
  178. }
  179. var resp []byte
  180. resp, challenger, err = challenger.Challenge(x.Data)
  181. if err != nil {
  182. return err
  183. }
  184. req = &authResponseFrame{resp}
  185. case authSuccessFrame:
  186. if challenger != nil {
  187. return challenger.Success(x.Data)
  188. }
  189. return nil
  190. default:
  191. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  192. }
  193. }
  194. }
  195. // Serve starts the stream multiplexer for this connection, which is required
  196. // to execute any queries. This method runs as long as the connection is
  197. // open and is therefore usually called in a separate goroutine.
  198. func (c *Conn) serve() {
  199. var (
  200. err error
  201. resp frame
  202. )
  203. for {
  204. resp, err = c.recv()
  205. if err != nil {
  206. break
  207. }
  208. c.dispatch(resp)
  209. }
  210. c.Close()
  211. for id := 0; id < len(c.calls); id++ {
  212. req := &c.calls[id]
  213. if atomic.LoadInt32(&req.active) == 1 {
  214. req.resp <- callResp{nil, err}
  215. }
  216. }
  217. c.pool.HandleError(c, err, true)
  218. }
  219. func (c *Conn) recv() (frame, error) {
  220. resp := make(frame, headerSize, headerSize+512)
  221. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  222. n, last, pinged := 0, 0, false
  223. for n < len(resp) {
  224. nn, err := c.r.Read(resp[n:])
  225. n += nn
  226. if err != nil {
  227. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  228. if n > last {
  229. // we hit the deadline but we made progress.
  230. // simply extend the deadline
  231. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  232. last = n
  233. } else if n == 0 && !pinged {
  234. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  235. if atomic.LoadInt32(&c.nwait) > 0 {
  236. go c.ping()
  237. pinged = true
  238. }
  239. } else {
  240. return nil, err
  241. }
  242. } else {
  243. return nil, err
  244. }
  245. }
  246. if n == headerSize && len(resp) == headerSize {
  247. if resp[0] != c.version|flagResponse {
  248. return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
  249. }
  250. resp.grow(resp.Length())
  251. }
  252. }
  253. return resp, nil
  254. }
  255. func (c *Conn) execSimple(op operation) (interface{}, error) {
  256. f, err := op.encodeFrame(c.version, nil)
  257. f.setLength(len(f) - headerSize)
  258. if _, err := c.conn.Write([]byte(f)); err != nil {
  259. c.Close()
  260. return nil, err
  261. }
  262. if f, err = c.recv(); err != nil {
  263. return nil, err
  264. }
  265. return c.decodeFrame(f, nil)
  266. }
  267. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  268. req, err := op.encodeFrame(c.version, nil)
  269. if err != nil {
  270. return nil, err
  271. }
  272. if trace != nil {
  273. req[1] |= flagTrace
  274. }
  275. if len(req) > headerSize && c.compressor != nil {
  276. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  277. if err != nil {
  278. return nil, err
  279. }
  280. req = append(req[:headerSize], frame(body)...)
  281. req[1] |= flagCompress
  282. }
  283. req.setLength(len(req) - headerSize)
  284. id := <-c.uniq
  285. req[2] = id
  286. call := &c.calls[id]
  287. call.resp = make(chan callResp, 1)
  288. atomic.AddInt32(&c.nwait, 1)
  289. atomic.StoreInt32(&call.active, 1)
  290. if _, err := c.conn.Write(req); err != nil {
  291. c.uniq <- id
  292. c.Close()
  293. return nil, err
  294. }
  295. reply := <-call.resp
  296. call.resp = nil
  297. c.uniq <- id
  298. if reply.err != nil {
  299. return nil, reply.err
  300. }
  301. return c.decodeFrame(reply.buf, trace)
  302. }
  303. func (c *Conn) dispatch(resp frame) {
  304. id := int(resp[2])
  305. if id >= len(c.calls) {
  306. return
  307. }
  308. call := &c.calls[id]
  309. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  310. return
  311. }
  312. atomic.AddInt32(&c.nwait, -1)
  313. call.resp <- callResp{resp, nil}
  314. }
  315. func (c *Conn) ping() error {
  316. _, err := c.exec(&optionsFrame{}, nil)
  317. return err
  318. }
  319. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
  320. stmtsLRU.mu.Lock()
  321. stmtCacheKey := c.addr + c.currentKeyspace + stmt
  322. if val, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
  323. flight := val.(*inflightPrepare)
  324. stmtsLRU.mu.Unlock()
  325. flight.wg.Wait()
  326. return flight.info, flight.err
  327. }
  328. flight := new(inflightPrepare)
  329. flight.wg.Add(1)
  330. stmtsLRU.lru.Add(stmtCacheKey, flight)
  331. stmtsLRU.mu.Unlock()
  332. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  333. if err != nil {
  334. flight.err = err
  335. } else {
  336. switch x := resp.(type) {
  337. case resultPreparedFrame:
  338. flight.info = &QueryInfo{
  339. Id: x.PreparedId,
  340. Args: x.Arguments,
  341. Rval: x.ReturnValues,
  342. }
  343. case error:
  344. flight.err = x
  345. default:
  346. flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
  347. }
  348. err = flight.err
  349. }
  350. flight.wg.Done()
  351. if err != nil {
  352. stmtsLRU.mu.Lock()
  353. stmtsLRU.lru.Remove(stmtCacheKey)
  354. stmtsLRU.mu.Unlock()
  355. }
  356. return flight.info, flight.err
  357. }
  358. func (c *Conn) executeQuery(qry *Query) *Iter {
  359. op := &queryFrame{
  360. Stmt: qry.stmt,
  361. Cons: qry.cons,
  362. PageSize: qry.pageSize,
  363. PageState: qry.pageState,
  364. }
  365. if qry.shouldPrepare() {
  366. // Prepare all DML queries. Other queries can not be prepared.
  367. info, err := c.prepareStatement(qry.stmt, qry.trace)
  368. if err != nil {
  369. return &Iter{err: err}
  370. }
  371. var values []interface{}
  372. if qry.binding == nil {
  373. values = qry.values
  374. } else {
  375. values, err = qry.binding(info)
  376. if err != nil {
  377. return &Iter{err: err}
  378. }
  379. }
  380. if len(values) != len(info.Args) {
  381. return &Iter{err: ErrQueryArgLength}
  382. }
  383. op.Prepared = info.Id
  384. op.Values = make([][]byte, len(values))
  385. for i := 0; i < len(values); i++ {
  386. val, err := Marshal(info.Args[i].TypeInfo, values[i])
  387. if err != nil {
  388. return &Iter{err: err}
  389. }
  390. op.Values[i] = val
  391. }
  392. }
  393. resp, err := c.exec(op, qry.trace)
  394. if err != nil {
  395. return &Iter{err: err}
  396. }
  397. switch x := resp.(type) {
  398. case resultVoidFrame:
  399. return &Iter{}
  400. case resultRowsFrame:
  401. iter := &Iter{columns: x.Columns, rows: x.Rows}
  402. if len(x.PagingState) > 0 {
  403. iter.next = &nextIter{
  404. qry: *qry,
  405. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  406. }
  407. iter.next.qry.pageState = x.PagingState
  408. if iter.next.pos < 1 {
  409. iter.next.pos = 1
  410. }
  411. }
  412. return iter
  413. case resultKeyspaceFrame:
  414. return &Iter{}
  415. case RequestErrUnprepared:
  416. stmtsLRU.mu.Lock()
  417. stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
  418. if _, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
  419. stmtsLRU.lru.Remove(stmtCacheKey)
  420. stmtsLRU.mu.Unlock()
  421. return c.executeQuery(qry)
  422. }
  423. stmtsLRU.mu.Unlock()
  424. return &Iter{err: x}
  425. case error:
  426. return &Iter{err: x}
  427. default:
  428. return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
  429. }
  430. }
  431. func (c *Conn) Pick(qry *Query) *Conn {
  432. if c.Closed() {
  433. return nil
  434. }
  435. return c
  436. }
  437. func (c *Conn) Closed() bool {
  438. c.closedMu.RLock()
  439. closed := c.isClosed
  440. c.closedMu.RUnlock()
  441. return closed
  442. }
  443. func (c *Conn) Close() {
  444. c.closedMu.Lock()
  445. if c.isClosed {
  446. c.closedMu.Unlock()
  447. return
  448. }
  449. c.isClosed = true
  450. c.closedMu.Unlock()
  451. c.conn.Close()
  452. }
  453. func (c *Conn) Address() string {
  454. return c.addr
  455. }
  456. func (c *Conn) AvailableStreams() int {
  457. return len(c.uniq)
  458. }
  459. func (c *Conn) UseKeyspace(keyspace string) error {
  460. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  461. if err != nil {
  462. return err
  463. }
  464. switch x := resp.(type) {
  465. case resultKeyspaceFrame:
  466. case error:
  467. return x
  468. default:
  469. return NewErrProtocol("Unknown type in response to USE: %s", x)
  470. }
  471. c.currentKeyspace = keyspace
  472. return nil
  473. }
  474. func (c *Conn) executeBatch(batch *Batch) error {
  475. if c.version == 1 {
  476. return ErrUnsupported
  477. }
  478. f := make(frame, headerSize, defaultFrameSize)
  479. f.setHeader(c.version, 0, 0, opBatch)
  480. f.writeByte(byte(batch.Type))
  481. f.writeShort(uint16(len(batch.Entries)))
  482. stmts := make(map[string]string)
  483. for i := 0; i < len(batch.Entries); i++ {
  484. entry := &batch.Entries[i]
  485. var info *QueryInfo
  486. var args []interface{}
  487. if len(entry.Args) > 0 || entry.binding != nil {
  488. var err error
  489. info, err = c.prepareStatement(entry.Stmt, nil)
  490. if err != nil {
  491. return err
  492. }
  493. if entry.binding == nil {
  494. args = entry.Args
  495. } else {
  496. args, err = entry.binding(info)
  497. if err != nil {
  498. return err
  499. }
  500. }
  501. if len(args) != len(info.Args) {
  502. return ErrQueryArgLength
  503. }
  504. stmts[string(info.Id)] = entry.Stmt
  505. f.writeByte(1)
  506. f.writeShortBytes(info.Id)
  507. } else {
  508. f.writeByte(0)
  509. f.writeLongString(entry.Stmt)
  510. }
  511. f.writeShort(uint16(len(args)))
  512. for j := 0; j < len(args); j++ {
  513. val, err := Marshal(info.Args[j].TypeInfo, args[j])
  514. if err != nil {
  515. return err
  516. }
  517. f.writeBytes(val)
  518. }
  519. }
  520. f.writeConsistency(batch.Cons)
  521. resp, err := c.exec(f, nil)
  522. if err != nil {
  523. return err
  524. }
  525. switch x := resp.(type) {
  526. case resultVoidFrame:
  527. return nil
  528. case RequestErrUnprepared:
  529. stmt, found := stmts[string(x.StatementId)]
  530. if found {
  531. stmtsLRU.mu.Lock()
  532. stmtsLRU.lru.Remove(c.addr + c.currentKeyspace + stmt)
  533. stmtsLRU.mu.Unlock()
  534. }
  535. if found {
  536. return c.executeBatch(batch)
  537. } else {
  538. return x
  539. }
  540. case error:
  541. return x
  542. default:
  543. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  544. }
  545. }
  546. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  547. defer func() {
  548. if r := recover(); r != nil {
  549. if e, ok := r.(ErrProtocol); ok {
  550. err = e
  551. return
  552. }
  553. panic(r)
  554. }
  555. }()
  556. if len(f) < headerSize {
  557. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  558. } else if f[0] != c.version|flagResponse {
  559. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  560. }
  561. flags, op, f := f[1], f[3], f[headerSize:]
  562. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  563. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  564. return nil, err
  565. } else {
  566. f = frame(buf)
  567. }
  568. }
  569. if flags&flagTrace != 0 {
  570. if len(f) < 16 {
  571. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  572. }
  573. traceId := []byte(f[:16])
  574. f = f[16:]
  575. trace.Trace(traceId)
  576. }
  577. switch op {
  578. case opReady:
  579. return readyFrame{}, nil
  580. case opResult:
  581. switch kind := f.readInt(); kind {
  582. case resultKindVoid:
  583. return resultVoidFrame{}, nil
  584. case resultKindRows:
  585. columns, pageState := f.readMetaData()
  586. numRows := f.readInt()
  587. values := make([][]byte, numRows*len(columns))
  588. for i := 0; i < len(values); i++ {
  589. values[i] = f.readBytes()
  590. }
  591. rows := make([][][]byte, numRows)
  592. for i := 0; i < numRows; i++ {
  593. rows[i], values = values[:len(columns)], values[len(columns):]
  594. }
  595. return resultRowsFrame{columns, rows, pageState}, nil
  596. case resultKindKeyspace:
  597. keyspace := f.readString()
  598. return resultKeyspaceFrame{keyspace}, nil
  599. case resultKindPrepared:
  600. id := f.readShortBytes()
  601. args, _ := f.readMetaData()
  602. if c.version < 2 {
  603. return resultPreparedFrame{PreparedId: id, Arguments: args}, nil
  604. }
  605. rvals, _ := f.readMetaData()
  606. return resultPreparedFrame{PreparedId: id, Arguments: args, ReturnValues: rvals}, nil
  607. case resultKindSchemaChanged:
  608. return resultVoidFrame{}, nil
  609. default:
  610. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  611. }
  612. case opAuthenticate:
  613. return authenticateFrame{f.readString()}, nil
  614. case opAuthChallenge:
  615. return authChallengeFrame{f.readBytes()}, nil
  616. case opAuthSuccess:
  617. return authSuccessFrame{f.readBytes()}, nil
  618. case opSupported:
  619. return supportedFrame{}, nil
  620. case opError:
  621. return f.readError(), nil
  622. default:
  623. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  624. }
  625. }
  626. func (c *Conn) setKeepalive(d time.Duration) error {
  627. if tc, ok := c.conn.(*net.TCPConn); ok {
  628. err := tc.SetKeepAlivePeriod(d)
  629. if err != nil {
  630. return err
  631. }
  632. return tc.SetKeepAlive(true)
  633. }
  634. return nil
  635. }
  636. // QueryInfo represents the meta data associated with a prepared CQL statement.
  637. type QueryInfo struct {
  638. Id []byte
  639. Args []ColumnInfo
  640. Rval []ColumnInfo
  641. }
  642. type callReq struct {
  643. active int32
  644. resp chan callResp
  645. }
  646. type callResp struct {
  647. buf frame
  648. err error
  649. }
  650. type inflightPrepare struct {
  651. info *QueryInfo
  652. err error
  653. wg sync.WaitGroup
  654. }
  655. var (
  656. ErrQueryArgLength = errors.New("query argument length mismatch")
  657. )