|
@@ -1,17 +1,11 @@
|
|
|
package kafka
|
|
package kafka
|
|
|
|
|
|
|
|
-type ProduceResponsePartitionBlock struct {
|
|
|
|
|
- Id int32
|
|
|
|
|
|
|
+type ProduceResponsePartition struct {
|
|
|
Err KError
|
|
Err KError
|
|
|
Offset int64
|
|
Offset int64
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (pr *ProduceResponsePartitionBlock) decode(pd packetDecoder) (err error) {
|
|
|
|
|
- pr.Id, err = pd.getInt32()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+func (pr *ProduceResponsePartition) decode(pd packetDecoder) (err error) {
|
|
|
pr.Err, err = pd.getError()
|
|
pr.Err, err = pd.getError()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
@@ -25,48 +19,42 @@ func (pr *ProduceResponsePartitionBlock) decode(pd packetDecoder) (err error) {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-type ProduceResponseTopicBlock struct {
|
|
|
|
|
- Name *string
|
|
|
|
|
- Partitions []ProduceResponsePartitionBlock
|
|
|
|
|
|
|
+type ProduceResponse struct {
|
|
|
|
|
+ Partitions map[*string]map[int32]*ProduceResponsePartition
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (pr *ProduceResponseTopicBlock) decode(pd packetDecoder) (err error) {
|
|
|
|
|
- pr.Name, err = pd.getString()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- n, err := pd.getArrayCount()
|
|
|
|
|
|
|
+func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
|
|
|
|
|
+ numTopics, err := pd.getArrayCount()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- pr.Partitions = make([]ProduceResponsePartitionBlock, n)
|
|
|
|
|
- for i := range pr.Partitions {
|
|
|
|
|
- err = (&pr.Partitions[i]).decode(pd)
|
|
|
|
|
|
|
+ pr.Partitions = make(map[*string]map[int32]*ProduceResponsePartition, numTopics)
|
|
|
|
|
+ for i := 0; i < numTopics; i++ {
|
|
|
|
|
+ name, err := pd.getString()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- return nil
|
|
|
|
|
-}
|
|
|
|
|
|
|
+ numPartitions, err := pd.getArrayCount()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
-type ProduceResponse struct {
|
|
|
|
|
- Topics []ProduceResponseTopicBlock
|
|
|
|
|
-}
|
|
|
|
|
|
|
+ pr.Partitions[name] = make(map[int32]*ProduceResponsePartition, numPartitions)
|
|
|
|
|
|
|
|
-func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
|
|
|
|
|
- n, err := pd.getArrayCount()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ for j := 0; j < numPartitions; j++ {
|
|
|
|
|
+ id, err := pd.getInt32()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- pr.Topics = make([]ProduceResponseTopicBlock, n)
|
|
|
|
|
- for i := range pr.Topics {
|
|
|
|
|
- err = (&pr.Topics[i]).decode(pd)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
|
|
+ partition := new(ProduceResponsePartition)
|
|
|
|
|
+ err = partition.decode(pd)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ pr.Partitions[name][id] = partition
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|