package sarama import ( "sort" "time" ) // request message format is: // [topic] timeout(int32) // where topic is: // name(string) [partition] // where partition is: // id(int32) offset(int64) type DeleteRecordsRequest struct { Topics map[string]*DeleteRecordsRequestTopic Timeout time.Duration } func (d *DeleteRecordsRequest) encode(pe packetEncoder) error { if err := pe.putArrayLength(len(d.Topics)); err != nil { return err } keys := make([]string, 0, len(d.Topics)) for topic := range d.Topics { keys = append(keys, topic) } sort.Strings(keys) for _, topic := range keys { if err := pe.putString(topic); err != nil { return err } if err := d.Topics[topic].encode(pe); err != nil { return err } } pe.putInt32(int32(d.Timeout / time.Millisecond)) return nil } func (d *DeleteRecordsRequest) decode(pd packetDecoder, version int16) error { n, err := pd.getArrayLength() if err != nil { return err } if n > 0 { d.Topics = make(map[string]*DeleteRecordsRequestTopic, n) for i := 0; i < n; i++ { topic, err := pd.getString() if err != nil { return err } details := new(DeleteRecordsRequestTopic) if err = details.decode(pd, version); err != nil { return err } d.Topics[topic] = details } } timeout, err := pd.getInt32() if err != nil { return err } d.Timeout = time.Duration(timeout) * time.Millisecond return nil } func (d *DeleteRecordsRequest) key() int16 { return 21 } func (d *DeleteRecordsRequest) version() int16 { return 0 } func (d *DeleteRecordsRequest) headerVersion() int16 { return 1 } func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } type DeleteRecordsRequestTopic struct { PartitionOffsets map[int32]int64 // partition => offset } func (t *DeleteRecordsRequestTopic) encode(pe packetEncoder) error { if err := pe.putArrayLength(len(t.PartitionOffsets)); err != nil { return err } keys := make([]int32, 0, len(t.PartitionOffsets)) for partition := range t.PartitionOffsets { keys = append(keys, partition) } sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) for _, partition := range keys { pe.putInt32(partition) pe.putInt64(t.PartitionOffsets[partition]) } return nil } func (t *DeleteRecordsRequestTopic) decode(pd packetDecoder, version int16) error { n, err := pd.getArrayLength() if err != nil { return err } if n > 0 { t.PartitionOffsets = make(map[int32]int64, n) for i := 0; i < n; i++ { partition, err := pd.getInt32() if err != nil { return err } offset, err := pd.getInt64() if err != nil { return err } t.PartitionOffsets[partition] = offset } } return nil }