My RocketMQ architecture is as follows:
Fault description: the broker-b server was down for 8 hours (1:00-9:00). After broker-b was restarted, the messages generated during those 8 hours were consumed by the consumers. The consumer's idempotency condition is that a message with the same msgId is not consumed twice within 2 hours, but more than two hours had already passed, so the messages generated between 1:00 and 7:00 were consumed repeatedly.
Expected behavior: after broker-b restarts, consumers only consume messages produced from the current time onward; earlier messages are no longer consumed.
Solution 1:
Set the consumer's starting consume position to the latest offset:
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
Note the code comments for this setting: the parameter only takes effect the first time a new consumerGroup starts.
In other words, if an existing consumerGroup restarts, it simply continues consuming from its last committed offset, and this parameter has no effect. Whether a ConsumerGroup is new is decided on the broker side.
Keep in mind that the consumed offset is first stored locally in the consumer, which periodically synchronizes its consumption offset to the broker. When the broker decides whether a consumerGroup is new, it only checks whether it holds an offset record for that consumerGroup.
In addition, the parameter also has no effect for a new queue: consumption starts from offset 0.
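The start-offset rules described above can be summarized in a small model. This is a hypothetical simplification written for illustration, not RocketMQ's actual implementation: the class, its methods, the group names, and the `group@queueId` key format are all made up, and the new-queue rule follows the description above rather than the broker source.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of how a consumer's start offset is chosen
 * for one queue when CONSUME_FROM_LAST_OFFSET is set. Not RocketMQ code.
 */
public class StartOffsetModel {

    // Offsets the broker has recorded, keyed by "group@queueId" (hypothetical key format).
    private final Map<String, Long> brokerOffsetTable = new HashMap<>();

    // A consumer periodically syncs its local offset to the broker; this records it.
    public void commit(String group, int queueId, long offset) {
        brokerOffsetTable.put(group + "@" + queueId, offset);
    }

    /**
     * @param queueIsNew true if the queue was just created
     * @param maxOffset  the queue's current max offset
     */
    public long startOffset(String group, int queueId, boolean queueIsNew, long maxOffset) {
        Long stored = brokerOffsetTable.get(group + "@" + queueId);
        if (stored != null) {
            // Existing group: resume from the committed offset; the setting is ignored.
            return stored;
        }
        if (queueIsNew) {
            // New queue: start from 0 regardless of the setting.
            return 0L;
        }
        // New group on an existing queue: CONSUME_FROM_LAST_OFFSET applies.
        return maxOffset;
    }

    public static void main(String[] args) {
        StartOffsetModel broker = new StartOffsetModel();
        broker.commit("orderGroup", 0, 1200L);
        System.out.println(broker.startOffset("orderGroup", 0, false, 5000L)); // 1200
        System.out.println(broker.startOffset("newGroup", 0, false, 5000L));   // 5000
        System.out.println(broker.startOffset("newGroup", 1, true, 30L));      // 0
    }
}
```

The first call shows why the setting did not help here: once the broker holds an offset record for the group, that record wins.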
Advantage: simple; only one line needs to change.
Disadvantage: it only takes effect for a new consumer group, so it does not help when the broker already holds an offset record for the ConsumerGroup.
Solution 2:
Modify broker-b's offsets so that they match broker-a's.
Method:
Copy broker-a's file ${userPath}/store/config/consumerOffset.json (under the RocketMQ path) to the same location on broker-b, then restart broker-b.
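The file replacement step can be sketched as follows, keeping a backup of broker-b's original offsets in case a rollback is needed. This assumes broker-a's file has already been transferred to broker-b's machine (for example with scp) and that broker-b is stopped while the file is replaced, since a running broker periodically persists its own offsets back to this file. The class name and the `.manual-backup` suffix are hypothetical; the suffix is chosen to avoid clashing with any `.bak` file the broker may maintain itself.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

/**
 * Hypothetical helper: replace broker-b's consumerOffset.json with broker-a's
 * copy, keeping a backup of the original. Run only while broker-b is stopped.
 */
public class OffsetFileReplace {

    public static Path replaceWithBackup(Path source, Path target) throws IOException {
        // Distinct suffix to avoid clashing with any backup file the broker writes.
        Path backup = target.resolveSibling(target.getFileName() + ".manual-backup");
        if (Files.exists(target)) {
            // Keep broker-b's original offsets so the change can be rolled back.
            Files.copy(target, backup, StandardCopyOption.REPLACE_EXISTING);
        }
        // Overwrite broker-b's offsets with broker-a's.
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        return backup;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: OffsetFileReplace <broker-a consumerOffset.json> <broker-b consumerOffset.json>");
            System.exit(1);
        }
        Path backup = replaceWithBackup(Paths.get(args[0]), Paths.get(args[1]));
        System.out.println("replaced; original kept at " + backup);
    }
}
```

After the copy, start broker-b as usual so it loads the replaced file.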
Advantage: simple and fast, with no changes to program code.
Disadvantage: broker-a's message queues keep receiving messages, so its offsets keep increasing. A small number of messages that arrive between copying the file and restarting broker-b will still be consumed repeatedly, and these can only be handled by idempotent consumer code.
After-the-fact remedies:
1. Make the consumer's idempotency window longer (1 day or 3 days). I store msgIds in Redis; when message volume is small, a longer window can be afforded.
// Cache the message ID to prevent duplicate consumption (expires after 1 day, in seconds)
boolean setResult = RedisUtil.setnx(msgExt.getMsgId(), 60 * 60 * 24 * 1, msgExt.getMsgId().getBytes(StandardCharsets.UTF_8));
if (!setResult) {
    // setnx failed: this msgId was already recorded, so skip it and report success
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
2. Add an alarm for RocketMQ broker downtime.
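To see why the window length in remedy 1 matters, the idempotency check can be modeled in memory. `RedisUtil.setnx(key, seconds, value)` is the author's own helper and presumably wraps an atomic Redis `SET key value NX EX seconds`; the class below is a hypothetical stand-in for that behavior, not a Redis client, and the message ID and timestamps are made up. Time is passed in explicitly so the window is easy to reason about.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory stand-in for "SET msgId value NX EX ttl".
 * Not thread-safe; expired entries are overwritten lazily, never evicted.
 */
public class MsgIdDeduplicator {

    private final long ttlMillis;
    // msgId -> time at which the record expires
    private final Map<String, Long> expiries = new HashMap<>();

    public MsgIdDeduplicator(Duration ttl) {
        this.ttlMillis = ttl.toMillis();
    }

    /** Returns true if msgId should be consumed, false if it is a duplicate within the window. */
    public boolean tryRecord(String msgId, long nowMillis) {
        Long expiresAt = expiries.get(msgId);
        if (expiresAt != null && nowMillis < expiresAt) {
            return false; // like SETNX failing: the key is still present
        }
        expiries.put(msgId, nowMillis + ttlMillis);
        return true;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        MsgIdDeduplicator twoHours = new MsgIdDeduplicator(Duration.ofHours(2));
        MsgIdDeduplicator oneDay = new MsgIdDeduplicator(Duration.ofDays(1));

        // E.g. a message first consumed at 1:00 and redelivered at 9:00.
        long firstSeen = 1 * hour;
        long redelivered = 9 * hour;
        twoHours.tryRecord("msg-1", firstSeen);
        oneDay.tryRecord("msg-1", firstSeen);
        System.out.println(twoHours.tryRecord("msg-1", redelivered)); // true: consumed again
        System.out.println(oneDay.tryRecord("msg-1", redelivered));   // false: skipped
    }
}
```

With a 2-hour window the redelivered message passes the check, which matches the fault described at the start; with a 1-day window it is skipped, at the cost of keeping more keys in Redis.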