golang: sarama 链接Kafka

golang: sarama 链接Kafka

stromXu 829 2022-06-23

写在前面:
本章来源于 https://gitee.com/Cookie_XiaoD/easykafka 大佬的文章,大部分代码都可以再上面找到
再次感谢大佬提供的:
虽然是别人写的,最好是自己理解透彻,关于kafka内容网上很多就不放进去了

1、目录结构

test
--------------main.go
--------------setting.json
--------------common
------------------------common/kafka
------------------------common/kafka/consumer.go
------------------------common/kafka/data.go
------------------------common/kafka/kafka_setting.go
------------------------common/kafka/producer.go
--------------common/spec
------------------------common/spec/aormode.go
------------------------common/spec/auth.go
------------------------common/spec/consumer.go
------------------------common/spec/msg.go
------------------------common/spec/producer.go
------------------------producer.go —使用

2、主要代码:

2.1:setting.go

{
   "kafka": {
    "brokers": "127.0.0.1:9092",
    "topicName": "test"
   }
}

2.2:kafka_setting.go

//读取
var KafkaData = &Kafka{}

//读取setting文件
func init() {
	filePtr, err := os.Open("./setting.json")
	if err != nil {
		log.Printf("文件打开失败 [Err:%s]", err.Error())
		return
	}
	defer filePtr.Close()
	// 创建json解码器
	info := KafkaConfig{}
	decoder := json.NewDecoder(filePtr)
	err = decoder.Decode(&info)
	if err != nil {
		fmt.Println("redis解码失败", err.Error())
	}
	KafkaData = &info.Kafka
}

type KafkaConfig struct {
	Kafka Kafka `json:"kafka"`
}

type Kafka struct {
	Brokers   string `json:"brokers"`
	TopicName string `json:"topicName"`
}

2.3:producer.go(生产者)

package kafka

import (
	easykafka "common/kafka/spec"
	"common/kafka/spec"
	"log"
)

func ProduceSend(content string) error {
   //就是setting.json里面的配置
	var topicName = easykafka.KafkaData.TopicName
	var brokers = easykafka.KafkaData.Brokers
	var err error
	producer, err := easykafka.NewProducer(
		brokers,
		easykafka.WithProducerErrorHandler(func(err *easykafka.AsyncProduceError) {
			log.Println("异步生产消息时发生错误:", err)
		}),
		easykafka.WithProducerAckMode(spec.WaitLeader))
	if err != nil {
		log.Fatalf(err.Error())
	}
	
	defer func() {
		if err = producer.Close(); err != nil {
			log.Println("关闭生产者发生错误:", err)
		}
	}()

	size, err := producer.SyncProduce(topicName, "mineLog", content)
	if err != nil {
		log.Println("发送消息错误:", err)
		return err
	} else {
		log.Println("发送成功, 数据大小:", size)
	}
	return nil
}

type ExampleData struct {
	Content string `json:"content"`
	Seq     string `json:"seq"`
}

2.4:consumer.go(消费者使用—)

package main

import (
	"context"
	"encoding/json"
     easykafka "common/kafka/spec"
	"common/kafka/spec"
	"log"
	"time"
)

var consumer spec.Consumer
var msgs = make(chan spec.Msg, 10000)
var brokers = "127.0.0.1:9092"

func main() {
	startConsumer()
	var batch []spec.Msg
	//模拟每收到5条数据就进行一次处理,处理完成后批量提交
	for {
		msg := <-msgs
		batch = append(batch, msg)
		if len(batch) != 5 {
			log.Println("接收到", len(batch), "条数据")
			continue
		}
		log.Println("接收到5条数据,开始处理")
		for _, v := range batch {
			var data ExampleData
			err := json.Unmarshal(v.Data(), &data)
			if err != nil {
				continue
			}
			log.Println("处理数据:", data)
		}
		log.Println("处理完成开始批量提交")
		err := consumer.ConfirmBatch(batch)
		if err != nil {
			log.Println("批量提交失败:", err)
		} else {
			log.Println("批量提交成功")
		}
		batch = batch[0:0]
		time.Sleep(1 * time.Second)
	}
}

func startConsumer() {
	go func() {
		var err error
		consumer, err = easykafka.NewConsumer(
			brokers,
			[]string{"topic_example"},
			"group_example",
			handleMsg,
			easykafka.WithConsumerErrorHandler(handleErr),
			easykafka.WithConsumerAOR(spec.Earliest),
			easykafka.WithConsumerManualCommit(true))
		if err != nil {
			log.Fatalf(err.Error())
		}
		defer func() {
			if err := consumer.Close(); err != nil {
				log.Println("关闭消费者发生错误:", err)
			}
		}()
		log.Println("开始接收数据")
		consumer.StartBlock(context.Background())
	}()
}

func handleMsg(msg spec.Msg) {
	log.Println("接收到数据", msg.Topic(), msg.Partition(), msg.Offset())
	msgs <- msg
}

func handleErr(err *easykafka.ConsumeError) {
	log.Println("发生错误:", err)
}

type ExampleData struct {
	Content string `json:"content"`
	Seq     string `json:"seq"`
}

3、注意避雷

在:SyncProduce 函数中有:msg, err := p.getMsg(topic, key, data) 代码:作者是这样写的:

func (p *Producer) getMsg(topic, key string, data interface{}) (*sarama.ProducerMessage, error) {
	if strings.TrimSpace(topic) == "" {
		return nil, errors.New("topic无效")
	}
	if strings.TrimSpace(key) == "" {
		return nil, errors.New("key无效")
	}

	 if data == nil {
	 	return nil, errors.New("data无效")
	 }
	 ret, err := p.dataEncoder(data)
	 if err != nil {
	 	return nil, fmt.Errorf("data序列化失败:%w", err)
	 }
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(ret),
	}
	return msg, nil
}

----------------------------------------
Value: sarama.ByteEncoder(ret) 是已经序列化了需要消费或者发送的值,如果你的代码只是接受的是string, 这个地方需要修改,不序列化就好,不然用转json的时候会报错

# go