// http_server.go
  1. package main
  2. import (
  3. "github.com/Shopify/sarama"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "os"
  10. "strings"
  11. "time"
  12. )
// Command-line flags. The broker list defaults to the KAFKA_PEERS
// environment variable so the binary can run without arguments in a
// pre-provisioned environment.
var (
	addr    = flag.String("addr", ":8080", "The address to bind to")
	brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
	verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
)
  18. func main() {
  19. flag.Parse()
  20. if *verbose {
  21. sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  22. }
  23. if *brokers == "" {
  24. flag.PrintDefaults()
  25. os.Exit(1)
  26. }
  27. brokerList := strings.Split(*brokers, ",")
  28. log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))
  29. server := &Server{
  30. DataCollector: newDataCollector(brokerList),
  31. AccessLogProducer: newAccessLogProducer(brokerList),
  32. }
  33. defer func() {
  34. if err := server.Close(); err != nil {
  35. log.Println("Failed to close server", err)
  36. }
  37. }()
  38. log.Fatal(server.Run(*addr))
  39. }
// Server bundles the two Kafka producers used by the HTTP handlers:
// a synchronous producer for request payloads (strong delivery
// guarantees) and an asynchronous one for fire-and-forget access logs.
type Server struct {
	DataCollector     sarama.SyncProducer
	AccessLogProducer sarama.AsyncProducer
}
  44. func (s *Server) Close() error {
  45. if err := s.DataCollector.Close(); err != nil {
  46. log.Println("Failed to shut down data collector cleanly", err)
  47. }
  48. if err := s.AccessLogProducer.Close(); err != nil {
  49. log.Println("Failed to shut down access log producer cleanly", err)
  50. }
  51. return nil
  52. }
  53. func (s *Server) Handler() http.Handler {
  54. return s.withAccessLog(s.collectQueryStringData())
  55. }
  56. func (s *Server) Run(addr string) error {
  57. httpServer := &http.Server{
  58. Addr: addr,
  59. Handler: s.Handler(),
  60. }
  61. log.Printf("Listening for requests on %s...\n", addr)
  62. return httpServer.ListenAndServe()
  63. }
  64. func (s *Server) collectQueryStringData() http.Handler {
  65. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  66. if r.URL.Path != "/" {
  67. http.NotFound(w, r)
  68. return
  69. }
  70. // We are not setting a message key, which means that all messages will
  71. // be distributed randomly over the different partitions.
  72. partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
  73. Topic: "important",
  74. Value: sarama.StringEncoder(r.URL.RawQuery),
  75. })
  76. if err != nil {
  77. w.WriteHeader(http.StatusInternalServerError)
  78. fmt.Fprintf(w, "Failed to store your data:, %s", err)
  79. } else {
  80. // The tuple (topic, partition, offset) can be used as a unique identifier
  81. // for a message in a Kafka cluster.
  82. fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
  83. }
  84. })
  85. }
  86. type accessLogEntry struct {
  87. Method string `json:"method"`
  88. Host string `json:"host"`
  89. Path string `json:"path"`
  90. IP string `json:"ip"`
  91. ResponseTime float64 `json:"response_time"`
  92. encoded []byte
  93. err error
  94. }
  95. func (ale *accessLogEntry) ensureEncoded() {
  96. if ale.encoded == nil && ale.err == nil {
  97. ale.encoded, ale.err = json.Marshal(ale)
  98. }
  99. }
  100. func (ale *accessLogEntry) Length() int {
  101. ale.ensureEncoded()
  102. return len(ale.encoded)
  103. }
  104. func (ale *accessLogEntry) Encode() ([]byte, error) {
  105. ale.ensureEncoded()
  106. return ale.encoded, ale.err
  107. }
  108. func (s *Server) withAccessLog(next http.Handler) http.Handler {
  109. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  110. started := time.Now()
  111. next.ServeHTTP(w, r)
  112. entry := &accessLogEntry{
  113. Method: r.Method,
  114. Host: r.Host,
  115. Path: r.RequestURI,
  116. IP: r.RemoteAddr,
  117. ResponseTime: float64(time.Since(started)) / float64(time.Second),
  118. }
  119. // We will use the client's IP address as key. This will cause
  120. // all the access log entries of the same IP address to end up
  121. // on the same partition.
  122. s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
  123. Topic: "access_log",
  124. Key: sarama.StringEncoder(r.RemoteAddr),
  125. Value: entry,
  126. }
  127. })
  128. }
  129. func newDataCollector(brokerList []string) sarama.SyncProducer {
  130. // For the data collector, we are looking for strong consistency semantics.
  131. // Because we don't change the flush settings, sarama will try to produce messages
  132. // as fast as possible to keep latency low.
  133. config := sarama.NewConfig()
  134. config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
  135. config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
  136. // On the broker side, you may want to change the following settings to get
  137. // stronger consistency guarantees:
  138. // - For your broker, set `unclean.leader.election.enable` to false
  139. // - For the topic, you could increase `min.insync.replicas`.
  140. producer, err := sarama.NewSyncProducer(brokerList, config)
  141. if err != nil {
  142. log.Fatalln("Failed to start Sarama producer:", err)
  143. }
  144. return producer
  145. }
  146. func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
  147. // For the access log, we are looking for AP semantics, with high throughput.
  148. // By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
  149. config := sarama.NewConfig()
  150. config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
  151. config.Producer.Compression = sarama.CompressionSnappy // Compress messages
  152. config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
  153. producer, err := sarama.NewAsyncProducer(brokerList, config)
  154. if err != nil {
  155. log.Fatalln("Failed to start Sarama producer:", err)
  156. }
  157. // We will just log to STDOUT if we're not able to produce messages.
  158. // Note: messages will only be returned here after all retry attempts are exhausted.
  159. go func() {
  160. for err := range producer.Errors() {
  161. log.Println("Failed to write access log entry:", err)
  162. }
  163. }()
  164. return producer
  165. }