
KafkaConsumer partition assignment strategies: let the tests do the talking

No amount of talk beats testing it yourself. (The real reason: the source code is dense and hard to read, and I was a bit too lazy to read it.)

Reference article below. Honestly I didn't really "reference" it; I just ran across it and wanted to dig in.

kafka的消费者分区分配策略_一念花开_的博客-CSDN博客_kafka分区分配策略

Newcomers to Kafka tend to overlook one thing:

why is the consumer identified by a group.id? Note that it's a group; why isn't it something like a consumer_name?

It's because Kafka can see hundreds of thousands, even millions, of messages per second; a single consumer simply can't process that much on its own. It's called a group because you can start several consumers in the same group, and the flood of data gets split across them.
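Concretely, "same group" is nothing more than giving every consumer instance the same group.id. A minimal sketch (the broker address is a placeholder; run this program several times and each instance joins the group and gets a slice of the partitions):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder, point at your broker
props.put("group.id", "cc");                      // same value in every instance -> same group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("cc_test"));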

Now let's start testing..


Start three consumers and watch the logs.

Start consumer1.

consumer1 log: it consumes partitions 0, 1, 2 of the topic.


Start consumer2.

consumer1 log: now consumes p0, p1.


consumer2 log: p2.


Start consumer3.

consumer1 log: p0 p1 -> p0.


consumer2 log: p2 -> p1.


consumer3 log: p2.


What does this show? As the group's membership changes, Kafka redistributes the partitions to the consumers in the group according to some strategy.

So how are they assigned, and what counts as a reasonable assignment?
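By the way, an easy way to watch these reassignments without digging through logs is to pass a ConsumerRebalanceListener when subscribing. Just a sketch; it drops straight into the consumer code at the end of this post:

consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("revoked:  " + partitions); // taken away at the start of a rebalance
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("assigned: " + partitions); // what this consumer owns after the rebalance
    }
});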

The basics

Generally the group can be named anything: a, b, 1234, cclovezbf, c1, c2. Since our Kafka traffic is small, one group usually subscribes to just one topic.

But sharp-eyed readers may have noticed that in the Java code the subscription takes a collection, which means you can subscribe to more than one topic.

consumer.subscribe(Collections.singletonList(topic));      

When would you need more than one?

Say in 王者荣耀, AFK reports send the match id to topic wzry, and in LoL they go to topic lol.

Does the back office really need two separate teams for that? To check an AFK report you just pull up the match id, replay the match, glance at it, and you know. One team is enough.

That team simply calls consumer.subscribe(Lists.newArrayList("wzry","lol")); and inspects every record.

Some contrarian will ask: why two topics at all? Why not a single topic, guaji?

Well, say at year end you need to compare how many reports wzry and lol each got this month and which had more violations; with one topic you'd be stuck filtering. But I digress.

Now, with our 3 consumers up, let's see whether the data really is consumed separately.


The producer sent 100 messages.


You can see consumer1 consumed only partition0,

consumer2 only partition1,

consumer3 only partition2.

All looks rosy... Time for another question. We said earlier that there's an assignment strategy at work, so which strategy are we actually using?

You can find out with a command on the server, but then you have to look up the command and log into the server...
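If you don't want to bother with the server either, the AdminClient can tell you from Java. A sketch (assuming a reachable broker and the group name "cc" from my test code; describeConsumerGroups and partitionAssignor() are standard AdminClient APIs):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class ShowAssignor {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder, point at your broker
        try (AdminClient admin = AdminClient.create(props)) {
            ConsumerGroupDescription desc = admin
                    .describeConsumerGroups(Collections.singleton("cc")) // the group.id used in this post
                    .describedGroups().get("cc").get();
            System.out.println("assignor = " + desc.partitionAssignor()); // e.g. "range" or "roundrobin"
        }
    }
}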

RangeAssignor

Or simplest of all, just read it off the consumer log: class org.apache.kafka.clients.consumer.RangeAssignor


Open the class and take a look:

/**
 * <p>The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and
 * <code>t1</code>, and each topic has 3 partitions, resulting in partitions <code>t0p0</code>, <code>t0p1</code>,
 * <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
 *
 * <p>The assignment will be:
 * <ul>
 * <li><code>C0: [t0p0, t0p1, t1p0, t1p1]</code></li>
 * <li><code>C1: [t0p2, t1p2]</code></li>
 * </ul>
 */

This doc comment is a godsend for beginners like me. Here's my rough translation:

Range assignment works per topic. For each topic, we lay the available partitions out in numeric order and the consumers in lexicographic order, then divide the number of partitions by the number of consumers to get each consumer's share. If it doesn't divide evenly, the first few consumers get one extra partition.

Two consumers c0, c1.

Two topics t0, t1, each with 3 partitions -> t0p0 t0p1 t0p2 t1p0 t1p1 t1p2.

The result:

c0 consumes t0p0, t0p1, t1p0, t1p1

c1 consumes t0p2, t1p2

Let's summarize based on that description.

To summarize

1. First list out all partitions of all topics: t0p0 t0p1 t0p2 t1p0 t1p1 t1p2 ...

2. Divide per topic. Per topic. PER TOPIC.

        Look at t0 first: t0p0 t0p1 t0p2 must go to c0 and c1,

        so c0 gets t0p0 t0p1, and c1 gets t0p2.

        Then for t1: c0 gets t1p0 t1p1, and c1 gets t1p2.

Sharp readers should now think back to my first test: when consumer2 started, we had exactly this case of 3 partitions split between 2 consumers, and consumer1 consumed p0 p1 while consumer2 consumed p2. Exactly the same as here.

One more summary to wrap up.

RangeAssignor divides per topic. Per topic. What does that actually mean?

Say c1 and c2 are in the same group, but

c1 subscribes to t1, t2

c2 subscribes to t1, t3

and t1, t2, t3 all have 3 partitions.

How does this get assigned? Still topic by topic:

First t1, consumed by c1 and c2: c1 gets t1p0 t1p1, and c2 gets t1p2.

Then t2, consumed only by c1: c1 gets t2p0 t2p1 t2p2.

Finally t3, consumed only by c2: c2 gets t3p0 t3p1 t3p2.

Same refrain: assignment is done topic by topic. It doesn't matter how the other topics come out; within any one topic, if the split is uneven, the consumers at the front get the extras. A toy re-implementation follows below.
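To convince myself, here's that per-topic arithmetic as runnable code (my own sketch, not Kafka's RangeAssignor, but it follows the rule from the doc comment: per topic, base share = partitions / consumers, and the first partitions % consumers consumers get one extra):

import java.util.*;

public class RangeTrace {
    public static void main(String[] args) {
        Map<String, List<String>> subs = new TreeMap<>();
        subs.put("c1", Arrays.asList("t1", "t2"));
        subs.put("c2", Arrays.asList("t1", "t3"));
        int partitionsPerTopic = 3;

        Map<String, List<String>> assignment = new TreeMap<>();
        subs.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));

        for (String topic : new String[]{"t1", "t2", "t3"}) {
            // consumers subscribed to this topic, already in lexicographic order via TreeMap
            List<String> consumers = new ArrayList<>();
            for (String c : subs.keySet())
                if (subs.get(c).contains(topic)) consumers.add(c);

            int quota = partitionsPerTopic / consumers.size(); // base share per consumer
            int extra = partitionsPerTopic % consumers.size(); // first `extra` consumers get one more
            int p = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int take = quota + (i < extra ? 1 : 0);
                for (int k = 0; k < take; k++)
                    assignment.get(consumers.get(i)).add(topic + "p" + p++);
            }
        }
        System.out.println(assignment);
        // prints {c1=[t1p0, t1p1, t2p0, t2p1, t2p2], c2=[t1p2, t3p0, t3p1, t3p2]}
        // which matches the walkthrough above
    }
}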

———————————————————————————————————————————

RoundRobin

I don't feel like testing this one... straight to the code. Sometimes you don't want to read source, but there's no way around it.

/**
 * <p>The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumers.)
 *
 * <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and <code>t1</code>, and each topic has 3 partitions,
 * resulting in partitions <code>t0p0</code>, <code>t0p1</code>, <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
 *
 * <p>The assignment will be:
 * <ul>
 * <li><code>C0: [t0p0, t0p2, t1p1]</code>
 * <li><code>C1: [t0p1, t1p0, t1p2]</code>
 * </ul>
 *
 * <p>When subscriptions differ across consumer instances, the assignment process still considers each
 * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
 * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
 * assignments. For example, we have three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>, and three topics <code>t0</code>, <code>t1</code>, <code>t2</code>,
 * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are <code>t0p0</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
 * <code>t2p1</code>, <code>t2p2</code>. <code>C0</code> is subscribed to <code>t0</code>; <code>C1</code> is subscribed to <code>t0</code>, <code>t1</code>; and <code>C2</code> is subscribed to <code>t0</code>, <code>t1</code>, <code>t2</code>.
 *
 * <p>That assignment will be:
 * <ul>
 * <li><code>C0: [t0p0]</code>
 * <li><code>C1: [t1p0]</code>
 * <li><code>C2: [t1p1, t2p0, t2p1, t2p2]</code>
 * </ul>
 */

RoundRobin lays out all the available partitions and all the available consumers,

then round-robins the partitions over the consumers. If all consumers subscribe to the same topics, the partitions end up evenly distributed.

Two consumers c0, c1.

Two topics t0, t1, each with 3 partitions -> t0p0 t0p1 t0p2 t1p0 t1p1 t1p2.

The result:

c0 consumes t0p0, t0p2, t1p1

c1 consumes t0p1, t1p0, t1p2

Note the difference from range!!!!

Range looks at each topic separately!!!! [t0p0 t0p1 t0p2] [t1p0 t1p1 t1p2]

RoundRobin looks at all partitions as a single list!!!! [t0p0 t0p1 t0p2 t1p0 t1p1 t1p2]

So for two consumers over 2 topics with 3 partitions each, roundRobin and range give different results.

RoundRobin, case two

When consumers subscribe to different topics, every consumer is still considered in round robin fashion, but a consumer is skipped in the rotation if it doesn't subscribe to the topic at hand.

What does that mean? If c1 subscribes to t1 and c2 subscribes to t1 and t2, does t2's assignment simply not involve c1?

Not quite. A consumer obviously can't be assigned a topic it didn't subscribe to, but it still occupies its slot in the rotation.

The official example:

consumers c0, c1, c2;

topics t0, t1, t2 with 1, 2, and 3 partitions, i.e. t0p0 t1p0 t1p1 t2p0 t2p1 t2p2;

c0 subscribes to t0; c1 subscribes to t0, t1; c2 subscribes to t0, t1, t2.

The final roundRobin result:

C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

How did that get assigned? It's round robin; the point is the ROUNDS!!!

So how many rounds did this assignment take? The answer should be 4.

Let's test the official example with RoundRobin.

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");      

c0->t0-0


 c1->t1-0


 c2->  t1-1, t2-2, t2-1, t2-0


I also wondered: what if it were range? Same result, actually; I tested that too. The output was long and I couldn't be bothered to screenshot it.


C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

Honestly the official example isn't great: since only c2 subscribes to t2, t2 can only go to c2, so imbalance is guaranteed.

So yes, the official example is pretty weak.

And there's another issue here that I think hardly anyone mentions, or even knows about!!!

The consumers' startup order changes which partitions each one ends up with!!! Say c2 starts first: c2 will grab t0's p0; when c1 starts later, will c2 give that partition up? I tested it: no.

RoundRobin: borrowing someone else's example

Consumers c0, c1.

Topics t0, t1, t2, each with 3 partitions, i.e. t0p0 t0p1 t0p2 t1p0 t1p1 t1p2 t2p0 t2p1 t2p2.

c0 subscribes to t0, t1; c1 subscribes to t1, t2.

My intuition first: 9 partitions; t0 belongs exclusively to c0 and t2 exclusively to c1, so those are settled. The crux is how t1's partitions get split.

Straight to the experiment... no more word games.


 consumer1 -> topic1-1, topic0-0, topic0-2, topic0-1


consumer2->topic2-2, topic2-1, topic1-0, topic1-2, topic2-0


This doesn't match the article I referenced at all... So who's right?? I was lost...

If any expert sees this, please enlighten me: who's right?

My first guess was that my consumer startup order was off? But starting earlier or later would merely flip a 4/5 split into a 5/4 split,

not produce the 3/6 split that article shows... Can it be that these big shots write their articles straight off the source code, never actually running anything? Let's park the question for now.

2022-05-23: this question has been gnawing at me for days; I couldn't let it go. After more hands-on testing plus a close read of the Kafka source, it's solved.

So how is it actually assigned?

Consumers c0, c1.

Topics t0, t1, t2, each with 3 partitions: t0p0 t0p1 t0p2 t1p0 t1p1 t1p2 t2p0 t2p1 t2p2.

c0 subscribes to t0, t1; c1 subscribes to t1, t2.


Time to read the source after all.

CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
    final String topic = partition.topic();
    while (!subscriptions.get(assigner.peek()).topics().contains(topic))
        assigner.next();
    assignment.get(assigner.next()).add(partition);
}

assigner can be thought of as the collection of consumers. This thing has two methods, peek and next; the class name alone tells you it's circular.

peek returns the element at the current cursor without moving it.

next returns the element at the current cursor, then advances the cursor by one.
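You can poke at this class directly: CircularIterator ships inside kafka-clients (org.apache.kafka.common.utils). It's an internal utility, so this demo is only for understanding, not for production use:

import java.util.Arrays;
import org.apache.kafka.common.utils.CircularIterator;

public class CircularDemo {
    public static void main(String[] args) {
        CircularIterator<String> it = new CircularIterator<>(Arrays.asList("c0", "c1"));
        System.out.println(it.peek()); // c0 -- look, cursor does not move
        System.out.println(it.next()); // c0 -- returns current, then advances
        System.out.println(it.peek()); // c1
        System.out.println(it.next()); // c1 -- advancing again wraps around
        System.out.println(it.peek()); // c0
    }
}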

allPartitionsSorted(partitionsPerTopic, subscriptions) sorts the partitions in order: [t0p0 t0p1 t0p2 t1p0 t1p1 t1p2 t2p0 t2p1 t2p2]

Now let's trace it.

final String topic = partition.topic(); // which topic this partition belongs to -- easy
while (!subscriptions.get(assigner.peek()).topics().contains(topic))
    assigner.next();
assignment.get(assigner.next()).add(partition);

subscriptions.get(assigner.peek()).topics().contains(topic) takes the consumer at the current cursor and checks whether it subscribes to this partition's topic.

If it doesn't, we call assigner.next() to advance the cursor.

Enough talk; let's run the example: c0 subscribes to t0, t1; c1 subscribes to t1, t2.

Loop over the 9 partitions.

t0p0: assigner.peek() = c0; c0 subscribes to t0, so while = false; assigner.next() returns c0, but the cursor has already moved on to c1

c0 [t0p0]

t0p1: assigner.peek() = c1; c1 doesn't subscribe to t0, so while = true and assigner.next() moves the cursor to c0; then assigner.next() returns c0 and moves the cursor to c1

c0 [t0p0, t0p1]

t0p2: assigner.peek() = c1, same as above; the cursor ends back at c1

c0 [t0p0, t0p1, t0p2]

t1p0

assigner.peek() = c1; c1 subscribes to t1, while = false; assigner.next() returns c1, and the cursor moves to c0

c0 [t0p0, t0p1, t0p2], c1 [t1p0]

t1p1

assigner.peek() = c0; it subscribes to t1, while = false; assigner.next() returns c0, and the cursor moves to c1

c0 [t0p0, t0p1, t0p2, t1p1], c1 [t1p0]

t1p2

assigner.peek() = c1; it subscribes to t1, while = false; assigner.next() returns c1, and the cursor moves to c0

c0 [t0p0, t0p1, t0p2, t1p1], c1 [t1p0, t1p2]

No need to walk through the rest: t2 is consumed only by c1.

So the final result:

c0 [t0p0, t0p1, t0p2, t1p1]      c1 [t1p0, t1p2, t2p0, t2p1, t2p2]
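To double-check the hand trace without spinning up any brokers, here's a toy version of that loop (my own sketch, not Kafka's code; a plain index plays the CircularIterator's role):

import java.util.*;

public class RoundRobinTrace {
    public static void main(String[] args) {
        Map<String, List<String>> subs = new TreeMap<>();
        subs.put("c0", Arrays.asList("t0", "t1"));
        subs.put("c1", Arrays.asList("t1", "t2"));

        // all partitions in sorted order, like allPartitionsSorted would produce
        List<String> partitions = new ArrayList<>();
        for (String t : new String[]{"t0", "t1", "t2"})
            for (int p = 0; p < 3; p++)
                partitions.add(t + "p" + p);

        Map<String, List<String>> assignment = new TreeMap<>();
        subs.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));

        List<String> consumers = new ArrayList<>(subs.keySet());
        int cursor = 0; // plays the role of the CircularIterator
        for (String tp : partitions) {
            String topic = tp.substring(0, 2); // "t0p1" -> "t0"
            // skip consumers not subscribed to this partition's topic (the while above)
            while (!subs.get(consumers.get(cursor)).contains(topic))
                cursor = (cursor + 1) % consumers.size();
            assignment.get(consumers.get(cursor)).add(tp);
            cursor = (cursor + 1) % consumers.size(); // next(): return current, then advance
        }
        System.out.println(assignment);
        // prints {c0=[t0p0, t0p1, t0p2, t1p1], c1=[t1p0, t1p2, t2p0, t2p1, t2p2]}
    }
}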

So who's right? Who's right? I am!!! Practice is the sole criterion of truth. What good is a stack of pretty diagrams that don't match reality?

StickyAssignor

Plenty of articles cover only range and roundRobin, which makes me ask: how do they know about just those two?

Why not a third or a fourth strategy? Did they read the official docs or the source? My guess is most did neither.

/**
 * <p>The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:
 * <ul>
 * <li>the numbers of topic partitions assigned to consumers differ by at most one; or</li>
 * <li>each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.</li>
 * </ul>
 * Second, it preserved as many existing assignment as possible when a reassignment occurs. This helps in saving some of the
 * overhead processing when topic partitions move from one consumer to another.</p>
 *
 * <p>Starting fresh it would work by distributing the partitions over consumers as evenly as possible. Even though this may sound similar to
 * how round robin assignor works, the second example below shows that it is not.
 * During a reassignment it would perform the reassignment in such a way that in the new assignment
 * <ol>
 * <li>topic partitions are still distributed as evenly as possible, and</li>
 * <li>topic partitions stay with their previously assigned consumers as much as possible.</li>
 * </ol>
 * Of course, the first goal above takes precedence over the second one.</p>
 *
 * <p><b>Example 1.</b> Suppose there are three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
 * four topics <code>t0,</code> <code>t1</code>, <code>t2</code>, <code>t3</code>, and each topic has 2 partitions,
 * resulting in partitions <code>t0p0</code>, <code>t0p1</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
 * <code>t2p1</code>, <code>t3p0</code>, <code>t3p1</code>. Each consumer is subscribed to all three topics.
 *
 * The assignment with both sticky and round robin assignors will be:
 * <ul>
 * <li><code>C0: [t0p0, t1p1, t3p0]</code></li>
 * <li><code>C1: [t0p1, t2p0, t3p1]</code></li>
 * <li><code>C2: [t1p0, t2p1]</code></li>
 * </ul>
 *
 * Now, let's assume <code>C1</code> is removed and a reassignment is about to happen. The round robin assignor would produce:
 * <ul>
 * <li><code>C0: [t0p0, t1p0, t2p0, t3p0]</code></li>
 * <li><code>C2: [t0p1, t1p1, t2p1, t3p1]</code></li>
 * </ul>
 *
 * while the sticky assignor would result in:
 * <ul>
 * <li><code>C0 [t0p0, t1p1, t3p0, t2p0]</code></li>
 * <li><code>C2 [t1p0, t2p1, t0p1, t3p1]</code></li>
 * </ul>
 * preserving all the previous assignments (unlike the round robin assignor).
 *</p>
 * <p><b>Example 2.</b> There are three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
 * and three topics <code>t0</code>, <code>t1</code>, <code>t2</code>, with 1, 2, and 3 partitions respectively.
 * Therefore, the partitions are <code>t0p0</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
 * <code>t2p1</code>, <code>t2p2</code>. <code>C0</code> is subscribed to <code>t0</code>; <code>C1</code> is subscribed to
 * <code>t0</code>, <code>t1</code>; and <code>C2</code> is subscribed to <code>t0</code>, <code>t1</code>, <code>t2</code>.
 *
 * The round robin assignor would come up with the following assignment:
 * <ul>
 * <li><code>C0 [t0p0]</code></li>
 * <li><code>C1 [t1p0]</code></li>
 * <li><code>C2 [t1p1, t2p0, t2p1, t2p2]</code></li>
 * </ul>
 *
 * which is not as balanced as the assignment suggested by sticky assignor:
 * <ul>
 * <li><code>C0 [t0p0]</code></li>
 * <li><code>C1 [t1p0, t1p1]</code></li>
 * <li><code>C2 [t2p0, t2p1, t2p2]</code></li>
 * </ul>
 *
 * Now, if consumer <code>C0</code> is removed, these two assignors would produce the following assignments.
 * Round Robin (preserves 3 partition assignments):
 * <ul>
 * <li><code>C1 [t0p0, t1p1]</code></li>
 * <li><code>C2 [t1p0, t2p0, t2p1, t2p2]</code></li>
 * </ul>
 *
 * Sticky (preserves 5 partition assignments):
 * <ul>
 * <li><code>C1 [t1p0, t1p1, t0p0]</code></li>
 * <li><code>C2 [t2p0, t2p1, t2p2]</code></li>
 * </ul>
 *</p>
 * <h3>Impact on <code>ConsumerRebalanceListener</code></h3>
 * The sticky assignment strategy can provide some optimization to those consumers that have some partition cleanup code
 * in their <code>onPartitionsRevoked()</code> callback listeners. The cleanup code is placed in that callback listener
 * because the consumer has no assumption or hope of preserving any of its assigned partitions after a rebalance when it
 * is using range or round robin assignor. The listener code would look like this:
 * <pre>
 * {@code
 * class TheOldRebalanceListener implements ConsumerRebalanceListener {
 *
 *   void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 *     for (TopicPartition partition: partitions) {
 *       commitOffsets(partition);
 *       cleanupState(partition);
 *     }
 *   }
 *
 *   void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 *     for (TopicPartition partition: partitions) {
 *       initializeState(partition);
 *       initializeOffset(partition);
 *     }
 *   }
 * }
 * }
 * </pre>
 *
 * As mentioned above, one advantage of the sticky assignor is that, in general, it reduces the number of partitions that
 * actually move from one consumer to another during a reassignment. Therefore, it allows consumers to do their cleanup
 * more efficiently. Of course, they still can perform the partition cleanup in the <code>onPartitionsRevoked()</code>
 * listener, but they can be more efficient and make a note of their partitions before and after the rebalance, and do the
 * cleanup after the rebalance only on the partitions they have lost (which is normally not a lot). The code snippet below
 * clarifies this point:
 * <pre>
 * {@code
 * class TheNewRebalanceListener implements ConsumerRebalanceListener {
 *   Collection<TopicPartition> lastAssignment = Collections.emptyList();
 *
 *   void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 *     for (TopicPartition partition: partitions)
 *       commitOffsets(partition);
 *   }
 *
 *   void onPartitionsAssigned(Collection<TopicPartition> assignment) {
 *     for (TopicPartition partition: difference(lastAssignment, assignment))
 *       cleanupState(partition);
 *
 *     for (TopicPartition partition: difference(assignment, lastAssignment))
 *       initializeState(partition);
 *
 *     for (TopicPartition partition: assignment)
 *       initializeOffset(partition);
 *
 *     this.lastAssignment = assignment;
 *   }
 * }
 * }
 * </pre>
 *
 * Any consumer that uses sticky assignment can leverage this listener like this:
 * <code>consumer.subscribe(topics, new TheNewRebalanceListener());</code>
 *
 */

Straight in. This is a newer assignor; per the doc comment above, it tries to keep the assignment as balanced as possible and to preserve the existing assignment as much as possible when a rebalance happens.
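Trying it yourself is one config line, same as we did for RoundRobin above:

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");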

Example 1

Three consumers c0, c1, c2, each subscribed to all four topics below.

Four topics t0, t1, t2, t3, each with 2 partitions: t0p0 t0p1 t1p0 t1p1 t2p0 t2p1 t3p0 t3p1.

Both sticky and roundRobin assign it like this:

C0: [t0p0, t1p1, t3p0]

C1: [t0p1, t2p0, t3p1]

C2: [t1p0, t2p1]

Here I'll interject again: what about range? My guess; too lazy to verify.

C0: [t0p0,t1p0, t2p0, t3p0]

C1: [t0p1,t1p1,t2p1, t3p1]

C2: [ ]

Now c1 suddenly vanishes!! A reassignment is coming. How does it go?

roundRobin

C0: [t0p0, t1p0, t2p0, t3p0]

C2: [t0p1, t1p1, t2p1, t3p1]

sticky

C0 [t0p0, t1p1, t3p0, t2p0]

C2 [t1p0, t2p1, t0p1, t3p1]

They look almost identical... but they're not. Look at where t1p0 and t1p1 end up; that's the difference.

What does roundRobin do?

It doesn't care what the previous assignment was; it redistributes all 8 partitions from scratch.

What does sticky do?

preserving all the previous assignments (unlike the round robin assignor)

It keeps the previous assignments, and only the partitions C1 left behind get redistributed.

And from the source you can tell: range and roundRobin are just a handful of lines each... sticky is painfully long.

Example 2

Consumers c0, c1, c2.

Topics t0, t1, t2 with 1, 2, and 3 partitions, i.e. t0p0 t1p0 t1p1 t2p0 t2p1 t2p2.

c0 subscribes to t0; c1 subscribes to t0, t1; c2 subscribes to t0, t1, t2.

RoundRobin:

C0 [t0p0]

C1 [t1p0]

C2 [t1p1, t2p0, t2p1, t2p2]

Sticky

C0 [t0p0]

C1 [t1p0, t1p1]

C2 [t2p0, t2p1, t2p2]

Exactly: once balance matters, sticky is clearly better at a glance. RoundRobin's assignment of this example looked odd to me from the very first time I saw it.

Now c0 suddenly dies.

RoundRobin (preserves 3 partition assignments, compared with before):

C1 [t0p0, t1p1]

C2 [t1p0, t2p0, t2p1, t2p2]

This reassignment actually leaks a detail. As Example 1 said, roundRobin starts over every time, so the 6 partitions t0p0 t1p0 t1p1 t2p0 t2p1 t2p2 are dealt out in rounds:

Round 1: c1 [t0p0], c2 [t1p0]

Round 2: c1 [t0p0, t1p1], c2 [t1p0, t2p0]

Round 3: c1 [t0p0, t1p1], c2 [t1p0, t2p0, t2p1]

Round 4: c1 [t0p0, t1p1], c2 [t1p0, t2p0, t2p1, t2p2]

So, does this settle my earlier question? By this logic, am I right or is the other article? No doubt the official result is right... but my repeated tests weren't wrong either. An unsolved mystery at the time (the CircularIterator trace above is what eventually untangled it).

Sticky (preserves 5 partition assignments, compared with before):

C1 [t1p0, t1p1, t0p0]

C2 [t2p0, t2p1, t2p2]

By this point it's basically clear. What's good about sticky? When a consumer dies, it doesn't reassign everything; it re-deals only that consumer's partitions. What's the benefit? Put bluntly: as consumers keep dying, the survivors' loads get more and more even.

Say the loads start as 1 1 1 1 1 1 1 10

a '1' dies -> 2 1 1 1 1 1 10

another '1' dies -> 2 2 1 1 1 10

a '2' dies -> 2 2 2 1 10

the '10' dies -> 4 4 4 5

That's just my own reasoning; the stated intent of this class is to keep the assignment as even as possible..

That wraps it up... one unsolved mystery and all. Code attached below; interested readers can test it themselves and tell me whether my tests were wrong..

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.1</version>
</dependency>
package com.chenchi.learning.kafka;

import com.google.gson.Gson;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.Test;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;


public class kafkaTestTemplate {

    Properties props = new Properties();
    AdminClient adminClient;
    Producer<String, String> producer;
    KafkaConsumer<String, String> consumer;
    private String topic = "cc_test";//test-ia-label
    private String group = "cc";


    @Before
    public void buildProperties() {
        props.put("bootstrap.servers", "xxxx.xxx:9092");
        props.put("acks", "all");
        props.put("delivery.timeout.ms", 30000);
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        adminClient = AdminClient.create(props);

        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", group);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //props.put("auto.commit.interval.ms", "1000");
        consumer = new KafkaConsumer<>(props);
    }

    @Test
    public void kafkaProducer() throws InterruptedException {
        //沈阳阿拉丁数字科技有限公司
        //String names="阿里,腾讯,百度,上海大鹅文化传播有限公司,上海天蝎电器有限公司,东莞市叙亚服饰有限公司,东莞皂救美商贸有限公司";
        //with social insurance
        String names = "一九一一文化传播,YG文化经纪,TCL移动通信科技,OPPO广东移动通信有限公司,乐视,阿里,腾讯,百度,上海大鹅文化传播有限公司,上海天蝎电器有限公司,东莞市叙亚服饰有限公司,东莞皂救美商贸有限公司";
        names = "由,于,现,在,新,中,台,h,i,v,e,这,边,已,经,在,每,天,同,步,下,列,l,i,n,k,数,据";
        List<String> nameList = Arrays.asList(names.split(","));
        for (int i = 0; i < nameList.size(); i++) {
            Map<String, String> map = new HashMap<>();
            map.put("name", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date())); // current timestamp string
            Gson gson = new Gson();
            String jsonStr = gson.toJson(map);

            try {
                producer.send(new ProducerRecord<String, String>(topic, jsonStr)).get();//test-ia-label
            } catch (Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(2000);
        }
        producer.flush();
        producer.close();
    }

    @Test
    public void kafkaConsumer() {
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            //  poll records from the broker
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
                        record.offset(), record.key(), record.value());
            });
        }
    }

    //beginOffset={test-ia-label-2=105, test-ia-label-0=8371, test-ia-label-1=85}
//endOffsets={test-ia-label-2=3212, test-ia-label-0=11567, test-ia-label-1=3259}
    @Test
    public void getTopicOffset() {
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
        Set<TopicPartition> assignment = consumer.assignment();
        if (assignment.isEmpty()) {
            //at this point we've only subscribed and know nothing about the topic's layout on the server, so poll once to trigger the assignment
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
                        record.offset(), record.key(), record.value());
            }
            assignment = consumer.assignment();
        }
        for (TopicPartition topicPartition : assignment) {
            System.out.println("topic的分区" + topicPartition);
        }
        consumer.seekToBeginning(assignment);
        Map<TopicPartition, Long> partitionAndOffset = consumer.beginningOffsets(assignment);
        System.out.println("beginOffset=" + partitionAndOffset);//获取的是kafka目前存在的数据的 最开始offset
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        System.out.println("endOffsets=" + endOffsets); //这里是指最新的offset+1
        //how do we get the offsets this group last committed? see gets() below

    }

    //fetch the committed offsets of a consumer group
    @Test
    public void gets() throws ExecutionException, InterruptedException {
        Properties properties = new Properties() {{
            this.put("bootstrap.servers", "9.135.68.201:9092");
        }};
        AdminClient adminClient = AdminClient.create(properties);
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        System.out.println("topics="+topicListings);
        ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("ContractRiskLabelJob-1");
        KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> mapKafkaFuture = result.partitionsToOffsetAndMetadata();
        Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = mapKafkaFuture.get();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            long offset = offsetAndMetadata.offset();
            String metadata = offsetAndMetadata.metadata();
            System.out.println("topicPartition=" + topicPartition + ",offset=" + offset);
        }


    }

    @Test
    public void createTopic() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        System.out.println(listTopicsResult.names().get());
        CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(new NewTopic("cc_test", 2, (short) 2)));
//        ArrayList<NewTopic> topics = new ArrayList<>();
//        topics.add(new NewTopic("cc_test2",3, (short) 3));
//        adminClient.createTopics(topics);
        topics.all().whenComplete((s, b) -> {
            if (b == null) {
//                    this.describeTopic(adminClient, topic);
                System.out.println("创建成功, s=" + s + ",b=" + b);
            } else {
                System.out.println("创建失败,  s=" + s + ",b=" + b);
//                    this.describeTopic(adminClient, topic);

            }
        });
        adminClient.close();

    }

    //fetch the topic's replicas and ISR
    @Test
    public void describeTopic() throws InterruptedException {

        DescribeTopicsResult result = adminClient.describeTopics(Collections.singleton(topic));

        result.values().forEach((key, value) -> {
            try {
                System.out.println(key + ":" + value.get());
                TopicDescription topicDescription = value.get();
                List<TopicPartitionInfo> partitions = topicDescription.partitions();
                for (TopicPartitionInfo info : partitions) {
                    System.out.println(info);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        Thread.sleep(10000);
    }

}