C'est ma participation11Le défi du mois de juin25Oh, mon Dieu.,Voir les détails de l'événement:2021Un dernier défi
Autres:Kafka Groupes de consommateurs dans | Thème du déplacement:Kafka Comment le Groupe de consommateurs de?
Déjà présenté,Kafka Le consommateur peut soumettre manuellement le déplacement,Et peut soumettre un déplacement qui n'est pas la position actuelle,Cela permet de sauter ou de consommer à nouveau des messages.C'est surtout parce que Kafka Est un système de messagerie basé sur la structure du Journal,Au lieu de「File d'attente」Structure.
Bref,Les données de déplacement sont contrôlées par le consommateur.Notez que,Le consommateur ne peut contrôler que le déplacement,Impossible de contrôler le message,Message pour le consommateur,Toujours en lecture seule.
En fait,,Kafka Offre aux consommateurs un moyen riche de réinitialiser les déplacements , Il peut être divisé en Réinitialisation de position et Réinitialisation de temps .
Les types suivants sont fournis en fonction du poids de déplacement de la position
Réinitialiser le déplacement au premier déplacement actuel .
Attention ici.,Parce que Kafka Les journaux plus anciens seront supprimés ,Donc,, La position la plus ancienne n'est pas nécessairement 0. Si vous souhaitez consommer à nouveau tous les messages disponibles dans le sujet , Vous pouvez utiliser cette stratégie .
Réinitialiser le déplacement au dernier déplacement actuel .
Si vous voulez sauter tous les messages historiques , Commencez par les dernières nouvelles , Alors c'est avec cette stratégie .
Réinitialiser le déplacement au dernier déplacement actuellement engagé . Il n'y a pas beaucoup de scénarios d'utilisation de cette stratégie .
Réinitialiser le déplacement à un déplacement spécifié .
Parfois..., Les consommateurs tirent des messages non consommables du système de messagerie , Comme un message mal formé , Ou une erreur dans le processus de consommation , Ou pour une raison liée à l'entreprise , Le message ne peut pas être consommé .En ce moment, Vous pouvez utiliser cette politique pour sauter , Le message après l'avoir consommé .
Réinitialiser le déplacement à une position opposée à la position actuelle (Position actuelle + N).
Specified-Offset Vous pouvez spécifier directement la position de déplacement à réinitialiser ,Et Shift-By-N Vous pouvez spécifier un déplacement par rapport à la position actuelle .Par exemple, N - Oui. 5 Quand,C'est comme sauter 5 Message (s),Ici. N Ou négatif, Ça va rebondir .
Specified-Offset Et Shift-By-N Peut être compris commePosition absolueEtPosition relative.
Il y a deux types de
Réinitialiser le déplacement à la première position après le temps spécifié .
Réinitialiser le déplacement à la première position après un point dans le temps par rapport à l'heure actuelle .
DateTime Et Duration Peut également être interprété commeTemps absoluEtTemps relatif.
Comprendre ces stratégies , Voici comment .
Par exemple,, Pour réinitialiser le déplacement du Groupe de consommateurs à sa position actuelle la plus ancienne , Vous pouvez utiliser la commande suivante :
bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-earliest –execute
Copier le Code
Latest Et Current La stratégie est similaire .
Specified-Offset Et Shift-By-N Vous devez fournir des valeurs spécifiques dans la commande , Les formats sont --to-offset <offset>
Et --shift-by <offset_N>
.
Pour DateTime Déplacement de la politique réinitialiser , Un délai précis est nécessaire :
bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-datetime 2021-11-25T20:00:00.000 –execute
Copier le Code
Pour Duration Stratégie, Besoin de fournir une correspondance avec ISO-8601 Standard Duration Format,Par lettre P Au début,Par derrière 4 Composants partiels,C'est - à - dire: D、H、M Et S,Jours séparés、Heures、Minutes et secondes.
bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --by-duration PT0H30M0S –execute
Copier le Code
Ici. PT0H30M0S
Représentant 30 Minutes.
Après l'exécution de ces commandes , De nouvelles informations de déplacement sont demandées sur la ligne de commande .
Si vous voulez réinitialiser le déplacement dans le programme consommateur ,Kafka Les consommateurs correspondants sont également disponibles API,Un instant. Java API:
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
Copier le Code