http_server.go

package main

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)

var (
	addr     = flag.String("addr", ":8080", "The address to bind to")
	brokers  = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
	verbose  = flag.Bool("verbose", false, "Turn on Sarama logging")
	certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
	keyFile  = flag.String("key", "", "The optional key file for client authentication")
	caFile   = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
)

func main() {
	flag.Parse()

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		DataCollector:     newDataCollector(brokerList),
		AccessLogProducer: newAccessLogProducer(brokerList),
	}
	defer func() {
		if err := server.Close(); err != nil {
			log.Println("Failed to close server", err)
		}
	}()

	log.Fatal(server.Run(*addr))
}

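// createTlsConfiguration builds a client TLS configuration from the
// -certificate, -key, and -ca flags. It returns nil unless all three files
// are provided; with a nil config the producers connect without TLS. Note
// that InsecureSkipVerify is set, so the broker's certificate chain and host
// name are not verified.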
func createTlsConfiguration() (t *tls.Config) {
	if *certFile != "" && *keyFile != "" && *caFile != "" {
		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
		if err != nil {
			log.Fatal(err)
		}

		caCert, err := ioutil.ReadFile(*caFile)
		if err != nil {
			log.Fatal(err)
		}

		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)

		t = &tls.Config{
			Certificates:       []tls.Certificate{cert},
			RootCAs:            caCertPool,
			InsecureSkipVerify: true,
		}
	}
	// will be nil by default if nothing is provided
	return t
}

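// Server bundles the two Kafka producers used by the HTTP handlers: a
// synchronous producer for the data we must not lose, and an asynchronous
// producer for best-effort access logging.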
type Server struct {
	DataCollector     sarama.SyncProducer
	AccessLogProducer sarama.AsyncProducer
}

func (s *Server) Close() error {
	if err := s.DataCollector.Close(); err != nil {
		log.Println("Failed to shut down data collector cleanly", err)
	}

	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
	}

	return nil
}

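// Handler wraps the data-collection endpoint with the access-log middleware.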
func (s *Server) Handler() http.Handler {
	return s.withAccessLog(s.collectQueryStringData())
}

func (s *Server) Run(addr string) error {
	httpServer := &http.Server{
		Addr:    addr,
		Handler: s.Handler(),
	}

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}

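// collectQueryStringData synchronously publishes the raw query string of each
// request to the "important" topic and reports the resulting partition and
// offset back to the client, so a success response is only written once Kafka
// has acknowledged the message.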
func (s *Server) collectQueryStringData() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}

		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
			Topic: "important",
			Value: sarama.StringEncoder(r.URL.RawQuery),
		})

		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Failed to store your data: %s", err)
		} else {
			// The tuple (topic, partition, offset) can be used as a unique identifier
			// for a message in a Kafka cluster.
			fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
		}
	})
}

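// accessLogEntry describes a single handled request. Its Length and Encode
// methods satisfy sarama.Encoder, so the entry is only serialized to JSON
// when sarama actually builds the produce request, and the encoded form is
// cached between the two calls.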
type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	encoded []byte
	err     error
}

func (ale *accessLogEntry) ensureEncoded() {
	if ale.encoded == nil && ale.err == nil {
		ale.encoded, ale.err = json.Marshal(ale)
	}
}

func (ale *accessLogEntry) Length() int {
	ale.ensureEncoded()
	return len(ale.encoded)
}

func (ale *accessLogEntry) Encode() ([]byte, error) {
	ale.ensureEncoded()
	return ale.encoded, ale.err
}

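// Optional compile-time check that *accessLogEntry satisfies sarama.Encoder,
// the interface sarama expects for ProducerMessage.Value.
var _ sarama.Encoder = (*accessLogEntry)(nil)

// withAccessLog runs the wrapped handler and then publishes an accessLogEntry
// for the request to the "access_log" topic on the asynchronous producer.
// Keying the message by client address keeps entries from the same client on
// the same partition.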
func (s *Server) withAccessLog(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           r.RemoteAddr,
			ResponseTime: float64(time.Since(started)) / float64(time.Second),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "access_log",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: entry,
		}
	})
}

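// newDataCollector returns a synchronous producer tuned for durability: every
// send waits for all in-sync replicas to acknowledge, and failed sends are
// retried up to ten times before SendMessage returns an error.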
func newDataCollector(brokerList []string) sarama.SyncProducer {

	// For the data collector, we are looking for strong consistency semantics.
	// Because we don't change the flush settings, sarama will try to produce messages
	// as fast as possible to keep latency low.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	config.Producer.Return.Successes = true          // Required by the SyncProducer; NewSyncProducer rejects the config otherwise
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	return producer
}

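// newAccessLogProducer returns an asynchronous producer tuned for throughput:
// it only waits for the partition leader to ack, compresses batches with
// Snappy, and flushes them every 500ms. Delivery errors are drained from the
// Errors channel and logged by a background goroutine.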
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {

	// For the access log, we are looking for AP semantics, with high throughput.
	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
	config := sarama.NewConfig()
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}

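// Example usage, assuming a local broker reachable at localhost:9092 (adjust
// the address for your own setup):
//
//	KAFKA_PEERS=localhost:9092 go run http_server.go
//	curl 'http://localhost:8080/?foo=bar'
//
// The query string "foo=bar" is stored on the "important" topic, and the
// request itself is logged to the "access_log" topic.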