123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794 |
- package pq
- // Package pq is a pure Go Postgres driver for the database/sql package.
- // This module contains support for Postgres LISTEN/NOTIFY.
- import (
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
- // Notification represents a single notification from the database.
- type Notification struct {
- // Process ID (PID) of the notifying postgres backend.
- BePid int
- // Name of the channel the notification was sent on.
- Channel string
- // Payload, or the empty string if unspecified.
- Extra string
- }
- func recvNotification(r *readBuf) *Notification {
- bePid := r.int32()
- channel := r.string()
- extra := r.string()
- return &Notification{bePid, channel, extra}
- }
- const (
- connStateIdle int32 = iota
- connStateExpectResponse
- connStateExpectReadyForQuery
- )
- type message struct {
- typ byte
- err error
- }
- var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
- // ListenerConn is a low-level interface for waiting for notifications. You
- // should use Listener instead.
- type ListenerConn struct {
- // guards cn and err
- connectionLock sync.Mutex
- cn *conn
- err error
- connState int32
- // the sending goroutine will be holding this lock
- senderLock sync.Mutex
- notificationChan chan<- *Notification
- replyChan chan message
- }
- // NewListenerConn creates a new ListenerConn. Use NewListener instead.
- func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
- return newDialListenerConn(defaultDialer{}, name, notificationChan)
- }
- func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
- cn, err := DialOpen(d, name)
- if err != nil {
- return nil, err
- }
- l := &ListenerConn{
- cn: cn.(*conn),
- notificationChan: c,
- connState: connStateIdle,
- replyChan: make(chan message, 2),
- }
- go l.listenerConnMain()
- return l, nil
- }
- // We can only allow one goroutine at a time to be running a query on the
- // connection for various reasons, so the goroutine sending on the connection
- // must be holding senderLock.
- //
- // Returns an error if an unrecoverable error has occurred and the ListenerConn
- // should be abandoned.
- func (l *ListenerConn) acquireSenderLock() error {
- // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
- l.senderLock.Lock()
- l.connectionLock.Lock()
- err := l.err
- l.connectionLock.Unlock()
- if err != nil {
- l.senderLock.Unlock()
- return err
- }
- return nil
- }
- func (l *ListenerConn) releaseSenderLock() {
- l.senderLock.Unlock()
- }
- // setState advances the protocol state to newState. Returns false if moving
- // to that state from the current state is not allowed.
- func (l *ListenerConn) setState(newState int32) bool {
- var expectedState int32
- switch newState {
- case connStateIdle:
- expectedState = connStateExpectReadyForQuery
- case connStateExpectResponse:
- expectedState = connStateIdle
- case connStateExpectReadyForQuery:
- expectedState = connStateExpectResponse
- default:
- panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
- }
- return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
- }
- // Main logic is here: receive messages from the postgres backend, forward
- // notifications and query replies and keep the internal state in sync with the
- // protocol state. Returns when the connection has been lost, is about to go
- // away or should be discarded because we couldn't agree on the state with the
- // server backend.
- func (l *ListenerConn) listenerConnLoop() (err error) {
- defer errRecoverNoErrBadConn(&err)
- r := &readBuf{}
- for {
- t, err := l.cn.recvMessage(r)
- if err != nil {
- return err
- }
- switch t {
- case 'A':
- // recvNotification copies all the data so we don't need to worry
- // about the scratch buffer being overwritten.
- l.notificationChan <- recvNotification(r)
- case 'T', 'D':
- // only used by tests; ignore
- case 'E':
- // We might receive an ErrorResponse even when not in a query; it
- // is expected that the server will close the connection after
- // that, but we should make sure that the error we display is the
- // one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
- if !l.setState(connStateExpectReadyForQuery) {
- return parseError(r)
- }
- l.replyChan <- message{t, parseError(r)}
- case 'C', 'I':
- if !l.setState(connStateExpectReadyForQuery) {
- // protocol out of sync
- return fmt.Errorf("unexpected CommandComplete")
- }
- // ExecSimpleQuery doesn't need to know about this message
- case 'Z':
- if !l.setState(connStateIdle) {
- // protocol out of sync
- return fmt.Errorf("unexpected ReadyForQuery")
- }
- l.replyChan <- message{t, nil}
- case 'N', 'S':
- // ignore
- default:
- return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
- }
- }
- }
- // This is the main routine for the goroutine receiving on the database
- // connection. Most of the main logic is in listenerConnLoop.
- func (l *ListenerConn) listenerConnMain() {
- err := l.listenerConnLoop()
- // listenerConnLoop terminated; we're done, but we still have to clean up.
- // Make sure nobody tries to start any new queries by making sure the err
- // pointer is set. It is important that we do not overwrite its value; a
- // connection could be closed by either this goroutine or one sending on
- // the connection -- whoever closes the connection is assumed to have the
- // more meaningful error message (as the other one will probably get
- // net.errClosed), so that goroutine sets the error we expose while the
- // other error is discarded. If the connection is lost while two
- // goroutines are operating on the socket, it probably doesn't matter which
- // error we expose so we don't try to do anything more complex.
- l.connectionLock.Lock()
- if l.err == nil {
- l.err = err
- }
- l.cn.Close()
- l.connectionLock.Unlock()
- // There might be a query in-flight; make sure nobody's waiting for a
- // response to it, since there's not going to be one.
- close(l.replyChan)
- // let the listener know we're done
- close(l.notificationChan)
- // this ListenerConn is done
- }
- // Listen sends a LISTEN query to the server. See ExecSimpleQuery.
- func (l *ListenerConn) Listen(channel string) (bool, error) {
- return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
- }
- // Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
- func (l *ListenerConn) Unlisten(channel string) (bool, error) {
- return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
- }
- // UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
- func (l *ListenerConn) UnlistenAll() (bool, error) {
- return l.ExecSimpleQuery("UNLISTEN *")
- }
- // Ping the remote server to make sure it's alive. Non-nil error means the
- // connection has failed and should be abandoned.
- func (l *ListenerConn) Ping() error {
- sent, err := l.ExecSimpleQuery("")
- if !sent {
- return err
- }
- if err != nil {
- // shouldn't happen
- panic(err)
- }
- return nil
- }
- // Attempt to send a query on the connection. Returns an error if sending the
- // query failed, and the caller should initiate closure of this connection.
- // The caller must be holding senderLock (see acquireSenderLock and
- // releaseSenderLock).
- func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
- defer errRecoverNoErrBadConn(&err)
- // must set connection state before sending the query
- if !l.setState(connStateExpectResponse) {
- panic("two queries running at the same time")
- }
- // Can't use l.cn.writeBuf here because it uses the scratch buffer which
- // might get overwritten by listenerConnLoop.
- b := &writeBuf{
- buf: []byte("Q\x00\x00\x00\x00"),
- pos: 1,
- }
- b.string(q)
- l.cn.send(b)
- return nil
- }
- // ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
- // parameters) on the connection. The possible return values are:
- // 1) "executed" is true; the query was executed to completion on the
- // database server. If the query failed, err will be set to the error
- // returned by the database, otherwise err will be nil.
- // 2) If "executed" is false, the query could not be executed on the remote
- // server. err will be non-nil.
- //
- // After a call to ExecSimpleQuery has returned an executed=false value, the
- // connection has either been closed or will be closed shortly thereafter, and
- // all subsequently executed queries will return an error.
- func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
- if err = l.acquireSenderLock(); err != nil {
- return false, err
- }
- defer l.releaseSenderLock()
- err = l.sendSimpleQuery(q)
- if err != nil {
- // We can't know what state the protocol is in, so we need to abandon
- // this connection.
- l.connectionLock.Lock()
- // Set the error pointer if it hasn't been set already; see
- // listenerConnMain.
- if l.err == nil {
- l.err = err
- }
- l.connectionLock.Unlock()
- l.cn.c.Close()
- return false, err
- }
- // now we just wait for a reply..
- for {
- m, ok := <-l.replyChan
- if !ok {
- // We lost the connection to server, don't bother waiting for a
- // a response. err should have been set already.
- l.connectionLock.Lock()
- err := l.err
- l.connectionLock.Unlock()
- return false, err
- }
- switch m.typ {
- case 'Z':
- // sanity check
- if m.err != nil {
- panic("m.err != nil")
- }
- // done; err might or might not be set
- return true, err
- case 'E':
- // sanity check
- if m.err == nil {
- panic("m.err == nil")
- }
- // server responded with an error; ReadyForQuery to follow
- err = m.err
- default:
- return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
- }
- }
- }
- // Close closes the connection.
- func (l *ListenerConn) Close() error {
- l.connectionLock.Lock()
- if l.err != nil {
- l.connectionLock.Unlock()
- return errListenerConnClosed
- }
- l.err = errListenerConnClosed
- l.connectionLock.Unlock()
- // We can't send anything on the connection without holding senderLock.
- // Simply close the net.Conn to wake up everyone operating on it.
- return l.cn.c.Close()
- }
- // Err returns the reason the connection was closed. It is not safe to call
- // this function until l.Notify has been closed.
- func (l *ListenerConn) Err() error {
- return l.err
- }
- var errListenerClosed = errors.New("pq: Listener has been closed")
- // ErrChannelAlreadyOpen is returned from Listen when a channel is already
- // open.
- var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
- // ErrChannelNotOpen is returned from Unlisten when a channel is not open.
- var ErrChannelNotOpen = errors.New("pq: channel is not open")
- // ListenerEventType is an enumeration of listener event types.
- type ListenerEventType int
- const (
- // ListenerEventConnected is emitted only when the database connection
- // has been initially initialized. The err argument of the callback
- // will always be nil.
- ListenerEventConnected ListenerEventType = iota
- // ListenerEventDisconnected is emitted after a database connection has
- // been lost, either because of an error or because Close has been
- // called. The err argument will be set to the reason the database
- // connection was lost.
- ListenerEventDisconnected
- // ListenerEventReconnected is emitted after a database connection has
- // been re-established after connection loss. The err argument of the
- // callback will always be nil. After this event has been emitted, a
- // nil pq.Notification is sent on the Listener.Notify channel.
- ListenerEventReconnected
- // ListenerEventConnectionAttemptFailed is emitted after a connection
- // to the database was attempted, but failed. The err argument will be
- // set to an error describing why the connection attempt did not
- // succeed.
- ListenerEventConnectionAttemptFailed
- )
- // EventCallbackType is the event callback type. See also ListenerEventType
- // constants' documentation.
- type EventCallbackType func(event ListenerEventType, err error)
- // Listener provides an interface for listening to notifications from a
- // PostgreSQL database. For general usage information, see section
- // "Notifications".
- //
- // Listener can safely be used from concurrently running goroutines.
- type Listener struct {
- // Channel for receiving notifications from the database. In some cases a
- // nil value will be sent. See section "Notifications" above.
- Notify chan *Notification
- name string
- minReconnectInterval time.Duration
- maxReconnectInterval time.Duration
- dialer Dialer
- eventCallback EventCallbackType
- lock sync.Mutex
- isClosed bool
- reconnectCond *sync.Cond
- cn *ListenerConn
- connNotificationChan <-chan *Notification
- channels map[string]struct{}
- }
- // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
- //
- // name should be set to a connection string to be used to establish the
- // database connection (see section "Connection String Parameters" above).
- //
- // minReconnectInterval controls the duration to wait before trying to
- // re-establish the database connection after connection loss. After each
- // consecutive failure this interval is doubled, until maxReconnectInterval is
- // reached. Successfully completing the connection establishment procedure
- // resets the interval back to minReconnectInterval.
- //
- // The last parameter eventCallback can be set to a function which will be
- // called by the Listener when the state of the underlying database connection
- // changes. This callback will be called by the goroutine which dispatches the
- // notifications over the Notify channel, so you should try to avoid doing
- // potentially time-consuming operations from the callback.
- func NewListener(name string,
- minReconnectInterval time.Duration,
- maxReconnectInterval time.Duration,
- eventCallback EventCallbackType) *Listener {
- return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
- }
- // NewDialListener is like NewListener but it takes a Dialer.
- func NewDialListener(d Dialer,
- name string,
- minReconnectInterval time.Duration,
- maxReconnectInterval time.Duration,
- eventCallback EventCallbackType) *Listener {
- l := &Listener{
- name: name,
- minReconnectInterval: minReconnectInterval,
- maxReconnectInterval: maxReconnectInterval,
- dialer: d,
- eventCallback: eventCallback,
- channels: make(map[string]struct{}),
- Notify: make(chan *Notification, 32),
- }
- l.reconnectCond = sync.NewCond(&l.lock)
- go l.listenerMain()
- return l
- }
- // NotificationChannel returns the notification channel for this listener.
- // This is the same channel as Notify, and will not be recreated during the
- // life time of the Listener.
- func (l *Listener) NotificationChannel() <-chan *Notification {
- return l.Notify
- }
- // Listen starts listening for notifications on a channel. Calls to this
- // function will block until an acknowledgement has been received from the
- // server. Note that Listener automatically re-establishes the connection
- // after connection loss, so this function may block indefinitely if the
- // connection can not be re-established.
- //
- // Listen will only fail in three conditions:
- // 1) The channel is already open. The returned error will be
- // ErrChannelAlreadyOpen.
- // 2) The query was executed on the remote server, but PostgreSQL returned an
- // error message in response to the query. The returned error will be a
- // pq.Error containing the information the server supplied.
- // 3) Close is called on the Listener before the request could be completed.
- //
- // The channel name is case-sensitive.
- func (l *Listener) Listen(channel string) error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
- // The server allows you to issue a LISTEN on a channel which is already
- // open, but it seems useful to be able to detect this case to spot for
- // mistakes in application logic. If the application genuinely does't
- // care, it can check the exported error and ignore it.
- _, exists := l.channels[channel]
- if exists {
- return ErrChannelAlreadyOpen
- }
- if l.cn != nil {
- // If gotResponse is true but error is set, the query was executed on
- // the remote server, but resulted in an error. This should be
- // relatively rare, so it's fine if we just pass the error to our
- // caller. However, if gotResponse is false, we could not complete the
- // query on the remote server and our underlying connection is about
- // to go away, so we only add relname to l.channels, and wait for
- // resync() to take care of the rest.
- gotResponse, err := l.cn.Listen(channel)
- if gotResponse && err != nil {
- return err
- }
- }
- l.channels[channel] = struct{}{}
- for l.cn == nil {
- l.reconnectCond.Wait()
- // we let go of the mutex for a while
- if l.isClosed {
- return errListenerClosed
- }
- }
- return nil
- }
- // Unlisten removes a channel from the Listener's channel list. Returns
- // ErrChannelNotOpen if the Listener is not listening on the specified channel.
- // Returns immediately with no error if there is no connection. Note that you
- // might still get notifications for this channel even after Unlisten has
- // returned.
- //
- // The channel name is case-sensitive.
- func (l *Listener) Unlisten(channel string) error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
- // Similarly to LISTEN, this is not an error in Postgres, but it seems
- // useful to distinguish from the normal conditions.
- _, exists := l.channels[channel]
- if !exists {
- return ErrChannelNotOpen
- }
- if l.cn != nil {
- // Similarly to Listen (see comment in that function), the caller
- // should only be bothered with an error if it came from the backend as
- // a response to our query.
- gotResponse, err := l.cn.Unlisten(channel)
- if gotResponse && err != nil {
- return err
- }
- }
- // Don't bother waiting for resync if there's no connection.
- delete(l.channels, channel)
- return nil
- }
- // UnlistenAll removes all channels from the Listener's channel list. Returns
- // immediately with no error if there is no connection. Note that you might
- // still get notifications for any of the deleted channels even after
- // UnlistenAll has returned.
- func (l *Listener) UnlistenAll() error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
- if l.cn != nil {
- // Similarly to Listen (see comment in that function), the caller
- // should only be bothered with an error if it came from the backend as
- // a response to our query.
- gotResponse, err := l.cn.UnlistenAll()
- if gotResponse && err != nil {
- return err
- }
- }
- // Don't bother waiting for resync if there's no connection.
- l.channels = make(map[string]struct{})
- return nil
- }
- // Ping the remote server to make sure it's alive. Non-nil return value means
- // that there is no active connection.
- func (l *Listener) Ping() error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
- if l.cn == nil {
- return errors.New("no connection")
- }
- return l.cn.Ping()
- }
- // Clean up after losing the server connection. Returns l.cn.Err(), which
- // should have the reason the connection was lost.
- func (l *Listener) disconnectCleanup() error {
- l.lock.Lock()
- defer l.lock.Unlock()
- // sanity check; can't look at Err() until the channel has been closed
- select {
- case _, ok := <-l.connNotificationChan:
- if ok {
- panic("connNotificationChan not closed")
- }
- default:
- panic("connNotificationChan not closed")
- }
- err := l.cn.Err()
- l.cn.Close()
- l.cn = nil
- return err
- }
- // Synchronize the list of channels we want to be listening on with the server
- // after the connection has been established.
- func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
- doneChan := make(chan error)
- go func(notificationChan <-chan *Notification) {
- for channel := range l.channels {
- // If we got a response, return that error to our caller as it's
- // going to be more descriptive than cn.Err().
- gotResponse, err := cn.Listen(channel)
- if gotResponse && err != nil {
- doneChan <- err
- return
- }
- // If we couldn't reach the server, wait for notificationChan to
- // close and then return the error message from the connection, as
- // per ListenerConn's interface.
- if err != nil {
- for range notificationChan {
- }
- doneChan <- cn.Err()
- return
- }
- }
- doneChan <- nil
- }(notificationChan)
- // Ignore notifications while synchronization is going on to avoid
- // deadlocks. We have to send a nil notification over Notify anyway as
- // we can't possibly know which notifications (if any) were lost while
- // the connection was down, so there's no reason to try and process
- // these messages at all.
- for {
- select {
- case _, ok := <-notificationChan:
- if !ok {
- notificationChan = nil
- }
- case err := <-doneChan:
- return err
- }
- }
- }
- // caller should NOT be holding l.lock
- func (l *Listener) closed() bool {
- l.lock.Lock()
- defer l.lock.Unlock()
- return l.isClosed
- }
- func (l *Listener) connect() error {
- notificationChan := make(chan *Notification, 32)
- cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
- if err != nil {
- return err
- }
- l.lock.Lock()
- defer l.lock.Unlock()
- err = l.resync(cn, notificationChan)
- if err != nil {
- cn.Close()
- return err
- }
- l.cn = cn
- l.connNotificationChan = notificationChan
- l.reconnectCond.Broadcast()
- return nil
- }
- // Close disconnects the Listener from the database and shuts it down.
- // Subsequent calls to its methods will return an error. Close returns an
- // error if the connection has already been closed.
- func (l *Listener) Close() error {
- l.lock.Lock()
- defer l.lock.Unlock()
- if l.isClosed {
- return errListenerClosed
- }
- if l.cn != nil {
- l.cn.Close()
- }
- l.isClosed = true
- return nil
- }
- func (l *Listener) emitEvent(event ListenerEventType, err error) {
- if l.eventCallback != nil {
- l.eventCallback(event, err)
- }
- }
- // Main logic here: maintain a connection to the server when possible, wait
- // for notifications and emit events.
- func (l *Listener) listenerConnLoop() {
- var nextReconnect time.Time
- reconnectInterval := l.minReconnectInterval
- for {
- for {
- err := l.connect()
- if err == nil {
- break
- }
- if l.closed() {
- return
- }
- l.emitEvent(ListenerEventConnectionAttemptFailed, err)
- time.Sleep(reconnectInterval)
- reconnectInterval *= 2
- if reconnectInterval > l.maxReconnectInterval {
- reconnectInterval = l.maxReconnectInterval
- }
- }
- if nextReconnect.IsZero() {
- l.emitEvent(ListenerEventConnected, nil)
- } else {
- l.emitEvent(ListenerEventReconnected, nil)
- l.Notify <- nil
- }
- reconnectInterval = l.minReconnectInterval
- nextReconnect = time.Now().Add(reconnectInterval)
- for {
- notification, ok := <-l.connNotificationChan
- if !ok {
- // lost connection, loop again
- break
- }
- l.Notify <- notification
- }
- err := l.disconnectCleanup()
- if l.closed() {
- return
- }
- l.emitEvent(ListenerEventDisconnected, err)
- time.Sleep(time.Until(nextReconnect))
- }
- }
- func (l *Listener) listenerMain() {
- l.listenerConnLoop()
- close(l.Notify)
- }
|