|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
使用 Kafka 绑定进行分区
Apache Kafka 原生支持主题分区。
有时将数据发送到特定分区是有利的——例如,当您希望严格按顺序处理消息时(针对特定客户的的所有消息应发送到同一分区)。
下面的例子展示了如何配置生产者和消费者端:
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
| 请注意,由于Apache Kafka原生支持分区,因此除非您像示例中那样使用自定义分区键或涉及有效负载本身的表达式,否则无需依赖上述绑定器分区。 绑定器提供的分区选择通常用于不支持原生分区的中间件技术。请注意,上面的示例中使用了一个自定义键 partitionKey,这将是决定分区的因素,在这种情况下,适合使用绑定器分区。使用原生Kafka分区时,i。e,如果您不提供partition-key-expression,则Apache Kafka会选择一个分区,默认情况下该分区将是记录键的哈希值与可用分区数的比值。要向出站记录添加密钥,请在 spring-messaging Message<?> 中将 KafkaHeaders.KEY 标头设置为所需的密钥值。当未提供记录键时,Apache Kafka 默认会根据Apache Kafka 文档中描述的逻辑选择一个分区。
|
主题必须配置足够的分区,以便为所有消费者组实现所需的并发性。
上述配置最多支持12个消费者实例(如果它们的concurrency是2,则最多支持6个;如果并发度是3,则最多支持4个,以此类推)。
通常最好“过度配置”分区,以允许未来增加消费者或提高并发度。 |
先前的配置使用了默认分区(key.hashCode() % partitionCount)。
这可能提供也可能不提供一个合适平衡的算法,具体取决于键值。特别是,请注意此分区策略与独立Kafka生产者使用的默认策略不同——例如Kafka Streams所用的一个,这意味着当由这些客户端产生时,相同的键值可能会在不同的分区中以不同的方式平衡。
您可以使用partitionSelectorExpression或partitionSelectorClass属性来覆盖这个默认设置。 |
由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。Kafka 会在实例之间分配分区。
| Kafka 主题的分区数在运行时可能会发生变化(例如,由于管理任务)。</p><p>在此之后计算的分区将不同(例如,将使用新分区)。</p><p>从 Spring Cloud Stream 的 4.0.3 版本开始,支持运行时更改分区数量。</p><p>另请参阅参数 'spring.kafka.producer.properties.metadata.max.age.ms' 来配置更新间隔。</p><p>由于某些限制,在这种情况下无法使用引用消息 'payload' 的 'partition-key-expression',该机制将被禁用。</p><p>默认情况下,整体行为是禁用的,可以使用配置参数 'producer.dynamicPartitionUpdatesEnabled=true' 启用它。 |
以下 Spring Boot 应用程序侦听 Kafka 流,并将每个消息进入的分区 ID 打印到控制台(Console):
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.topic
group: myGroup
可以根据需要添加实例。
Kafka 会重新平衡分区分配。
如果实例数量(或 instance count * concurrency)超过分区数,则某些消费者处于空闲状态。