package main import ( "bytes" "context" "errors" "fmt" "git.qianqiusoft.com/library/glog" "git.qianqiusoft.com/qianqiusoft/go-uuid/uuid" "net" "sync" "time" ) const ( __KEY = "Light#dauth-@*I2" CMD_NEW = "new" CMD_REMOVE = "remove" CMD_PINGPONG = "pg" CMD_PINGPONG_RESP = "pg_resp" ) /** * 数据包获取接口,跟业务无关 */ type Getter interface { /** * @brief: 包解析器接口 * @param1 bytess: 当前接收到的数据 * @param2 conn: 当前conn * @return1: 获取出来包内容,多个 * @return2: 剩余数据 * @return3: 错误信息 */ Get(bytess []byte, tcpConn *TcpConn)([][]byte, []byte, error) } /** * 包解析器 */ type Parser interface { /** * @brief: 包解析器 * @param1 bytess: 包数据 * @param2 tcpConn: 当前连接 */ Parse(bytess []byte, tcpConn *TcpConn) } /** * 连接回调接口 */ type TcpConnCallback interface { /** * @brief: 新连接回调 * @param1 conn: 连接 */ OnConnected(conn *TcpConn) // /** * @brief: 连接断开回调 * @param1 conn: 连接 * @param2 err: 错误信息,正常关闭的为nil */ OnDisconnected(conn *TcpConn) // /** * @brief: 错误回调 * @param1 conn: 连接 * @param2 err: 错误信息 */ OnError(conn *TcpConn, err error) } /** * tcp 配置信息 */ type Config struct { Ip string // ip Port string // 端口 Network string // 默认tcp, The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket". Interval time.Duration // 心跳间隔 Timeout time.Duration // 超时时间 BufferSize int // 接收缓冲区大小 SendChanSize int // 发送通道大小 RecvChanSize int // 接收通道大小 Getter Getter // tcp 连接处理接口 Parser Parser // 包解析器 ConnCallback TcpConnCallback // 连接回调接口 } /** * tcp 连接 */ type TcpConn struct { id string // id conn net.Conn // 连接 sendChan chan []byte // 发送数据chan recvChan chan []byte // 接收数组chan done chan bool // 完成chan ctx context.Context // 上下文 cancel context.CancelFunc // 取消函数 reqTime time.Time // 最后请求时间 timer *time.Ticker // 超时检查定时器 interval time.Duration // 心跳时间间隔 timeout time.Duration // 超时时间 bufferSize int // 接收缓冲区大小 tcpServer *TcpServer // 服务端 Getter Getter // 包解析器 Parser Parser // 包解析器 Tag interface{} // 连接标识 } /** * @brief: 发送接口 * @param1 data: 数据 */ func (tc *TcpConn)Send(data []byte){ if data == nil{ fmt.Println("发送数据为nil") return } tc.sendChan <- data } /** * @brief: 取消 */ func (tc *TcpConn)Cancel(){ glog.Infoln("tcp conn cancel") if tc.timer != nil{ tc.timer.Stop() } if tc.cancel != nil{ tc.cancel() } tc.conn.Close() } /** * @brief: 启动超时 */ func (tc *TcpConn)startTimeoutCheckProcess(){ go func() { if int64(tc.interval) <= 0{ glog.Infoln("时间间隔小于等于0,不启动超时检测定时器") return } tc.timer = time.NewTicker(tc.interval) defer func() { tc.timer.Stop() glog.Infoln("startTimeoutCheckProcess end") }() for { select { case <-tc.ctx.Done(): glog.Infoln("startTimeoutCheckProcess ctx.cancel") return case <-tc.timer.C: now := time.Now() if now.Sub(tc.reqTime) > tc.timeout { glog.Errorln(tc.conn, "heartbeat timeout exit", now.Format("2006-01-02 15:04:05"), tc.reqTime.Format("2006-01-02 15:04:05")) tc.Cancel() return } } } }() } /** * @brief: 发送处理流程 */ func (tc *TcpConn)startSendProcess() { go func() { glog.Infoln(tc.conn, "startSendProcess start") defer func() { tc.done <- true glog.Infoln(tc.conn, "startSendProcess end.") }() for { select { case <-tc.ctx.Done(): glog.Infoln("startSendProcess ctx.cancel") return case data := <-tc.sendChan: tc.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) if _, err := tc.conn.Write(data); err != nil { glog.Errorln(tc.conn, "conn.Write", err.Error()) return } } } }() } /** * @brief: 接收处理流程 */ func (tc *TcpConn)startRecvProcess(){ go func() { glog.Infoln(tc.conn, "startRecvProcess start") lastRecvDataTime := time.Now() defer func() { glog.Errorln(tc.conn, "last recv time:", lastRecvDataTime.Format("2006-01-02 15:04:05"), "now time:", time.Now().Format("2006-01-02 15:04:05")) tc.done <- true glog.Infoln(tc.conn, "startRecvProcess end.") }() ok, err := checkSign(tc.conn) // 校验 if err != nil{ fmt.Println("校验key异常, 关闭连接", err.Error()) tc.Cancel() return } if !ok{ fmt.Println("key错误, 关闭连接", err.Error()) tc.Cancel() return } err = writeString(tc.conn, "ok") // 应答ok if err != nil{ fmt.Println("应答ok错误, 关闭连接", err.Error()) tc.Cancel() return } // 验证成功后直接同步 mmap := tc.tcpServer.store.All() for _, v := range mmap{ tc.Send(cmdContentToBytes(CMD_NEW, v)) } for { cmd, err := readString(tc.conn) if err != nil{ glog.Errorln(tc.conn, "读取cmd错误:", err.Error()) break } if cmd == CMD_NEW{ err = tc.newHandler() }else if cmd == CMD_REMOVE{ err = tc.removeHandler() } else if cmd == CMD_PINGPONG{ err = tc.pingpongHandler() } else{ glog.Errorln("未知cmd", CMD_NEW) continue } if err != nil{ glog.Errorln(tc.conn, "处理错误:", cmd, err.Error()) break } } }() } // 处理新建 func (tc *TcpConn)newHandler()error{ fmt.Println("处理新建") _, err := readUInt32(tc.conn) // 读取主体长度,这个忽略掉 if err != nil{ glog.Errorln(tc.conn, "读取token错误:", err.Error()) return err } tokenStr, err := readString(tc.conn) // d读取token if err != nil{ glog.Errorln(tc.conn, "读取token错误:", err.Error()) return err } fmt.Println("处理新建1", tokenStr) tokenBytes, err := readBytes(tc.conn) if err != nil{ glog.Errorln(tc.conn, "读取token bytes错误:", err.Error()) return err } fmt.Println("处理新建2") fmt.Println(tokenBytes) tc.tcpServer.store.Set(tokenStr, tokenBytes) tc.tcpServer.Broadcast(tc.id, CMD_NEW, tokenBytes) return nil } // 处理移除 func (tc *TcpConn)removeHandler()error{ fmt.Println("处理删除") tokenStr, err := readString(tc.conn) if err != nil{ glog.Errorln(tc.conn, "读取token错误:", err.Error()) return err } tc.tcpServer.store.Remove(tokenStr) tc.tcpServer.Broadcast(tc.id, CMD_NEW, []byte(tokenStr)) return nil } // 处理心跳 func (tc *TcpConn)pingpongHandler()error{ fmt.Println("处理心跳") buf := bytes.NewBuffer([]byte{}) bytess := []byte(CMD_PINGPONG_RESP) size := len(bytess) buf.Write(uint32ToBytes(uint32(size))) buf.Write(bytess) tc.Send(buf.Bytes()) return nil } type TcpServer struct { config *Config // 配置 connMap sync.Map // 连接列表 store IStore } func NewTcpServer(config *Config, store IStore)*TcpServer{ return &TcpServer{ config: config, store: store, } } /** * @brief: 处理新连接 * @param1 conn: 新连接 * @param2 config: 配置 */ func (ts *TcpServer)handleConn(conn net.Conn, config *Config){ ctx, cancel := context.WithCancel(context.Background()) tcpConn := &TcpConn{ id: uuid.New(), conn: conn, sendChan: make(chan []byte, config.SendChanSize), recvChan: make(chan []byte, config.RecvChanSize), done: make(chan bool, 1), ctx: ctx, cancel: cancel, reqTime: time.Now(), interval: config.Interval, timeout: config.Timeout, bufferSize: ts.config.BufferSize, tcpServer: ts, Getter: config.Getter, Parser: config.Parser, Tag: nil, } ts.connMap.Store(tcpConn.id, tcpConn) defer func() { glog.Infoln(tcpConn.conn, "handleConn end -> conn.Close()") err := tcpConn.conn.Close() if err != nil { glog.Errorln(tcpConn.conn, "conn.Close()", err.Error()) } glog.Infoln(tcpConn.conn, "handleConn end -> cancel()") tcpConn.cancel() }() tcpConn.startSendProcess() tcpConn.startRecvProcess() if ts.config.ConnCallback != nil{ // 新连接回调 ts.config.ConnCallback.OnConnected(tcpConn) } <-tcpConn.done if ts.config.ConnCallback != nil{ // 关闭回调 ts.config.ConnCallback.OnDisconnected(tcpConn) } ts.connMap.Delete(tcpConn.id) } /** * @brief: 广播 * @param1 exceptConnId: 不包含连接kkey * @param2 cmd: 命令 * @param3 content: 内容 */ func (ts *TcpServer)Broadcast(exceptConnId, cmd string, content []byte){ fmt.Println("广播", exceptConnId, cmd) bytess := cmdContentToBytes(cmd, content) ts.connMap.Range(func(k, v interface{})bool{ con := v.(*TcpConn) if con.id != exceptConnId{ con.Send(bytess) } return true }) } /** * @brief: 启动服务端 * @param1 config: tcp配置 * @return1 错误信息 */ func (ts *TcpServer)StartServer()error{ if ts.config == nil{ return errors.New("参数config为nil") } glog.Infoln(ts.config) network := ts.config.Network if network == ""{ network = "tcp" } listen, err := net.Listen(network, ts.config.Ip + ":" + ts.config.Port) if err != nil { glog.Errorln("监听端口失败:", err.Error()) return err } glog.Infoln("启动dauth服务") go func() { for { conn, err := listen.Accept() if err != nil { glog.Errorln("接受TCP连接异常:", err.Error()) continue } glog.Infoln("TCP连接来自:", conn.RemoteAddr().String(), conn) go ts.handleConn(conn, ts.config) } }() return nil }