背景
Kafka没有内置的消息重试机制,不支持消息重试,也没有死信队列,因此使用Kafka做消息队列时,如果消息在业务处理时出现异常,就很难进行下一步处理。应对这种场景,需要自己实现消息重试的功能。
如果不想自己实现消息重试机制,建议使用RocketMQ作为消息队列,RocketMQ的消息重试机制相当完善,对于开发者使用也非常友好,详见https://help.aliyun.com/document_detail/43490.html。
方案
申请一个新的kafka topic作为重试队列,步骤如下:
- 创建一个topic作为重试topic,用于接收等待重试的消息
- 普通topic消费者给待重试的消息设置下一次的消费时间后发送到重试topic
- 从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序
- 定时任务从redis获取到达消费时间的消息,并把消息发送到对应的topic
- 同一个消息重试次数过多则不再重试
代码实现
重试消息的javaBean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| /**
 * Payload stored in the retry topic (and buffered in Redis) describing one
 * message that should be re-delivered to its original topic at {@code nextTime}.
 */
public class KafkaRetryRecord {

    /** Kafka header name carrying the retry counter, encoded as a 4-byte big-endian int. */
    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;          // original record key (may be null)
    private String value;        // original record value
    private Integer retryTimes;  // how many times this message has been retried so far
    private String topic;        // original topic to re-deliver to
    private Long nextTime;       // earliest re-delivery time, epoch millis

    public KafkaRetryRecord() {
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }
    public Integer getRetryTimes() { return retryTimes; }
    public void setRetryTimes(Integer retryTimes) { this.retryTimes = retryTimes; }
    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public Long getNextTime() { return nextTime; }
    public void setNextTime(Long nextTime) { this.nextTime = nextTime; }

    /**
     * Builds a {@link ProducerRecord} targeting the original topic, carrying the
     * retry counter in a header so the next consumer can read it back.
     *
     * @return a record ready to be sent via KafkaTemplate
     */
    public ProducerRecord<String, String> parse() {
        Integer partition = null; // let Kafka choose the partition
        Long timestamp = System.currentTimeMillis();
        List<Header> headers = new ArrayList<>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        // Guard against a record deserialized without the counter (was an NPE on unboxing).
        retryTimesBuffer.putInt(retryTimes == null ? 0 : retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(KEY_RETRY_TIMES, retryTimesBuffer));
        return new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
    }
}
|
消费端的消息发送到重试队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| /**
 * Forwards a failed consumer record to the retry topic, stamping it with an
 * incremented retry counter and the next time it should be consumed.
 * Messages that exhaust the retry schedule are dropped (logged callers decide).
 */
public class KafkaRetryService {

    private static final Logger log = LoggerFactory.getLogger(KafkaRetryService.class);

    /** Backoff schedule: index = retry attempt, value = delay in seconds. */
    private static final int[] RETRY_INTERVAL_SECONDS = {10, 30, 1*60, 2*60, 5*60, 10*60, 30*60, 1*60*60, 2*60*60};

    @Value("${spring.kafka.topics.retry}")
    private String retryTopic;

    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Schedules {@code record} for a later retry, or silently gives up once
     * the backoff schedule is exhausted.
     */
    public void consumerLater(ConsumerRecord<String, String> record) {
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        if (nextConsumerTime == null) { // retried too many times — stop
            return;
        }
        KafkaRetryRecord retryRecord = new KafkaRetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());

        String value = JSON.toJSONString(retryRecord);
        template.send(retryTopic, null, value);
    }

    /** Reads the retry counter header (absent → -1) and returns it incremented. */
    private int getRetryTimes(ConsumerRecord record) {
        int retryTimes = -1;
        for (Header header : record.headers()) {
            if (KafkaRetryRecord.KEY_RETRY_TIMES.equals(header.key())) {
                ByteBuffer buffer = ByteBuffer.wrap(header.value());
                retryTimes = buffer.getInt();
            }
        }
        retryTimes++;
        return retryTimes;
    }

    /**
     * @return the next consumption time for the given attempt, or {@code null}
     *         when the schedule is exhausted.
     */
    private Date getNextConsumerTime(int retryTimes) {
        // BUGFIX: was `length < retryTimes`, which let retryTimes == length
        // through and threw ArrayIndexOutOfBoundsException on the lookup below.
        if (retryTimes >= RETRY_INTERVAL_SECONDS.length) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }
}
|
处理待消费的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| /**
 * Bridges the retry topic and Redis: buffers pending retries in a Redis ZSET
 * (scored by due time) plus a hash (key → serialized record), and a scheduled
 * job re-publishes every message whose due time has passed.
 */
public class RetryListener {

    private static final Logger log = LoggerFactory.getLogger(RetryListener.class);

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Stores each retry-topic message in Redis, ordered by its next consumption time. */
    @KafkaListener(topics = "${spring.kafka.topics.retry}")
    public void consume(List<ConsumerRecord<String, String>> list) {
        for (ConsumerRecord<String, String> record : list) {
            KafkaRetryRecord retryRecord = JSON.parseObject(record.value(), KafkaRetryRecord.class);
            // Random member key decouples ZSET entries from message content,
            // so duplicate payloads don't collapse into one entry.
            String key = UUID.randomUUID().toString();
            redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
            redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
        }
    }

    /** Every 2 seconds, re-publishes all messages whose due time has arrived. */
    @Scheduled(cron = "0/2 * * * * *")
    public void retryFormRedis() {
        long currentTime = System.currentTimeMillis();
        Set<ZSetOperations.TypedTuple<Object>> typedTuples =
                redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        if (typedTuples == null || typedTuples.isEmpty()) { // nothing due yet
            return;
        }
        for (ZSetOperations.TypedTuple<Object> tuple : typedTuples) {
            String key = tuple.getValue().toString();
            // BUGFIX: the original did a blanket removeRangeByScore(0, now) before
            // the loop, which also deleted entries added between the read and the
            // delete — those messages were lost. Remove only the fetched members.
            redisTemplate.opsForZSet().remove(RETRY_KEY_ZSET, key);
            Object stored = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key);
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
            if (stored == null) { // already claimed by a concurrent scheduler run
                continue;
            }
            KafkaRetryRecord retryRecord = JSON.parseObject(stored.toString(), KafkaRetryRecord.class);
            ProducerRecord record = retryRecord.parse();
            kafkaTemplate.send(record);
        }
    }
}
|
消息重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| /**
 * Business consumer for the normal topic: any record whose processing throws
 * is handed to {@link KafkaRetryService} instead of being lost.
 */
public class ConsumeListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumeListener.class);

    @Autowired
    private KafkaRetryService kafkaRetryService;

    @KafkaListener(topics = "${spring.kafka.topics.test}")
    public void consume(List<ConsumerRecord<String, String>> list) {
        for (ConsumerRecord<String, String> record : list) {
            try {
                // TODO: business processing of `record` goes here.
            } catch (Exception e) {
                // BUGFIX: log.error(e.getMessage()) discarded the stack trace;
                // pass the throwable so the full cause is logged.
                log.error("consume failed, scheduling retry, topic={}", record.topic(), e);
                kafkaRetryService.consumerLater(record);
            }
        }
    }
}
|