logger_cassandra.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package middleware
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gin-gonic/gin"
  7. "github.com/gocql/gocql"
  8. "time"
  9. )
  10. var __logInfoChan chan *logInfo
  11. // cassandra 配置
  12. var _clusterCfg *gocql.ClusterConfig = nil
  13. func init(){
  14. defer func() {
  15. if p := recover(); p != nil {
  16. fmt.Println("ecover", p)
  17. }
  18. }()
  19. __logInfoChan = make(chan *logInfo, 1000)
  20. // 设置cassandar 配置
  21. _clusterCfg = gocql.NewCluster("218.14.81.38")
  22. _clusterCfg.Keyspace = "i2_log"
  23. _clusterCfg.Consistency = gocql.Quorum
  24. //设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
  25. _clusterCfg.NumConns = 3
  26. session, err := _clusterCfg.CreateSession()
  27. if err != nil{
  28. fmt.Println("create session err", err.Error())
  29. return
  30. }
  31. time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
  32. defer session.Close()
  33. // 日志处理
  34. logProcess()
  35. }
  36. /**
  37. * 自定义日志响应类,主要是把resp的响应内容写入到body中
  38. */
  39. type loggerRespWriter struct{
  40. gin.ResponseWriter
  41. respBody *bytes.Buffer
  42. }
  43. // 写字节数组
  44. func (w loggerRespWriter) Write(b []byte) (int, error) {
  45. w.respBody.Write(b) // 保存
  46. return w.ResponseWriter.Write(b)
  47. }
  48. // 写字符串
  49. func (w loggerRespWriter) WriteString(s string) (int, error) {
  50. w.respBody.WriteString(s) // 保存
  51. return w.ResponseWriter.WriteString(s)
  52. }
  53. /**
  54. * 日志信息
  55. */
  56. type logInfo struct {
  57. ReqClientIp string
  58. ReqTime time.Time
  59. ReqMethod string
  60. ReqUrl string
  61. ReqProto string
  62. ReqUa string
  63. ReqReferer string
  64. ReqPostData string
  65. RespTime time.Time
  66. RespBody string
  67. RespCode string // 由 resp body 解析
  68. RespMsg string // 由 resp body 解析
  69. RespData string // 由 resp body 解析
  70. CostTime int
  71. }
  72. func LoggerCassandra()gin.HandlerFunc{
  73. return func(c *gin.Context){
  74. respWriter := &loggerRespWriter{
  75. ResponseWriter: c.Writer,
  76. respBody: bytes.NewBuffer([]byte{}),
  77. }
  78. c.Writer = respWriter // 注入自定writer
  79. logInfo := &logInfo{}
  80. logInfo.ReqTime = time.Now()
  81. // 下一个请求
  82. c.Next()
  83. // 设置对象
  84. logInfo.RespBody = respWriter.respBody.String()
  85. logInfo.RespTime = time.Now()
  86. logInfo.ReqMethod = c.Request.Method
  87. logInfo.ReqUrl = c.Request.RequestURI
  88. logInfo.ReqProto = c.Request.Proto
  89. logInfo.ReqUa = c.Request.UserAgent()
  90. logInfo.ReqReferer = c.Request.Referer()
  91. logInfo.ReqPostData = c.Request.PostForm.Encode()
  92. logInfo.ReqClientIp = c.ClientIP()
  93. __logInfoChan <- logInfo
  94. }
  95. }
  96. func logProcess(){
  97. go func(){
  98. for{
  99. select {
  100. case logInfo := <- __logInfoChan:
  101. insertLogInfo(logInfo)
  102. }
  103. }
  104. }()
  105. }
  106. func insertLogInfo(logInfo *logInfo){
  107. session, err := _clusterCfg.CreateSession()
  108. if err != nil{
  109. fmt.Println("创建cassandra session错误", err.Error())
  110. return
  111. }
  112. defer session.Close()
  113. var respBodyObj struct {
  114. Code string
  115. Msg string
  116. Data string
  117. }
  118. if logInfo.RespBody != ""{
  119. err = json.Unmarshal([]byte(logInfo.RespBody), &respBodyObj)
  120. if err != nil{
  121. fmt.Println("json.Unmarshal 错误", err.Error())
  122. }else {
  123. logInfo.RespCode = respBodyObj.Code
  124. logInfo.RespMsg = respBodyObj.Msg
  125. logInfo.RespData = respBodyObj.Data
  126. }
  127. }
  128. logInfo.CostTime = int(logInfo.RespTime.Sub(logInfo.ReqTime))
  129. // 保存轨迹
  130. fmt.Println("保存轨迹")
  131. cqlformat := `insert into
  132. log_info(req_date, req_time, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time)
  133. values(?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
  134. reqDate := logInfo.ReqTime.Format("20060102")
  135. q := session.Query(cqlformat, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
  136. err = q.Exec() // 应该可以使用批量插入
  137. if err != nil{
  138. fmt.Println("插入日志错误:", err)
  139. return
  140. }
  141. }