|
|
@@ -103,21 +103,41 @@ func (s *Server) collectQueryStringData() http.Handler {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-func (s *Server) withAccessLog(next http.Handler) http.Handler {
|
|
|
- type accessLogEntry struct {
|
|
|
- Method string `json:"method"`
|
|
|
- Host string `json:"host"`
|
|
|
- Path string `json:"path"`
|
|
|
- IP string `json:"ip"`
|
|
|
- ResponseTime float64 `json:"response_time"`
|
|
|
+type accessLogEntry struct {
|
|
|
+ Method string `json:"method"`
|
|
|
+ Host string `json:"host"`
|
|
|
+ Path string `json:"path"`
|
|
|
+ IP string `json:"ip"`
|
|
|
+ ResponseTime float64 `json:"response_time"`
|
|
|
+
|
|
|
+ encoded []byte
|
|
|
+ err error
|
|
|
+}
|
|
|
+
|
|
|
+func (ale *accessLogEntry) ensureEncoded() {
|
|
|
+ if ale.encoded == nil && ale.err == nil {
|
|
|
+ ale.encoded, ale.err = json.Marshal(ale)
|
|
|
}
|
|
|
+}
|
|
|
+
|
|
|
+func (ale *accessLogEntry) Length() int {
|
|
|
+ ale.ensureEncoded()
|
|
|
+ return len(ale.encoded)
|
|
|
+}
|
|
|
+
|
|
|
+func (ale *accessLogEntry) Encode() ([]byte, error) {
|
|
|
+ ale.ensureEncoded()
|
|
|
+ return ale.encoded, ale.err
|
|
|
+}
|
|
|
+
|
|
|
+func (s *Server) withAccessLog(next http.Handler) http.Handler {
|
|
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
started := time.Now()
|
|
|
|
|
|
next.ServeHTTP(w, r)
|
|
|
|
|
|
- entry := accessLogEntry{
|
|
|
+ entry := &accessLogEntry{
|
|
|
Method: r.Method,
|
|
|
Host: r.Host,
|
|
|
Path: r.RequestURI,
|
|
|
@@ -125,18 +145,13 @@ func (s *Server) withAccessLog(next http.Handler) http.Handler {
|
|
|
ResponseTime: float64(time.Since(started)) / float64(time.Second),
|
|
|
}
|
|
|
|
|
|
- jsonEntry, err := json.Marshal(entry)
|
|
|
- if err != nil {
|
|
|
- log.Println("Failed to marshal JSON access log entry:", err)
|
|
|
- }
|
|
|
-
|
|
|
// We will use the client's IP address as key. This will cause
|
|
|
// all the access log entries of the same IP address to end up
|
|
|
// on the same partition.
|
|
|
s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
|
|
|
Topic: "access_log",
|
|
|
Key: sarama.StringEncoder(r.RemoteAddr),
|
|
|
- Value: sarama.ByteEncoder(jsonEntry),
|
|
|
+ Value: entry,
|
|
|
}
|
|
|
})
|
|
|
}
|
|
|
@@ -177,7 +192,7 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
|
|
|
log.Fatalln("Failed to start Sarama producer:", err)
|
|
|
}
|
|
|
|
|
|
- // We wil just log to STDOUT if we're not able to produce messages.
|
|
|
+ // We will just log to STDOUT if we're not able to produce messages.
|
|
|
// Note: messages will only be returned here after all retry attempts are exhausted.
|
|
|
go func() {
|
|
|
for err := range producer.Errors() {
|