client.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package internal
  2. import (
  3. "bytes"
  4. "log"
  5. "net/http"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. )
  9. const (
  10. // Time allowed to write a message to the peer.
  11. writeWait = 10 * time.Second
  12. // Time allowed to read the next pong message from the peer.
  13. pongWait = 60 * time.Second
  14. // Send pings to peer with this period. Must be less than pongWait.
  15. pingPeriod = (pongWait * 9) / 10
  16. // Maximum message size allowed from peer.
  17. maxMessageSize = 512
  18. // send buffer size
  19. bufSize = 256
  20. )
  21. var (
  22. newline = []byte{'\n'}
  23. space = []byte{' '}
  24. )
  25. var upgrader = websocket.Upgrader{
  26. ReadBufferSize: 1024,
  27. WriteBufferSize: 1024,
  28. }
  29. // Client is a middleman between the websocket connection and the hub.
  30. type Client struct {
  31. hub *Hub
  32. // The websocket connection.
  33. conn *websocket.Conn
  34. // Buffered channel of outbound messages.
  35. send chan []byte
  36. }
  37. // readPump pumps messages from the websocket connection to the hub.
  38. //
  39. // The application runs readPump in a per-connection goroutine. The application
  40. // ensures that there is at most one reader on a connection by executing all
  41. // reads from this goroutine.
  42. func (c *Client) readPump() {
  43. defer func() {
  44. c.hub.unregister <- c
  45. c.conn.Close()
  46. }()
  47. c.conn.SetReadLimit(maxMessageSize)
  48. c.conn.SetReadDeadline(time.Now().Add(pongWait))
  49. c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
  50. for {
  51. _, message, err := c.conn.ReadMessage()
  52. if err != nil {
  53. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  54. log.Printf("error: %v", err)
  55. }
  56. break
  57. }
  58. message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  59. c.hub.broadcast <- message
  60. }
  61. }
  62. // writePump pumps messages from the hub to the websocket connection.
  63. //
  64. // A goroutine running writePump is started for each connection. The
  65. // application ensures that there is at most one writer to a connection by
  66. // executing all writes from this goroutine.
  67. func (c *Client) writePump() {
  68. ticker := time.NewTicker(pingPeriod)
  69. defer func() {
  70. ticker.Stop()
  71. c.conn.Close()
  72. }()
  73. for {
  74. select {
  75. case message, ok := <-c.send:
  76. c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  77. if !ok {
  78. // The hub closed the channel.
  79. c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  80. return
  81. }
  82. w, err := c.conn.NextWriter(websocket.TextMessage)
  83. if err != nil {
  84. return
  85. }
  86. w.Write(message)
  87. // Add queued chat messages to the current websocket message.
  88. n := len(c.send)
  89. for i := 0; i < n; i++ {
  90. w.Write(newline)
  91. w.Write(<-c.send)
  92. }
  93. if err := w.Close(); err != nil {
  94. return
  95. }
  96. case <-ticker.C:
  97. c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  98. if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  99. return
  100. }
  101. }
  102. }
  103. }
  104. // ServeWs handles websocket requests from the peer.
  105. func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
  106. conn, err := upgrader.Upgrade(w, r, nil)
  107. if err != nil {
  108. log.Println(err)
  109. return
  110. }
  111. client := &Client{
  112. hub: hub,
  113. conn: conn,
  114. send: make(chan []byte, bufSize),
  115. }
  116. client.hub.register <- client
  117. // Allow collection of memory referenced by the caller by doing all work in
  118. // new goroutines.
  119. go client.writePump()
  120. go client.readPump()
  121. }