DengQN·一个普通程序员;
【Crud boy日常】订阅Kafka protobuf格式数据
2024-05-06 11:44 724
#kafka#springboot#protobuf#crud

protoc

下载 protoc-27.0-rc-1-win64.zip

  1. 解压
  2. 将解压目录配置为环境变量 PROTOBUF_HOME,并把其中的 bin 目录加入 PATH,这样才能在命令行直接使用 protoc

使用protoc生成java代码

protoc --proto_path=. --java_out=build/ ./XX.proto

consumer factory

这里指定了反序列化工具 ProtoDeserializer

@Bean(name = "protoKafkaConsumerFactory")
public ConsumerFactory<Object,Object> protoKafkaConsumerFactory() {
    // 关键
    // 反序列化
    kafkaProperties.getConsumer().setKeyDeserializer(StringDeserializer.class);
    kafkaProperties.getConsumer().setValueDeserializer(ProtoDeserializer.class);
    kafkaProperties.getConsumer().setGroupId("xxxxx");
    kafkaProperties.setBootstrapServers(CollUtil.toList(ip:port));
    //kafka的配置
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    //创建生产者工厂
    DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(props);
    return factory;
}

这里创建一个 protoKafkaListenerContainerFactory,之后在 @KafkaListener 中通过 containerFactory 指定使用它。

/**
 * Listener container factory backed by {@code protoKafkaConsumerFactory()}.
 *
 * <p>Registered under the bean name {@code protoKafkaListenerContainerFactory} so that a
 * listener can select it by name.
 *
 * @param configurer configurer used to set up the new container factory
 * @return the configured container factory
 */
@Bean(name = "protoKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?>  protoKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ConsumerFactory<Object, Object> protoConsumers = protoKafkaConsumerFactory();
    configurer.configure(containerFactory, protoConsumers);
    return containerFactory;
}

反序列化工具

实现org.apache.kafka.common.serialization.Deserializer接口

根据具体业务topic反序列化到对象即可。

public class ProtoDeserializer implements Deserializer<Object> {
	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		Deserializer.super.configure(configs, isKey);
	}

	@Override
	public Object deserialize(String topic, byte[] data) {
		try {
			switch (topic) {
				case "xx": return XX.parseFrom(data);
				//
				case "yy": return YY.parseFrom(data);
				case "zz": return XX.parseFrom(data);
			}
			return null;
		} catch (InvalidProtocolBufferException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public Object deserialize(String topic, Headers headers, byte[] data) {
		return this.deserialize(topic, data);
	}

	@Override
	public void close() {
		Deserializer.super.close();
	}
}

订阅数据

/**
 * Listener for topic {@code "xx"}, using the container factory bean
 * {@code protoKafkaListenerContainerFactory}.
 *
 * @param x the record payload; presumably already decoded to {@code XX} by the
 *          deserializer configured on that container factory (confirm against the factory setup)
 */
@KafkaListener(
    topics = {"xx"},
    groupId = "xxxxxxxxx",
    containerFactory = "protoKafkaListenerContainerFactory"
)
public void readKafkaProtoc(XX x) {
    // business logic goes here
}

发送

如果有发送的情况

创建producer

@Bean(name = "protoKafkaProducerFactory")
public ProducerFactory<Object,Object> protoKafkaProducerFactory() {
    kafkaProperties.getProducer().setKeySerializer(StringSerializer.class);
    kafkaProperties.setBootstrapServers(CollUtil.toList(ip:port));
    kafkaProperties.getConsumer().setGroupId("xxx");
    //kafka的配置
    Map<String, Object> props = kafkaProperties.buildProducerProperties();
    //创建生产者工厂
    DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(props);
    //处理事务ID前缀
    String transactionIdPrefix = this.kafkaProperties.getProducer().getTransactionIdPrefix();
    if (transactionIdPrefix != null) {
        factory.setTransactionIdPrefix(transactionIdPrefix);
    }

    // 序列化
    factory.setKeySerializer((topic, data) -> topic.getBytes(StandardCharsets.UTF_8));
    factory.setValueSerializer((topic, data) -> {
        if (data instanceof XX) {
            return ((XX) data).toByteArray();
        } else if (data instanceof YY) {
            return ((YY) data).toByteArray();
        } else if (data instanceof ZZ) {
            return ((ZZ) data).toByteArray();
        }
        return null;
    });
    return factory;
}

根据不同的类型调用生成的序列化方法。

template

创建专门的template

/**
 * Dedicated {@code KafkaTemplate} built on {@code protoKafkaProducerFactory()}.
 *
 * @param kafkaProducerListener producer listener attached to the template
 * @param messageConverter      provider of the record message converter; applied only when
 *                              exactly one candidate is available
 * @return the configured template
 */
@Bean(name = "protoKafkaTemplate")
public KafkaTemplate<?, ?> protoKafkaTemplate(
    ProducerListener<Object, Object> kafkaProducerListener,
    ObjectProvider<RecordMessageConverter> messageConverter) {
    KafkaTemplate<Object, Object> template = new KafkaTemplate<>(protoKafkaProducerFactory());

    // Message converter, if a unique one exists.
    messageConverter.ifUnique(converter -> template.setMessageConverter(converter));

    // Producer listener.
    template.setProducerListener(kafkaProducerListener);

    // Default topic; can be set in yml via spring.kafka.template.default-topic.
    String defaultTopic = this.kafkaProperties.getTemplate().getDefaultTopic();
    template.setDefaultTopic(defaultTopic);

    return template;
}
XX.Builder builder = XX.newBuilder();
XX value = builder.x(11111111L)
        .x(123131L)
        .x(System.currentTimeMillis())
        .x(444444L).build();
ListenableFuture<SendResult<String, Object>> send = protoKafkaTemplate.send("xx", value);

send.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    @Override
    public void onFailure(Throwable ex) {
        log.error(ex.getMessage(), ex);
        log.error("kafka发送消息[{}] <-- [{}]失败", "xx", value);
    }

    @Override
    public void onSuccess(SendResult<String, Object> result) {
        log.info("kafka发送消息成功,topic:{},data:{}", "xx", value);
    }
});