Browse Source

Initial pass at a basic SerDe implementation

Lourens Naudé 5 years ago
parent
commit
ab2cd98eff
7 changed files with 132 additions and 12 deletions
  1. 12 2
      async_producer.go
  2. 11 2
      config.go
  3. 28 8
      consumer.go
  4. 20 0
      deserializer.go
  5. 6 0
      errors.go
  6. 35 0
      serde.go
  7. 20 0
      serializer.go

+ 12 - 2
async_producer.go

@@ -107,8 +107,9 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er
 }
 }
 
 
 type asyncProducer struct {
 type asyncProducer struct {
-	client Client
-	conf   *Config
+	client     Client
+	conf       *Config
+	serializer Serializer
 
 
 	errors                    chan *ProducerError
 	errors                    chan *ProducerError
 	input, successes, retries chan *ProducerMessage
 	input, successes, retries chan *ProducerMessage
@@ -162,6 +163,8 @@ func newAsyncProducer(client Client) (AsyncProducer, error) {
 		txnmgr:     txnmgr,
 		txnmgr:     txnmgr,
 	}
 	}
 
 
+	p.serializer = p.conf.Producer.Serde.Serializer()
+
 	// launch our singleton dispatchers
 	// launch our singleton dispatchers
 	go withRecover(p.dispatcher)
 	go withRecover(p.dispatcher)
 	go withRecover(p.retryHandler)
 	go withRecover(p.retryHandler)
@@ -355,6 +358,13 @@ func (p *asyncProducer) dispatcher() {
 			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
 			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
 			continue
 			continue
 		}
 		}
+
+		msg, err := p.serializer.Serialize(msg)
+		if err != nil {
+			p.returnError(msg, ErrSerializationError)
+			continue
+		}
+
 		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
 		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
 			p.returnError(msg, ErrMessageSizeTooLarge)
 			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
 			continue

+ 11 - 2
config.go

@@ -153,6 +153,8 @@ type Config struct {
 	// Producer is the namespace for configuration related to producing messages,
 	// Producer is the namespace for configuration related to producing messages,
 	// used by the Producer.
 	// used by the Producer.
 	Producer struct {
 	Producer struct {
+		Serde Serde
+
 		// The maximum permitted size of a message (defaults to 1000000). Should be
 		// The maximum permitted size of a message (defaults to 1000000). Should be
 		// set equal to or smaller than the broker's `message.max.bytes`.
 		// set equal to or smaller than the broker's `message.max.bytes`.
 		MaxMessageBytes int
 		MaxMessageBytes int
@@ -234,6 +236,7 @@ type Config struct {
 	// Consumer is the namespace for configuration related to consuming messages,
 	// Consumer is the namespace for configuration related to consuming messages,
 	// used by the Consumer.
 	// used by the Consumer.
 	Consumer struct {
 	Consumer struct {
+		Serde Serde
 
 
 		// Group is the namespace for configuring consumer group.
 		// Group is the namespace for configuring consumer group.
 		Group struct {
 		Group struct {
@@ -449,7 +452,10 @@ func NewConfig() *Config {
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
 	c.Producer.Return.Errors = true
 	c.Producer.Return.Errors = true
 	c.Producer.CompressionLevel = CompressionLevelDefault
 	c.Producer.CompressionLevel = CompressionLevelDefault
-
+	serde, err := NewSerde(c)
+	if err == nil {
+		c.Producer.Serde = serde
+	}
 	c.Consumer.Fetch.Min = 1
 	c.Consumer.Fetch.Min = 1
 	c.Consumer.Fetch.Default = 1024 * 1024
 	c.Consumer.Fetch.Default = 1024 * 1024
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.Retry.Backoff = 2 * time.Second
@@ -460,7 +466,10 @@ func NewConfig() *Config {
 	c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
 	c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
 	c.Consumer.Offsets.Initial = OffsetNewest
 	c.Consumer.Offsets.Retry.Max = 3
 	c.Consumer.Offsets.Retry.Max = 3
-
+	serde, err = NewSerde(c)
+	if err == nil {
+		c.Consumer.Serde = serde
+	}
 	c.Consumer.Group.Session.Timeout = 10 * time.Second
 	c.Consumer.Group.Session.Timeout = 10 * time.Second
 	c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
 	c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
 	c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
 	c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange

+ 28 - 8
consumer.go

@@ -17,10 +17,11 @@ type ConsumerMessage struct {
 	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
 	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
 	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
 	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
 
 
-	Key, Value []byte
-	Topic      string
-	Partition  int32
-	Offset     int64
+	Key, Value       []byte
+	AnyKey, AnyValue interface{}
+	Topic            string
+	Partition        int32
+	Offset           int64
 }
 }
 
 
 // ConsumerError is what is provided to the user when an error occurs.
 // ConsumerError is what is provided to the user when an error occurs.
@@ -74,6 +75,7 @@ type Consumer interface {
 
 
 type consumer struct {
 type consumer struct {
 	conf            *Config
 	conf            *Config
+	deserializer    Deserializer
 	children        map[string]map[int32]*partitionConsumer
 	children        map[string]map[int32]*partitionConsumer
 	brokerConsumers map[*Broker]*brokerConsumer
 	brokerConsumers map[*Broker]*brokerConsumer
 	client          Client
 	client          Client
@@ -111,6 +113,8 @@ func newConsumer(client Client) (Consumer, error) {
 		brokerConsumers: make(map[*Broker]*brokerConsumer),
 		brokerConsumers: make(map[*Broker]*brokerConsumer),
 	}
 	}
 
 
+	c.deserializer = c.conf.Consumer.Serde.Deserializer()
+
 	return c, nil
 	return c, nil
 }
 }
 
 
@@ -505,7 +509,8 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 			if offset < child.offset {
 			if offset < child.offset {
 				continue
 				continue
 			}
 			}
-			messages = append(messages, &ConsumerMessage{
+
+			msg := &ConsumerMessage{
 				Topic:          child.topic,
 				Topic:          child.topic,
 				Partition:      child.partition,
 				Partition:      child.partition,
 				Key:            msg.Msg.Key,
 				Key:            msg.Msg.Key,
@@ -513,7 +518,14 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 				Offset:         offset,
 				Offset:         offset,
 				Timestamp:      timestamp,
 				Timestamp:      timestamp,
 				BlockTimestamp: msgBlock.Msg.Timestamp,
 				BlockTimestamp: msgBlock.Msg.Timestamp,
-			})
+			}
+
+			msg, err := child.consumer.deserializer.Deserialize(msg)
+			if err != nil {
+				return messages, err
+			}
+
+			messages = append(messages, msg)
 			child.offset = offset + 1
 			child.offset = offset + 1
 		}
 		}
 	}
 	}
@@ -535,7 +547,8 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 		if batch.LogAppendTime {
 		if batch.LogAppendTime {
 			timestamp = batch.MaxTimestamp
 			timestamp = batch.MaxTimestamp
 		}
 		}
-		messages = append(messages, &ConsumerMessage{
+
+		msg := &ConsumerMessage{
 			Topic:     child.topic,
 			Topic:     child.topic,
 			Partition: child.partition,
 			Partition: child.partition,
 			Key:       rec.Key,
 			Key:       rec.Key,
@@ -543,7 +556,14 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 			Offset:    offset,
 			Offset:    offset,
 			Timestamp: timestamp,
 			Timestamp: timestamp,
 			Headers:   rec.Headers,
 			Headers:   rec.Headers,
-		})
+		}
+
+		msg, err := child.consumer.deserializer.Deserialize(msg)
+		if err != nil {
+			return messages, err
+		}
+
+		messages = append(messages, msg)
 		child.offset = offset + 1
 		child.offset = offset + 1
 	}
 	}
 	if len(messages) == 0 {
 	if len(messages) == 0 {

+ 20 - 0
deserializer.go

@@ -0,0 +1,20 @@
+package sarama
+
+type deserializer struct {
+	config *Config
+}
+
+type Deserializer interface {
+	Deserialize(msg *ConsumerMessage) (*ConsumerMessage, error)
+}
+
+func NewDeserializer(config *Config) (*deserializer, error) {
+	deserializer := &deserializer{
+		config: config,
+	}
+	return deserializer, nil
+}
+
+func (deserializer *deserializer) Deserialize(msg *ConsumerMessage) (*ConsumerMessage, error) {
+	return msg, nil
+}

+ 6 - 0
errors.go

@@ -49,6 +49,12 @@ var ErrControllerNotAvailable = errors.New("kafka: controller is not available")
 // the metadata.
 // the metadata.
 var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
 var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
 
 
+// SerDe serialization error
+var ErrSerializationError = errors.New("kafka: producer serialization error")
+
+// SerDe deserialization error
+var ErrDeserializationError = errors.New("kafka: consumer deserialization error")
+
 // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
 type PacketEncodingError struct {
 type PacketEncodingError struct {

+ 35 - 0
serde.go

@@ -0,0 +1,35 @@
+package sarama
+
+type serde struct {
+	config       *Config
+	serializer   *serializer
+	deserializer *deserializer
+}
+
+type Serde interface {
+	Serializer() Serializer
+	Deserializer() Deserializer
+}
+
+func NewSerde(config *Config) (Serde, error) {
+	serde := &serde{
+		config: config,
+	}
+	serializer, err := NewSerializer(config)
+	if err == nil {
+		serde.serializer = serializer
+	}
+	deserializer, err := NewDeserializer(config)
+	if err == nil {
+		serde.deserializer = deserializer
+	}
+	return serde, err
+}
+
+func (serde *serde) Serializer() Serializer {
+	return serde.serializer
+}
+
+func (serde *serde) Deserializer() Deserializer {
+	return serde.deserializer
+}

+ 20 - 0
serializer.go

@@ -0,0 +1,20 @@
+package sarama
+
+type serializer struct {
+	config *Config
+}
+
+type Serializer interface {
+	Serialize(msg *ProducerMessage) (*ProducerMessage, error)
+}
+
+func NewSerializer(config *Config) (*serializer, error) {
+	serializer := &serializer{
+		config: config,
+	}
+	return serializer, nil
+}
+
+func (serializer *serializer) Serialize(msg *ProducerMessage) (*ProducerMessage, error) {
+	return msg, nil
+}