实现WebSocket服务端
实现HTTP服务端
- WebSocket是HTTP协议upgrade而来
- 使用http标准库快速实现空接口:/ws
完成WebSocket握手
- 使用websocket.Upgrader完成协议握手,得到WebSocket长连接
- 操作websocket api,读取客户端消息,然后原样发送回去
封装WebSocket
API原理
- SendMessage将消息投递到 out channel
- ReadMessage从 in channel 读取消息
内部原理
- 启动读协程,循环读取WebSocket,将消息投递到in channel
- 启动写协程,玄幻读取 out channel,将消息写给WebSocket
-
代码示例
-
目录层级
-
connection.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
package bag import ( "errors" "github.com/gorilla/websocket" "sync" ) type Connection struct { wsConn *websocket.Conn inChan chan []byte outChan chan []byte closeChan chan byte mutex sync.Mutex isClosed bool } func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) { conn = &Connection{ wsConn: wsConn, inChan: make(chan []byte, 1000), outChan: make(chan []byte, 1000), closeChan: make(chan byte, 1), } // 初始化时启动一个读协程 go conn.readLoop() // 初始化时启动一个写协程 go conn.writeLoop() return } // API func (conn *Connection) ReadMessage() (data []byte, err error) { select { case data = <-conn.inChan: case <-conn.closeChan: err = errors.New("connection is closed") } data = <-conn.inChan return } func (conn *Connection) WriteMessage(data []byte) (err error) { select { case conn.outChan <- data: case <-conn.closeChan: err = errors.New("connection is closed") } conn.outChan <- data return } // close func (conn *Connection) Close() { // 线程安全,可重入的Close conn.wsConn.Close() // 保证只执行一次,但是为了线程安全,需要加锁 conn.mutex.Lock() if !conn.isClosed { close(conn.closeChan) conn.isClosed = true } conn.mutex.Unlock() } // 内部实现 func (conn *Connection) readLoop() { var ( data []byte err error ) for { if _, data, err = conn.wsConn.ReadMessage(); err != nil { goto ERR } // 持续不读 会超过inChan范围 导致阻塞 等待inChan有位置才会继续写入 //conn.inChan <- data // ↑↑↑ 优化方案 使用select select { case conn.inChan <- data: case <-conn.closeChan: // closeChan关闭的时候 goto ERR } } ERR: conn.Close() } func (conn *Connection) writeLoop() { var ( data []byte err error ) for { //data = <-conn.outChan // ↑↑↑ 优化方案 使用select select { case data = <-conn.outChan: case <-conn.closeChan: goto ERR } if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil { goto ERR } } ERR: conn.Close() }
-
websocket.go
-