package main

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)

var (
	addr      = flag.String("addr", ":8080", "The address to bind to")
	brokers   = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma-separated list")
	verbose   = flag.Bool("verbose", false, "Turn on Sarama logging")
	certFile  = flag.String("certificate", "", "The optional certificate file for client authentication")
	keyFile   = flag.String("key", "", "The optional key file for client authentication")
	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
	verifySsl = flag.Bool("verify", false, "Whether to verify the server's TLS certificate chain")
)

func main() {
	flag.Parse()

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		DataCollector:     newDataCollector(brokerList),
		AccessLogProducer: newAccessLogProducer(brokerList),
	}
	defer func() {
		if err := server.Close(); err != nil {
			log.Println("Failed to close server", err)
		}
	}()

	log.Fatal(server.Run(*addr))
}

func createTlsConfiguration() (t *tls.Config) {
	if *certFile != "" && *keyFile != "" && *caFile != "" {
		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
		if err != nil {
			log.Fatal(err)
		}

		caCert, err := ioutil.ReadFile(*caFile)
		if err != nil {
			log.Fatal(err)
		}

		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)

		t = &tls.Config{
			Certificates: []tls.Certificate{cert},
			RootCAs:      caCertPool,
			// Skip chain verification only when -verify is false; the flag
			// means "verify", so its value must be negated here.
			InsecureSkipVerify: !*verifySsl,
		}
	}
	// t stays nil if no certificate material was provided, leaving TLS disabled.
	return t
}

type Server struct {
	DataCollector     sarama.SyncProducer
	AccessLogProducer sarama.AsyncProducer
}

func (s *Server) Close() error {
	if err := s.DataCollector.Close(); err != nil {
		log.Println("Failed to shut down data collector cleanly", err)
	}

	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
	}

	return nil
}

func (s *Server) Handler() http.Handler {
	return s.withAccessLog(s.collectQueryStringData())
}

func (s *Server) Run(addr string) error {
	httpServer := &http.Server{
		Addr:    addr,
		Handler: s.Handler(),
	}

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}

func (s *Server) collectQueryStringData() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}

		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
			Topic: "important",
			Value: sarama.StringEncoder(r.URL.RawQuery),
		})

		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Failed to store your data: %s", err)
		} else {
			// The tuple (topic, partition, offset) can be used as a unique identifier
			// for a message in a Kafka cluster.
			fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
		}
	})
}
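
// As a contrast to the unkeyed message above: a minimal sketch of how the same
// sync producer could send keyed messages, so that all messages sharing a key
// land on the same partition (Sarama's default partitioner hashes the key when
// one is set). This helper is illustrative only and is not called anywhere in
// this program; its name and parameters are hypothetical.
func sendKeyedData(producer sarama.SyncProducer, key, payload string) (partition int32, offset int64, err error) {
	return producer.SendMessage(&sarama.ProducerMessage{
		Topic: "important",
		Key:   sarama.StringEncoder(key), // identical keys hash to the same partition
		Value: sarama.StringEncoder(payload),
	})
}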
type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	encoded []byte
	err     error
}

func (ale *accessLogEntry) ensureEncoded() {
	if ale.encoded == nil && ale.err == nil {
		ale.encoded, ale.err = json.Marshal(ale)
	}
}

func (ale *accessLogEntry) Length() int {
	ale.ensureEncoded()
	return len(ale.encoded)
}

func (ale *accessLogEntry) Encode() ([]byte, error) {
	ale.ensureEncoded()
	return ale.encoded, ale.err
}

func (s *Server) withAccessLog(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           r.RemoteAddr,
			ResponseTime: float64(time.Since(started)) / float64(time.Second),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "access_log",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: entry,
		}
	})
}

func newDataCollector(brokerList []string) sarama.SyncProducer {
	// For the data collector, we are looking for strong consistency semantics.
	// Because we don't change the flush settings, sarama will try to produce messages
	// as fast as possible to keep latency low.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	config.Producer.Return.Successes = true
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	return producer
}

func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
	// For the access log, we are looking for AP semantics, with high throughput.
	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
	config := sarama.NewConfig()
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}
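
// Example usage, assuming a Kafka broker is reachable at localhost:9092 (the
// broker address and query string below are illustrative):
//
//	$ KAFKA_PEERS=localhost:9092 go run .
//	$ curl 'http://localhost:8080/?hello=world'
//
// Each request to "/" synchronously produces one message on the "important"
// topic, and the access-log middleware asynchronously produces one JSON entry
// on the "access_log" topic, keyed by client IP.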