使用 Kafka Binder 进行分区
Apache Kafka 原生支持主题分区。
有时将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时(特定客户的所有消息都应转到同一分区)。
以下示例展示了如何配置生产者端和消费者端:
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
重要的是要记住,由于 Apache Kafka 本机支持分区,因此无需依赖上述 Binder 分区,除非您使用示例中的自定义分区键或涉及有效负载本身的表达式。
活页夹提供的分区选择适用于不支持本机分区的中间件技术。
请注意,我们使用的是一个名为partitionKey 在上面的示例中,这将是分区的决定因素,因此在这种情况下,使用活页夹分区是合适的。
使用本机 Kafka 分区时,即当您不提供partition-key-expression ,则 Apache Kafka 将选择一个分区,默认情况下,该分区将是可用分区数的记录键的哈希值。
要向出站记录添加键,请将KafkaHeaders.KEY header 添加到 spring-messaging 中所需的键值Message<?> .
默认情况下,当未提供记录键时,Apache Kafka 将根据 Apache Kafka 文档中描述的逻辑选择分区。 |
必须预配主题以具有足够的分区,以便为所有使用者组实现所需的并发性。
上述配置最多支持 12 个消费者实例(如果其concurrency 是 2,如果它们的并发性为 3,则为 4,依此类推)。
通常最好“过度预配”分区,以允许将来增加使用者或并发性。 |
上述配置使用默认分区 (key.hashCode() % partitionCount ).
这可能会也可能不会提供适当平衡的算法,具体取决于键值。特别请注意,此分区策略与独立 Kafka 生产者使用的默认策略(例如 Kafka Streams 使用的默认策略)不同,这意味着当这些客户端生成时,相同的键值可能会在分区之间以不同的方式平衡。
您可以使用partitionSelectorExpression 或partitionSelectorClass 性能。 |
由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。 Kafka 在实例之间分配分区。
kafka 主题的 partitionCount 可能会在运行时更改(例如,由于管理任务)。 之后计算的分区将有所不同(例如,届时将使用新分区)。 从 Spring Cloud Stream 的 4.0.3 开始,将支持分区计数的更改。 另请参阅参数“spring.kafka.producer.properties.metadata.max.age.ms”以配置更新间隔。 由于某些限制,无法使用引用消息的“有效负载”的“partition-key-expression”,在这种情况下,该机制将被禁用。 默认情况下,整体行为处于禁用状态,可以使用配置参数“producer.dynamicPartitionUpdatesEnabled=true”启用。 |
以下 Spring Boot 应用程序监听 Kafka 流并打印(到控制台)每条消息转到的分区 ID:
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.topic
group: myGroup
您可以根据需要添加实例。
Kafka 重新平衡分区分配。
如果实例计数(或instance count * concurrency
) 超过分区数,部分消费者处于空闲状态。