Skip to content

实现WebSocket服务端

实现HTTP服务端

  • WebSocket是HTTP协议upgrade而来
  • 使用http标准库快速实现空接口:/ws
    package main
    
    import "net/http"
    
    func wsHandle(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("hello"))
    }
    
    func main() {
        http.HandleFunc("/ws", wsHandle) /* http://localhost:7777/ws */
        http.ListenAndServe("localhost:7777", nil)
    }
    

完成WebSocket握手

  • 使用websocket.Upgrader完成协议握手,得到WebSocket长连接
  • 操作websocket api,读取客户端消息,然后原样发送回去
    package main
    
    import (
        "github.com/gorilla/websocket"
        "net/http"
    )
    
    var (
        upgrader = websocket.Upgrader{
            // 允许跨域
            CheckOrigin: func(r *http.Request) bool {
                return true
            },
        }
    )
    
    func wsHandle(w http.ResponseWriter, r *http.Request) {
        var (
            conn *websocket.Conn
            err  error
            data []byte
        )
        // Upgrade: websocket
        if conn, err = upgrader.Upgrade(w, r, nil); err != nil {
            return
        }
    
        // websocket.Conn
        for {
            // msgType: text, Binary, json
            if _, data, err = conn.ReadMessage(); err != nil {
                goto ERR
            }
            if err = conn.WriteMessage(websocket.TextMessage, data); err != nil {
                goto ERR
            }
    
        }
    ERR:
        conn.Close()
    }
    
    func main() {
        http.HandleFunc("/ws", wsHandle) /* http://localhost:7777/ws */
        http.ListenAndServe("localhost:7777", nil)
    }
    

封装WebSocket

API原理

  • SendMessage将消息投递到 out channel
  • ReadMessage从 in channel 读取消息

内部原理

  • 启动读协程,循环读取WebSocket,将消息投递到in channel
  • 启动写协程,玄幻读取 out channel,将消息写给WebSocket
  • 代码示例

    • 目录层级

          .
      ├── bag
         └── connection.go
      ├── go.mod
      ├── go.sum
      └── websocket.go
      

    • connection.go

      package bag
      
      import (
          "errors"
          "github.com/gorilla/websocket"
          "sync"
      )
      
      type Connection struct {
          wsConn    *websocket.Conn
          inChan    chan []byte
          outChan   chan []byte
          closeChan chan byte
      
          mutex    sync.Mutex
          isClosed bool
      }
      
      func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
          conn = &Connection{
              wsConn:    wsConn,
              inChan:    make(chan []byte, 1000),
              outChan:   make(chan []byte, 1000),
              closeChan: make(chan byte, 1),
          }
      
          // 初始化时启动一个读协程
          go conn.readLoop()
      
          // 初始化时启动一个写协程
          go conn.writeLoop()
      
          return
      }
      
      // API
      func (conn *Connection) ReadMessage() (data []byte, err error) {
          select {
          case data = <-conn.inChan:
          case <-conn.closeChan:
              err = errors.New("connection is closed")
          }
          data = <-conn.inChan
          return
      }
      
      func (conn *Connection) WriteMessage(data []byte) (err error) {
          select {
          case conn.outChan <- data:
          case <-conn.closeChan:
              err = errors.New("connection is closed")
          }
          conn.outChan <- data
          return
      }
      
      // close
      func (conn *Connection) Close() {
          // 线程安全,可重入的Close
          conn.wsConn.Close()
      
          // 保证只执行一次,但是为了线程安全,需要加锁
          conn.mutex.Lock()
          if !conn.isClosed {
              close(conn.closeChan)
              conn.isClosed = true
          }
          conn.mutex.Unlock()
      }
      
      // 内部实现
      func (conn *Connection) readLoop() {
          var (
              data []byte
              err  error
          )
          for {
              if _, data, err = conn.wsConn.ReadMessage(); err != nil {
                  goto ERR
              }
              // 持续不读 会超过inChan范围 导致阻塞 等待inChan有位置才会继续写入
              //conn.inChan <- data
      
              // ↑↑↑ 优化方案 使用select
              select {
              case conn.inChan <- data:
              case <-conn.closeChan:
                  // closeChan关闭的时候
                  goto ERR
              }
          }
      ERR:
          conn.Close()
      }
      
      func (conn *Connection) writeLoop() {
          var (
              data []byte
              err  error
          )
          for {
              //data = <-conn.outChan
      
              // ↑↑↑ 优化方案 使用select
              select {
              case data = <-conn.outChan:
              case <-conn.closeChan:
                  goto ERR
              }
      
              if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil {
                  goto ERR
              }
          }
      ERR:
          conn.Close()
      }
      

    • websocket.go

      package main
      
      import (
          "github.com/gorilla/websocket"
          "net/http"
          "socket/bag"
          "time"
      )
      
      var (
          upgrader = websocket.Upgrader{
              // 允许跨域
              CheckOrigin: func(r *http.Request) bool {
                  return true
              },
          }
      )
      
      func wsHandle(w http.ResponseWriter, r *http.Request) {
          var (
              wsConn *websocket.Conn
              err    error
              //data []byte
              conn *bag.Connection
          )
          // Upgrade: websocket
          if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
              return
          }
      
          if conn, err = bag.InitConnection(wsConn); err != nil {
              goto ERR
          }
      
          go func() {
              var (
                  err error
              )
              for {
                  if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
                      return
                  }
                  time.Sleep(1 * time.Second)
              }
          }()
      
          for {
              if data, err = conn.ReadMessage(); err != nil {
                  goto ERR
              }
              if err = conn.WriteMessage(data); err != nil {
                  goto ERR
              }
          }
      
      ERR:
          // 关闭连接操作
          conn.Close()
      }
      
      func main() {
          http.HandleFunc("/ws", wsHandle) /* http://localhost:7777/ws */
          http.ListenAndServe("localhost:7777", nil)
      }