如果有什么卡夫卡的消费者处理的一个消息太长? 将卡夫卡的重新任命该分区到另一个消费者和消息将加倍的处理?

0

的问题

假设 Kafka, 1 partition, 2 consumers.(2nd消费者是闲置)

假设1个消耗的消息,去处理它与其他3个服务以及突然棒在他们中的一个而错过卡夫卡的超时。

将卡夫卡的重新任命该分区的第2次消费者和消息将加倍的处理(假设1个最终成功的)?

1

最好的答案

1

如果有什么卡夫卡的消费者处理的一个消息太长? 将卡夫卡的重新任命该分区到另一个消费者和消息将加倍的处理?

是的,这是正确的。 如果卡夫卡消费的时间太长,以处理的消息及 后续调查()被延迟了,卡夫卡将重新任命该分区到另一个消费者和消息将再次处理(和再)。

为了更清晰起见,首先,我们需要决定和定义的'多长时间太长时间?'.

这是定义的酒店 max.poll.interval.ms. 从 ,

最大延迟之间调用的调查()时,采用消费者组管理。 这一上限的时间量,消费者可以被闲置之前获取更多的记录。 如果调查()不是所谓的之前到期的这超时,则消费者被认为是失败的,该小组将重新平衡,以便重新分配分区到另一个成员。

消费者组是重新平衡,如果没有电话对投票()内的这段时间。

还有一个酒店 auto.commit.interval.ms. 自动提交的偏移量检查将被称为只在调查检查是否经过的时间大于已配置的自动提交的间隔时间,如果结果是肯定的,偏移的承诺。

如果卡夫卡的消费太长期过程的记录,然后随后的调查()call也被推迟,抵消返回在最后的调查()未提交。 如果重新平衡发生在这个时候,新的消费者客户分配给这个分区将开始处理的消息。

消费者组重新平衡和所得的分区重新分配可以避免通过增加这一数值。 这将增加允许之间的间隔调查,并给予更多的时间向消费者来处理的记录(s)返回询(). 消费者将只参加的平衡的内部呼叫调查,所以增加max调查间隔时间也会延误小组重新平衡.

还有一个问题,在增加最大的投票时间间隔大的价值。 如果消费者死亡,其他一些原因,需要更长的时间比配置 max.poll.interval.ms 间隔检测到故障。

session.timeout.msheartbeat.interval.ms 都可以在这种情况下检测总的失败正如早些时候作为可能。

对于有关的更多详细信息,这些参数:

请注意,值构成为 session.timeout.ms 必须在允许的范围内配置在代理配置的性质

  • 小组。分。会议。超时。ms
  • 小组。最大。会议。超时。ms

否则,下面的异常会被抛开消费者客户。

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

更新:为了避免处理消息再一次

还有另一种方法在KafkaConsumer类 commitAsync() 触发的犯偏移的操作。

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

更多详细信息commitSync()和commitAsync(),请查看 这个螺纹

犯下偏移手动是一个行动的说法,抵消已经处理了这样,卡夫卡不会发送的承诺记录于同一个分区。 当抵消,致力手动,重要的是要注意,如果消费者死亡之前记录的处理因任何原因,有的机会,这些记录将不会再次处理。

2021-11-25 07:04:25

谢谢你,这是明显的。 是否有任何方法,以避免第二处理?
J.J. Beam

@J.J.束更新答案的链接和样本
arunkvelu

其他语言

此页面有其他语言版本

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................