123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- package main
- import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "git.qianqiusoft.com/library/glog"
- "git.qianqiusoft.com/qianqiusoft/go-uuid/uuid"
- "net"
- "sync"
- "time"
- )
- const (
- __KEY = "Light#dauth-@*I2"
- CMD_NEW = "new"
- CMD_REMOVE = "remove"
- CMD_PINGPONG = "pg"
- CMD_PINGPONG_RESP = "pg_resp"
- )
- /**
- * 数据包获取接口,跟业务无关
- */
- type Getter interface {
- /**
- * @brief: 包解析器接口
- * @param1 bytess: 当前接收到的数据
- * @param2 conn: 当前conn
- * @return1: 获取出来包内容,多个
- * @return2: 剩余数据
- * @return3: 错误信息
- */
- Get(bytess []byte, tcpConn *TcpConn)([][]byte, []byte, error)
- }
- /**
- * 包解析器
- */
- type Parser interface {
- /**
- * @brief: 包解析器
- * @param1 bytess: 包数据
- * @param2 tcpConn: 当前连接
- */
- Parse(bytess []byte, tcpConn *TcpConn)
- }
- /**
- * 连接回调接口
- */
- type TcpConnCallback interface {
- /**
- * @brief: 新连接回调
- * @param1 conn: 连接
- */
- OnConnected(conn *TcpConn) //
- /**
- * @brief: 连接断开回调
- * @param1 conn: 连接
- * @param2 err: 错误信息,正常关闭的为nil
- */
- OnDisconnected(conn *TcpConn) //
- /**
- * @brief: 错误回调
- * @param1 conn: 连接
- * @param2 err: 错误信息
- */
- OnError(conn *TcpConn, err error)
- }
- /**
- * tcp 配置信息
- */
- type Config struct {
- Ip string // ip
- Port string // 端口
- Network string // 默认tcp, The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
- Interval time.Duration // 心跳间隔
- Timeout time.Duration // 超时时间
- BufferSize int // 接收缓冲区大小
- SendChanSize int // 发送通道大小
- RecvChanSize int // 接收通道大小
- Getter Getter // tcp 连接处理接口
- Parser Parser // 包解析器
- ConnCallback TcpConnCallback // 连接回调接口
- }
- /**
- * tcp 连接
- */
- type TcpConn struct {
- id string // id
- conn net.Conn // 连接
- sendChan chan []byte // 发送数据chan
- recvChan chan []byte // 接收数组chan
- done chan bool // 完成chan
- ctx context.Context // 上下文
- cancel context.CancelFunc // 取消函数
- reqTime time.Time // 最后请求时间
- timer *time.Ticker // 超时检查定时器
- interval time.Duration // 心跳时间间隔
- timeout time.Duration // 超时时间
- bufferSize int // 接收缓冲区大小
- tcpServer *TcpServer // 服务端
- Getter Getter // 包解析器
- Parser Parser // 包解析器
- Tag interface{} // 连接标识
- }
- /**
- * @brief: 发送接口
- * @param1 data: 数据
- */
- func (tc *TcpConn)Send(data []byte){
- if data == nil{
- fmt.Println("发送数据为nil")
- return
- }
- tc.sendChan <- data
- }
- /**
- * @brief: 取消
- */
- func (tc *TcpConn)Cancel(){
- glog.Infoln("tcp conn cancel")
- if tc.timer != nil{
- tc.timer.Stop()
- }
- if tc.cancel != nil{
- tc.cancel()
- }
- tc.conn.Close()
- }
- /**
- * @brief: 启动超时
- */
- func (tc *TcpConn)startTimeoutCheckProcess(){
- go func() {
- if int64(tc.interval) <= 0{
- glog.Infoln("时间间隔小于等于0,不启动超时检测定时器")
- return
- }
- tc.timer = time.NewTicker(tc.interval)
- defer func() {
- tc.timer.Stop()
- glog.Infoln("startTimeoutCheckProcess end")
- }()
- for {
- select {
- case <-tc.ctx.Done():
- glog.Infoln("startTimeoutCheckProcess ctx.cancel")
- return
- case <-tc.timer.C:
- now := time.Now()
- if now.Sub(tc.reqTime) > tc.timeout {
- glog.Errorln(tc.conn, "heartbeat timeout exit", now.Format("2006-01-02 15:04:05"), tc.reqTime.Format("2006-01-02 15:04:05"))
- tc.Cancel()
- return
- }
- }
- }
- }()
- }
- /**
- * @brief: 发送处理流程
- */
- func (tc *TcpConn)startSendProcess() {
- go func() {
- glog.Infoln(tc.conn, "startSendProcess start")
- defer func() {
- tc.done <- true
- glog.Infoln(tc.conn, "startSendProcess end.")
- }()
- for {
- select {
- case <-tc.ctx.Done():
- glog.Infoln("startSendProcess ctx.cancel")
- return
- case data := <-tc.sendChan:
- tc.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
- if _, err := tc.conn.Write(data); err != nil {
- glog.Errorln(tc.conn, "conn.Write", err.Error())
- return
- }
- }
- }
- }()
- }
- /**
- * @brief: 接收处理流程
- */
- func (tc *TcpConn)startRecvProcess(){
- go func() {
- glog.Infoln(tc.conn, "startRecvProcess start")
- lastRecvDataTime := time.Now()
- defer func() {
- glog.Errorln(tc.conn, "last recv time:", lastRecvDataTime.Format("2006-01-02 15:04:05"), "now time:", time.Now().Format("2006-01-02 15:04:05"))
- tc.done <- true
- glog.Infoln(tc.conn, "startRecvProcess end.")
- }()
- ok, err := checkSign(tc.conn) // 校验
- if err != nil{
- fmt.Println("校验key异常, 关闭连接", err.Error())
- tc.Cancel()
- return
- }
- if !ok{
- fmt.Println("key错误, 关闭连接", err.Error())
- tc.Cancel()
- return
- }
- err = writeString(tc.conn, "ok") // 应答ok
- if err != nil{
- fmt.Println("应答ok错误, 关闭连接", err.Error())
- tc.Cancel()
- return
- }
- // 验证成功后直接同步
- mmap := tc.tcpServer.store.All()
- for _, v := range mmap{
- tc.Send(cmdContentToBytes(CMD_NEW, v))
- }
- for {
- cmd, err := readString(tc.conn)
- if err != nil{
- glog.Errorln(tc.conn, "读取cmd错误:", err.Error())
- break
- }
- if cmd == CMD_NEW{
- err = tc.newHandler()
- }else if cmd == CMD_REMOVE{
- err = tc.removeHandler()
- } else if cmd == CMD_PINGPONG{
- err = tc.pingpongHandler()
- } else{
- glog.Errorln("未知cmd", CMD_NEW)
- continue
- }
- if err != nil{
- glog.Errorln(tc.conn, "处理错误:", cmd, err.Error())
- break
- }
- }
- }()
- }
- // 处理新建
- func (tc *TcpConn)newHandler()error{
- fmt.Println("处理新建")
- _, err := readUInt32(tc.conn) // 读取主体长度,这个忽略掉
- if err != nil{
- glog.Errorln(tc.conn, "读取token错误:", err.Error())
- return err
- }
- tokenStr, err := readString(tc.conn) // d读取token
- if err != nil{
- glog.Errorln(tc.conn, "读取token错误:", err.Error())
- return err
- }
- fmt.Println("处理新建1", tokenStr)
- tokenBytes, err := readBytes(tc.conn)
- if err != nil{
- glog.Errorln(tc.conn, "读取token bytes错误:", err.Error())
- return err
- }
- fmt.Println("处理新建2")
- fmt.Println(tokenBytes)
- tc.tcpServer.store.Set(tokenStr, tokenBytes)
- tc.tcpServer.Broadcast(tc.id, CMD_NEW, tokenBytes)
- return nil
- }
- // 处理移除
- func (tc *TcpConn)removeHandler()error{
- fmt.Println("处理删除")
- tokenStr, err := readString(tc.conn)
- if err != nil{
- glog.Errorln(tc.conn, "读取token错误:", err.Error())
- return err
- }
- tc.tcpServer.store.Remove(tokenStr)
- tc.tcpServer.Broadcast(tc.id, CMD_NEW, []byte(tokenStr))
- return nil
- }
- // 处理心跳
- func (tc *TcpConn)pingpongHandler()error{
- fmt.Println("处理心跳")
- buf := bytes.NewBuffer([]byte{})
- bytess := []byte(CMD_PINGPONG_RESP)
- size := len(bytess)
- buf.Write(uint32ToBytes(uint32(size)))
- buf.Write(bytess)
- tc.Send(buf.Bytes())
- return nil
- }
- type TcpServer struct {
- config *Config // 配置
- connMap sync.Map // 连接列表
- store IStore
- }
- func NewTcpServer(config *Config, store IStore)*TcpServer{
- return &TcpServer{
- config: config,
- store: store,
- }
- }
- /**
- * @brief: 处理新连接
- * @param1 conn: 新连接
- * @param2 config: 配置
- */
- func (ts *TcpServer)handleConn(conn net.Conn, config *Config){
- ctx, cancel := context.WithCancel(context.Background())
- tcpConn := &TcpConn{
- id: uuid.New(),
- conn: conn,
- sendChan: make(chan []byte, config.SendChanSize),
- recvChan: make(chan []byte, config.RecvChanSize),
- done: make(chan bool, 1),
- ctx: ctx,
- cancel: cancel,
- reqTime: time.Now(),
- interval: config.Interval,
- timeout: config.Timeout,
- bufferSize: ts.config.BufferSize,
- tcpServer: ts,
- Getter: config.Getter,
- Parser: config.Parser,
- Tag: nil,
- }
- ts.connMap.Store(tcpConn.id, tcpConn)
- defer func() {
- glog.Infoln(tcpConn.conn, "handleConn end -> conn.Close()")
- err := tcpConn.conn.Close()
- if err != nil {
- glog.Errorln(tcpConn.conn, "conn.Close()", err.Error())
- }
- glog.Infoln(tcpConn.conn, "handleConn end -> cancel()")
- tcpConn.cancel()
- }()
- tcpConn.startSendProcess()
- tcpConn.startRecvProcess()
- if ts.config.ConnCallback != nil{
- // 新连接回调
- ts.config.ConnCallback.OnConnected(tcpConn)
- }
- <-tcpConn.done
- if ts.config.ConnCallback != nil{
- // 关闭回调
- ts.config.ConnCallback.OnDisconnected(tcpConn)
- }
- ts.connMap.Delete(tcpConn.id)
- }
- /**
- * @brief: 广播
- * @param1 exceptConnId: 不包含连接kkey
- * @param2 cmd: 命令
- * @param3 content: 内容
- */
- func (ts *TcpServer)Broadcast(exceptConnId, cmd string, content []byte){
- fmt.Println("广播", exceptConnId, cmd)
- bytess := cmdContentToBytes(cmd, content)
- ts.connMap.Range(func(k, v interface{})bool{
- con := v.(*TcpConn)
- if con.id != exceptConnId{
- con.Send(bytess)
- }
- return true
- })
- }
- /**
- * @brief: 启动服务端
- * @param1 config: tcp配置
- * @return1 错误信息
- */
- func (ts *TcpServer)StartServer()error{
- if ts.config == nil{
- return errors.New("参数config为nil")
- }
- glog.Infoln(ts.config)
- network := ts.config.Network
- if network == ""{
- network = "tcp"
- }
- listen, err := net.Listen(network, ts.config.Ip + ":" + ts.config.Port)
- if err != nil {
- glog.Errorln("监听端口失败:", err.Error())
- return err
- }
- glog.Infoln("启动dauth服务")
- go func() {
- for {
- conn, err := listen.Accept()
- if err != nil {
- glog.Errorln("接受TCP连接异常:", err.Error())
- continue
- }
- glog.Infoln("TCP连接来自:", conn.RemoteAddr().String(), conn)
- go ts.handleConn(conn, ts.config)
- }
- }()
- return nil
- }
|