123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- // +build go1.8
- package pq
- import (
- "context"
- "database/sql"
- "database/sql/driver"
- "fmt"
- "io"
- "io/ioutil"
- )
- // Implement the "QueryerContext" interface
- func (cn *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
- list := make([]driver.Value, len(args))
- for i, nv := range args {
- list[i] = nv.Value
- }
- finish := cn.watchCancel(ctx)
- r, err := cn.query(query, list)
- if err != nil {
- if finish != nil {
- finish()
- }
- return nil, err
- }
- r.finish = finish
- return r, nil
- }
- // Implement the "ExecerContext" interface
- func (cn *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
- list := make([]driver.Value, len(args))
- for i, nv := range args {
- list[i] = nv.Value
- }
- if finish := cn.watchCancel(ctx); finish != nil {
- defer finish()
- }
- return cn.Exec(query, list)
- }
- // Implement the "ConnBeginTx" interface
- func (cn *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
- var mode string
- switch sql.IsolationLevel(opts.Isolation) {
- case sql.LevelDefault:
- // Don't touch mode: use the server's default
- case sql.LevelReadUncommitted:
- mode = " ISOLATION LEVEL READ UNCOMMITTED"
- case sql.LevelReadCommitted:
- mode = " ISOLATION LEVEL READ COMMITTED"
- case sql.LevelRepeatableRead:
- mode = " ISOLATION LEVEL REPEATABLE READ"
- case sql.LevelSerializable:
- mode = " ISOLATION LEVEL SERIALIZABLE"
- default:
- return nil, fmt.Errorf("pq: isolation level not supported: %d", opts.Isolation)
- }
- if opts.ReadOnly {
- mode += " READ ONLY"
- } else {
- mode += " READ WRITE"
- }
- tx, err := cn.begin(mode)
- if err != nil {
- return nil, err
- }
- cn.txnFinish = cn.watchCancel(ctx)
- return tx, nil
- }
- func (cn *conn) watchCancel(ctx context.Context) func() {
- if done := ctx.Done(); done != nil {
- finished := make(chan struct{})
- go func() {
- select {
- case <-done:
- _ = cn.cancel()
- finished <- struct{}{}
- case <-finished:
- }
- }()
- return func() {
- select {
- case <-finished:
- case finished <- struct{}{}:
- }
- }
- }
- return nil
- }
- func (cn *conn) cancel() error {
- c, err := dial(cn.dialer, cn.opts)
- if err != nil {
- return err
- }
- defer c.Close()
- {
- can := conn{
- c: c,
- }
- err = can.ssl(cn.opts)
- if err != nil {
- return err
- }
- w := can.writeBuf(0)
- w.int32(80877102) // cancel request code
- w.int32(cn.processID)
- w.int32(cn.secretKey)
- if err := can.sendStartupPacket(w); err != nil {
- return err
- }
- }
- // Read until EOF to ensure that the server received the cancel.
- {
- _, err := io.Copy(ioutil.Discard, c)
- return err
- }
- }
|