PROTOBUF_HOME（protoc 环境变量）
使用protoc生成java代码
protoc --proto_path=. --java_out=build/ ./XX.proto
这里指定了反序列化工具 ProtoDeserializer
@Bean(name = "protoKafkaConsumerFactory")
public ConsumerFactory<Object,Object> protoKafkaConsumerFactory() {
// 关键
// 反序列化
kafkaProperties.getConsumer().setKeyDeserializer(StringDeserializer.class);
kafkaProperties.getConsumer().setValueDeserializer(ProtoDeserializer.class);
kafkaProperties.getConsumer().setGroupId("xxxxx");
kafkaProperties.setBootstrapServers(CollUtil.toList(ip:port));
//kafka的配置
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
//创建生产者工厂
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(props);
return factory;
}
这里创建一个 protoKafkaListenerContainerFactory，之后在 @KafkaListener 监听时通过 containerFactory 属性指定使用。
/**
 * Listener container factory wired to the protobuf-aware consumer factory.
 * Referenced by name from @KafkaListener(containerFactory = "protoKafkaListenerContainerFactory").
 */
@Bean(name = "protoKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> protoKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    // Apply the standard Boot configuration, but with our custom consumer factory.
    configurer.configure(containerFactory, protoKafkaConsumerFactory());
    return containerFactory;
}
实现 org.apache.kafka.common.serialization.Deserializer 接口，在其中根据具体业务的 topic 把字节数据反序列化为对应的对象即可。
/**
 * Kafka value deserializer that maps each business topic to its generated
 * protobuf message type and parses the raw bytes accordingly.
 */
public class ProtoDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    /**
     * Parse the record bytes according to the topic they came from.
     *
     * @param topic source topic, used to pick the protobuf type
     * @param data  raw record value; may be null for tombstone records
     * @return the parsed message, or null for unknown topics / null payloads
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            // Tombstone (null payload) — nothing to parse.
            return null;
        }
        try {
            switch (topic) {
                case "xx": return XX.parseFrom(data);
                case "yy": return YY.parseFrom(data);
                // FIX: original returned XX.parseFrom(data) here — copy/paste bug;
                // topic "zz" carries ZZ messages (see the producer's value serializer).
                case "zz": return ZZ.parseFrom(data);
            }
            // Unknown topic: no mapping registered.
            return null;
        } catch (InvalidProtocolBufferException e) {
            // Preserve the cause and name the topic so error handlers can diagnose it.
            throw new RuntimeException("Failed to parse protobuf message from topic " + topic, e);
        }
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        // Headers are not needed; delegate to the topic-based variant.
        return this.deserialize(topic, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}
// Consumes protobuf messages from topic "xx". containerFactory must reference
// the bean defined above so the custom ProtoDeserializer is applied; the
// payload then arrives already parsed as an XX instance.
@KafkaListener(
topics = {"xx"},
groupId = "xxxxxxxxx",
containerFactory = "protoKafkaListenerContainerFactory"
)
public void readKafkaProtoc(XX x) {
// buz code
}
如果还需要发送 protobuf 消息，则要额外配置生产者：
@Bean(name = "protoKafkaProducerFactory")
public ProducerFactory<Object,Object> protoKafkaProducerFactory() {
kafkaProperties.getProducer().setKeySerializer(StringSerializer.class);
kafkaProperties.setBootstrapServers(CollUtil.toList(ip:port));
kafkaProperties.getConsumer().setGroupId("xxx");
//kafka的配置
Map<String, Object> props = kafkaProperties.buildProducerProperties();
//创建生产者工厂
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(props);
//处理事务ID前缀
String transactionIdPrefix = this.kafkaProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
// 序列化
factory.setKeySerializer((topic, data) -> topic.getBytes(StandardCharsets.UTF_8));
factory.setValueSerializer((topic, data) -> {
if (data instanceof XX) {
return ((XX) data).toByteArray();
} else if (data instanceof YY) {
return ((YY) data).toByteArray();
} else if (data instanceof ZZ) {
return ((ZZ) data).toByteArray();
}
return null;
});
return factory;
}
根据不同的类型调用生成的序列化方法。
创建专门的template
/**
 * KafkaTemplate dedicated to protobuf messages, backed by protoKafkaProducerFactory.
 */
@Bean(name = "protoKafkaTemplate")
public KafkaTemplate<?, ?> protoKafkaTemplate(
        // Producer listener invoked on send success/failure.
        ProducerListener<Object, Object> kafkaProducerListener,
        // Optional message converter picked up when exactly one is defined.
        ObjectProvider<RecordMessageConverter> messageConverter) {
    KafkaTemplate<Object, Object> template = new KafkaTemplate<>(protoKafkaProducerFactory());
    messageConverter.ifUnique(template::setMessageConverter);
    template.setProducerListener(kafkaProducerListener);
    // Default topic may be configured via spring.kafka.template.default-topic.
    template.setDefaultTopic(this.kafkaProperties.getTemplate().getDefaultTopic());
    return template;
}
// Build a sample XX message. NOTE(review): the repeated `.x(...)` calls are
// placeholders standing in for the real generated setter names — substitute
// the actual field setters from the generated builder.
XX.Builder builder = XX.newBuilder();
XX value = builder.x(11111111L)
.x(123131L)
.x(System.currentTimeMillis())
.x(444444L).build();
// Send asynchronously via the protobuf-aware template; the producer factory's
// value serializer converts the XX message to bytes.
// NOTE(review): ListenableFuture/addCallback is the Spring Kafka 2.x API;
// since Spring Kafka 3.0 send() returns CompletableFuture — confirm the
// Spring version in use.
ListenableFuture<SendResult<String, Object>> send = protoKafkaTemplate.send("xx", value);
send.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
// Called when the send fails (broker unreachable, serialization error, ...).
@Override
public void onFailure(Throwable ex) {
log.error(ex.getMessage(), ex);
log.error("kafka发送消息[{}] <-- [{}]失败", "xx", value);
}
// Called once the broker acknowledges the record.
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("kafka发送消息成功,topic:{},data:{}", "xx", value);
}
});