conn.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "git.qianqiusoft.com/library/glog"
  8. "git.qianqiusoft.com/qianqiusoft/go-uuid/uuid"
  9. "net"
  10. "sync"
  11. "time"
  12. )
  13. const (
  14. __KEY = "Light#dauth-@*I2"
  15. CMD_NEW = "new"
  16. CMD_REMOVE = "remove"
  17. CMD_PINGPONG = "pg"
  18. CMD_PINGPONG_RESP = "pg_resp"
  19. )
  20. /**
  21. * 数据包获取接口,跟业务无关
  22. */
  23. type Getter interface {
  24. /**
  25. * @brief: 包解析器接口
  26. * @param1 bytess: 当前接收到的数据
  27. * @param2 conn: 当前conn
  28. * @return1: 获取出来包内容,多个
  29. * @return2: 剩余数据
  30. * @return3: 错误信息
  31. */
  32. Get(bytess []byte, tcpConn *TcpConn)([][]byte, []byte, error)
  33. }
  34. /**
  35. * 包解析器
  36. */
  37. type Parser interface {
  38. /**
  39. * @brief: 包解析器
  40. * @param1 bytess: 包数据
  41. * @param2 tcpConn: 当前连接
  42. */
  43. Parse(bytess []byte, tcpConn *TcpConn)
  44. }
  45. /**
  46. * 连接回调接口
  47. */
  48. type TcpConnCallback interface {
  49. /**
  50. * @brief: 新连接回调
  51. * @param1 conn: 连接
  52. */
  53. OnConnected(conn *TcpConn) //
  54. /**
  55. * @brief: 连接断开回调
  56. * @param1 conn: 连接
  57. * @param2 err: 错误信息,正常关闭的为nil
  58. */
  59. OnDisconnected(conn *TcpConn) //
  60. /**
  61. * @brief: 错误回调
  62. * @param1 conn: 连接
  63. * @param2 err: 错误信息
  64. */
  65. OnError(conn *TcpConn, err error)
  66. }
  67. /**
  68. * tcp 配置信息
  69. */
  70. type Config struct {
  71. Ip string // ip
  72. Port string // 端口
  73. Network string // 默认tcp, The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
  74. Interval time.Duration // 心跳间隔
  75. Timeout time.Duration // 超时时间
  76. BufferSize int // 接收缓冲区大小
  77. SendChanSize int // 发送通道大小
  78. RecvChanSize int // 接收通道大小
  79. Getter Getter // tcp 连接处理接口
  80. Parser Parser // 包解析器
  81. ConnCallback TcpConnCallback // 连接回调接口
  82. }
  83. /**
  84. * tcp 连接
  85. */
  86. type TcpConn struct {
  87. id string // id
  88. conn net.Conn // 连接
  89. sendChan chan []byte // 发送数据chan
  90. recvChan chan []byte // 接收数组chan
  91. done chan bool // 完成chan
  92. ctx context.Context // 上下文
  93. cancel context.CancelFunc // 取消函数
  94. reqTime time.Time // 最后请求时间
  95. timer *time.Ticker // 超时检查定时器
  96. interval time.Duration // 心跳时间间隔
  97. timeout time.Duration // 超时时间
  98. bufferSize int // 接收缓冲区大小
  99. tcpServer *TcpServer // 服务端
  100. Getter Getter // 包解析器
  101. Parser Parser // 包解析器
  102. Tag interface{} // 连接标识
  103. }
  104. /**
  105. * @brief: 发送接口
  106. * @param1 data: 数据
  107. */
  108. func (tc *TcpConn)Send(data []byte){
  109. if data == nil{
  110. fmt.Println("发送数据为nil")
  111. return
  112. }
  113. tc.sendChan <- data
  114. }
  115. /**
  116. * @brief: 取消
  117. */
  118. func (tc *TcpConn)Cancel(){
  119. glog.Infoln("tcp conn cancel")
  120. if tc.timer != nil{
  121. tc.timer.Stop()
  122. }
  123. if tc.cancel != nil{
  124. tc.cancel()
  125. }
  126. tc.conn.Close()
  127. }
  128. /**
  129. * @brief: 启动超时
  130. */
  131. func (tc *TcpConn)startTimeoutCheckProcess(){
  132. go func() {
  133. if int64(tc.interval) <= 0{
  134. glog.Infoln("时间间隔小于等于0,不启动超时检测定时器")
  135. return
  136. }
  137. tc.timer = time.NewTicker(tc.interval)
  138. defer func() {
  139. tc.timer.Stop()
  140. glog.Infoln("startTimeoutCheckProcess end")
  141. }()
  142. for {
  143. select {
  144. case <-tc.ctx.Done():
  145. glog.Infoln("startTimeoutCheckProcess ctx.cancel")
  146. return
  147. case <-tc.timer.C:
  148. now := time.Now()
  149. if now.Sub(tc.reqTime) > tc.timeout {
  150. glog.Errorln(tc.conn, "heartbeat timeout exit", now.Format("2006-01-02 15:04:05"), tc.reqTime.Format("2006-01-02 15:04:05"))
  151. tc.Cancel()
  152. return
  153. }
  154. }
  155. }
  156. }()
  157. }
  158. /**
  159. * @brief: 发送处理流程
  160. */
  161. func (tc *TcpConn)startSendProcess() {
  162. go func() {
  163. glog.Infoln(tc.conn, "startSendProcess start")
  164. defer func() {
  165. tc.done <- true
  166. glog.Infoln(tc.conn, "startSendProcess end.")
  167. }()
  168. for {
  169. select {
  170. case <-tc.ctx.Done():
  171. glog.Infoln("startSendProcess ctx.cancel")
  172. return
  173. case data := <-tc.sendChan:
  174. tc.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
  175. if _, err := tc.conn.Write(data); err != nil {
  176. glog.Errorln(tc.conn, "conn.Write", err.Error())
  177. return
  178. }
  179. }
  180. }
  181. }()
  182. }
  183. /**
  184. * @brief: 接收处理流程
  185. */
  186. func (tc *TcpConn)startRecvProcess(){
  187. go func() {
  188. glog.Infoln(tc.conn, "startRecvProcess start")
  189. lastRecvDataTime := time.Now()
  190. defer func() {
  191. glog.Errorln(tc.conn, "last recv time:", lastRecvDataTime.Format("2006-01-02 15:04:05"), "now time:", time.Now().Format("2006-01-02 15:04:05"))
  192. tc.done <- true
  193. glog.Infoln(tc.conn, "startRecvProcess end.")
  194. }()
  195. ok, err := checkSign(tc.conn) // 校验
  196. if err != nil{
  197. fmt.Println("校验key异常, 关闭连接", err.Error())
  198. tc.Cancel()
  199. return
  200. }
  201. if !ok{
  202. fmt.Println("key错误, 关闭连接", err.Error())
  203. tc.Cancel()
  204. return
  205. }
  206. err = writeString(tc.conn, "ok") // 应答ok
  207. if err != nil{
  208. fmt.Println("应答ok错误, 关闭连接", err.Error())
  209. tc.Cancel()
  210. return
  211. }
  212. // 验证成功后直接同步
  213. mmap := tc.tcpServer.store.All()
  214. for _, v := range mmap{
  215. tc.Send(cmdContentToBytes(CMD_NEW, v))
  216. }
  217. for {
  218. cmd, err := readString(tc.conn)
  219. if err != nil{
  220. glog.Errorln(tc.conn, "读取cmd错误:", err.Error())
  221. break
  222. }
  223. if cmd == CMD_NEW{
  224. err = tc.newHandler()
  225. }else if cmd == CMD_REMOVE{
  226. err = tc.removeHandler()
  227. } else if cmd == CMD_PINGPONG{
  228. err = tc.pingpongHandler()
  229. } else{
  230. glog.Errorln("未知cmd", CMD_NEW)
  231. continue
  232. }
  233. if err != nil{
  234. glog.Errorln(tc.conn, "处理错误:", cmd, err.Error())
  235. break
  236. }
  237. }
  238. }()
  239. }
  240. // 处理新建
  241. func (tc *TcpConn)newHandler()error{
  242. fmt.Println("处理新建")
  243. _, err := readUInt32(tc.conn) // 读取主体长度,这个忽略掉
  244. if err != nil{
  245. glog.Errorln(tc.conn, "读取token错误:", err.Error())
  246. return err
  247. }
  248. tokenStr, err := readString(tc.conn) // d读取token
  249. if err != nil{
  250. glog.Errorln(tc.conn, "读取token错误:", err.Error())
  251. return err
  252. }
  253. fmt.Println("处理新建1", tokenStr)
  254. tokenBytes, err := readBytes(tc.conn)
  255. if err != nil{
  256. glog.Errorln(tc.conn, "读取token bytes错误:", err.Error())
  257. return err
  258. }
  259. fmt.Println("处理新建2")
  260. fmt.Println(tokenBytes)
  261. tc.tcpServer.store.Set(tokenStr, tokenBytes)
  262. tc.tcpServer.Broadcast(tc.id, CMD_NEW, tokenBytes)
  263. return nil
  264. }
  265. // 处理移除
  266. func (tc *TcpConn)removeHandler()error{
  267. fmt.Println("处理删除")
  268. tokenStr, err := readString(tc.conn)
  269. if err != nil{
  270. glog.Errorln(tc.conn, "读取token错误:", err.Error())
  271. return err
  272. }
  273. tc.tcpServer.store.Remove(tokenStr)
  274. tc.tcpServer.Broadcast(tc.id, CMD_NEW, []byte(tokenStr))
  275. return nil
  276. }
  277. // 处理心跳
  278. func (tc *TcpConn)pingpongHandler()error{
  279. fmt.Println("处理心跳")
  280. buf := bytes.NewBuffer([]byte{})
  281. bytess := []byte(CMD_PINGPONG_RESP)
  282. size := len(bytess)
  283. buf.Write(uint32ToBytes(uint32(size)))
  284. buf.Write(bytess)
  285. tc.Send(buf.Bytes())
  286. return nil
  287. }
  288. type TcpServer struct {
  289. config *Config // 配置
  290. connMap sync.Map // 连接列表
  291. store IStore
  292. }
  293. func NewTcpServer(config *Config, store IStore)*TcpServer{
  294. return &TcpServer{
  295. config: config,
  296. store: store,
  297. }
  298. }
  299. /**
  300. * @brief: 处理新连接
  301. * @param1 conn: 新连接
  302. * @param2 config: 配置
  303. */
  304. func (ts *TcpServer)handleConn(conn net.Conn, config *Config){
  305. ctx, cancel := context.WithCancel(context.Background())
  306. tcpConn := &TcpConn{
  307. id: uuid.New(),
  308. conn: conn,
  309. sendChan: make(chan []byte, config.SendChanSize),
  310. recvChan: make(chan []byte, config.RecvChanSize),
  311. done: make(chan bool, 1),
  312. ctx: ctx,
  313. cancel: cancel,
  314. reqTime: time.Now(),
  315. interval: config.Interval,
  316. timeout: config.Timeout,
  317. bufferSize: ts.config.BufferSize,
  318. tcpServer: ts,
  319. Getter: config.Getter,
  320. Parser: config.Parser,
  321. Tag: nil,
  322. }
  323. ts.connMap.Store(tcpConn.id, tcpConn)
  324. defer func() {
  325. glog.Infoln(tcpConn.conn, "handleConn end -> conn.Close()")
  326. err := tcpConn.conn.Close()
  327. if err != nil {
  328. glog.Errorln(tcpConn.conn, "conn.Close()", err.Error())
  329. }
  330. glog.Infoln(tcpConn.conn, "handleConn end -> cancel()")
  331. tcpConn.cancel()
  332. }()
  333. tcpConn.startSendProcess()
  334. tcpConn.startRecvProcess()
  335. if ts.config.ConnCallback != nil{
  336. // 新连接回调
  337. ts.config.ConnCallback.OnConnected(tcpConn)
  338. }
  339. <-tcpConn.done
  340. if ts.config.ConnCallback != nil{
  341. // 关闭回调
  342. ts.config.ConnCallback.OnDisconnected(tcpConn)
  343. }
  344. ts.connMap.Delete(tcpConn.id)
  345. }
  346. /**
  347. * @brief: 广播
  348. * @param1 exceptConnId: 不包含连接kkey
  349. * @param2 cmd: 命令
  350. * @param3 content: 内容
  351. */
  352. func (ts *TcpServer)Broadcast(exceptConnId, cmd string, content []byte){
  353. fmt.Println("广播", exceptConnId, cmd)
  354. bytess := cmdContentToBytes(cmd, content)
  355. ts.connMap.Range(func(k, v interface{})bool{
  356. con := v.(*TcpConn)
  357. if con.id != exceptConnId{
  358. con.Send(bytess)
  359. }
  360. return true
  361. })
  362. }
  363. /**
  364. * @brief: 启动服务端
  365. * @param1 config: tcp配置
  366. * @return1 错误信息
  367. */
  368. func (ts *TcpServer)StartServer()error{
  369. if ts.config == nil{
  370. return errors.New("参数config为nil")
  371. }
  372. glog.Infoln(ts.config)
  373. network := ts.config.Network
  374. if network == ""{
  375. network = "tcp"
  376. }
  377. listen, err := net.Listen(network, ts.config.Ip + ":" + ts.config.Port)
  378. if err != nil {
  379. glog.Errorln("监听端口失败:", err.Error())
  380. return err
  381. }
  382. glog.Infoln("启动dauth服务")
  383. go func() {
  384. for {
  385. conn, err := listen.Accept()
  386. if err != nil {
  387. glog.Errorln("接受TCP连接异常:", err.Error())
  388. continue
  389. }
  390. glog.Infoln("TCP连接来自:", conn.RemoteAddr().String(), conn)
  391. go ts.handleConn(conn, ts.config)
  392. }
  393. }()
  394. return nil
  395. }