Flink kafka source & sink 源码解析
原创 吴鹏 Flink 中文社区 4天前
摘要:本文基于 Flink 1.9.0 和 Kafka 2.3 版本,对 Flink Kafka source 和 sink 端的源码进行解析,主要内容分为以下两部分:
1.Flink-kafka-source 源码解析
- 流程概述
- 非 checkpoint 模式 offset 的提交
- checkpoint 模式下 offset 的提交
- 指定 offset 消费
2.Flink-kafka-sink 源码解析
- 初始化
- Task运行
- 小结
一般在 Flink 中创建 kafka source 的代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//KafkaEventSchema为自定义的数据字段解析类
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
而 Kafka 的 KafkaConsumer API 中消费某个 topic 使用的是 poll 方法如下:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.poll(Duration.ofMillis(100));
下面将分析这两个流程是如何衔接起来的。
初始化执行 env.addSource 的时候会创建 StreamSource 对象,即 final StreamSource sourceOperator = new StreamSource<>(function);这里的function 就是传入的 FlinkKafkaConsumer 对象,StreamSource 构造函数中将这个对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量,源码如下:
■ StreamSource.java
public StreamSource(SRC sourceFunction) {
super(sourceFunction);
this.chainingStrategy = ChainingStrategy.HEAD;
}
■ AbstractUdfStreamOperator.java
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
task 启动后会调用到 SourceStreamTask 中的 performDefaultAction() 方法,这里面会启动一个线程 sourceThread.start();,部分源码如下:
private final LegacySourceFunctionThread sourceThread;
@Override
protected void performDefaultAction(ActionContext context) throws Exception {
sourceThread.start();
}
在 LegacySourceFunctionThread 的 run 方法中,通过调用 headOperator.run 方法,最终调用了 StreamSource 中的 run 方法,部分源码如下:
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector,
final OperatorChain<?, ?> operatorChain) throws Exception {
//省略部分代码
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
try {
userFunction.run(ctx);
//省略部分代码
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
这里最重要的就是 userFunction.run(ctx);,这个 userFunction 就是在上面初始化的时候传入的 FlinkKafkaConsumer 对象,也就是说这里实际调用了 FlinkKafkaConsumer 中的 run 方法,而具体的方法实现在其父类 FlinkKafkaConsumerBase中,至此,进入了真正的 kafka 消费阶段。
Kafka消费阶段
在 FlinkKafkaConsumerBase#run 中创建了一个 KafkaFetcher 对象,并最终调用了 kafkaFetcher.runFetchLoop(),这个方法的代码片段如下:
/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;
@Override
public void runFetchLoop() throws Exception {
try {
final Handover handover = this.handover;
// kick off the actual Kafka consumer
consumerThread.start();
//省略部分代码
}
可以看到实际启动了一个 KafkaConsumerThread 线程。进入到 KafkaConsumerThread#run 中,下面只是列出了这个方法的部分源码,完整代码请参考 KafkaConsumerThread.java。
@Override
public void run() {
// early exit check
if (!running) {
return;
}
// This method initializes the KafkaConsumer and guarantees it is torn down properly.
// This is important, because the consumer has multi-threading issues,
// including concurrent 'close()' calls.
try {
this.consumer = getConsumer(kafkaProperties);
} catch (Throwable t) {
handover.reportError(t);
return;
}
try {
// main fetch loop
while (running) {
try {
if (records == null) {
try {
records = consumer.poll(pollTimeout);
} catch (WakeupException we) {
continue;
}
}
}
// end main fetch loop
}
} catch (Throwable t) {
handover.reportError(t);
} finally {
handover.close();
try {
consumer.close();
} catch (Throwable t) {
log.warn("Error while closing Kafka consumer", t);
}
}
}
至此,终于走到了真正从 kafka 拿数据的代码,即 records = consumer.poll(pollTimeout);。因为 KafkaConsumer 不是线程安全的,所以每个线程都需要生成独立的 KafkaConsumer 对象,即 this.consumer = getConsumer(kafkaProperties);。
KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
return new KafkaConsumer<>(kafkaProperties);
}
小结:本节只是介绍了 Flink 消费 kafka 数据的关键流程,下面会更详细的介绍在AT_LEAST_ONCE和EXACTLY_ONCE 不同场景下 FlinkKafkaConsumer 管理 offset 的流程。
消费 kafka topic 最为重要的部分就是对 offset 的管理,对于 kafka 提交 offset 的机制,可以参考 kafka 官方网。
而在 flink kafka source 中 offset 的提交模式有3种:
public enum OffsetCommitMode {
/** Completely disable offset committing. */
DISABLED,
/** Commit offsets back to Kafka only when checkpoints are completed. */
ON_CHECKPOINTS,
/** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
KAFKA_PERIODIC;
}
初始化 offsetCommitMode
在 FlinkKafkaConsumerBase#open 方法中初始化 offsetCommitMode:
// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());
- 方法 getIsAutoCommitEnabled() 的实现如下:
protected boolean getIsAutoCommitEnabled() {
return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}
- 也就是说只有 enable.auto.commit=true 并且 auto.commit.interval.ms>0 这个方法才会返回 true
- 变量 enableCommitOnCheckpoints 默认是 true,可以调用 setCommitOffsetsOnCheckpoints 改变这个值
- 当代码中调用了 env.enableCheckpointing 方法,isCheckpointingEnabled 才会返回 true
通过下面的代码返回真正的提交模式:
/**
* Determine the offset commit mode using several configuration values.
*
* @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
* @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
* @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
*
* @return the offset commit mode to use, based on the configuration values.
*/
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}
暂时不考虑 checkpoint 的场景,所以只考虑 (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;。
也就是如果客户端设置了 enable.auto.commit=true 那么就是 KAFKA_PERIODIC,否则就是 KAFKA_DISABLED。
offset 的提交
■ 自动提交
这种方式完全依靠 kafka 自身的特性进行提交,如下方式指定参数即可:
Properties properties = new Properties();
properties.put("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
■ 非自动提交
通过上面的分析,如果 enable.auto.commit=false,那么 offsetCommitMode 就是 DISABLED 。
kafka 官方文档中,提到当 enable.auto.commit=false 时候需要手动提交 offset,也就是需要调用 consumer.commitSync(); 方法提交。
但是在 flink 中,非 checkpoint 模式下,不会调用 consumer.commitSync();, 一旦关闭自动提交,意味着 kafka 不知道当前的 consumer group 每次消费到了哪。
可以从两方面证实这个问题:
-
源码
KafkaConsumerThread#run 方法中是有 consumer.commitSync();,但是只有当 commitOffsetsAndCallback != null 的时候才会调用。只有开启了checkpoint 功能才会不为 null,这个变量会在后续的文章中详细分析。
-
测试
可以通过消费 __consumer_offsets 观察是否有 offset 的提交
重启程序,还是会重复消费之前消费过的数据
小结:本节介绍了在非 checkpoint 模式下,Flink kafka source 提交 offset 的方式,下文会重点介绍 checkpoint 模式下提交 offset 的流程。
上面介绍了在没有开启 checkpoint 的时候,offset 的提交方式,下面将重点介绍开启 checkpoint 后,Flink kafka consumer 提交 offset 的方式。
通过上文可以知道,当调用了 env.enableCheckpointing 方法后 offsetCommitMode 的值就是 ON_CHECKPOINTS,而且会通过下面方法强制关闭 kafka 自动提交功能,这个值很重要,后续很多地方都是根据这个值去判断如何操作的。
/**
* Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
* This overwrites whatever setting the user configured in the properties.
* @param properties - Kafka configuration properties to be adjusted
* @param offsetCommitMode offset commit mode
*/
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
}
保存 offset
在做 checkpoint 的时候会调用 FlinkKafkaConsumerBase#snapshotState 方法,其中 pendingOffsetsToCommit 会保存要提交的 offset。
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
同时,下面的变量会作为 checkpoint 的一部分保存下来,以便恢复时使用。
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
在 snapshotState 方法中会同时保存 offset:
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
提交 offset
在 checkpoint 完成以后,task 会调用 notifyCheckpointComplete 方法,里面判断 offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS 的时候,调用fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback); 方法,最终会将要提交的 offset 通过 KafkaFetcher#doCommitInternalOffsetsToKafka 方法中的 consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback); 保存到 KafkaConsumerThread.java 中的 nextOffsetsToCommit 成员变量里面。
这样就会保证当有需要提交的 offset 的时候,下面代码会执行 consumer.commitAsync,从而完成了手动提交 offset 到 kafka。
final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);
if (commitOffsetsAndCallback != null) {
log.debug("Sending async offset commit request to Kafka broker");
// also record that a commit is already in progress
// the order here matters! first set the flag, then send the commit command.
commitInProgress = true;
consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
小结:本节介绍了在 checkpoint 模式下,Flink kafka source 提交 offset 的方式,后续会介绍 consumer 读取 offset 的流程。
消费模式
在 Flink 的 kafka source 中有以下5种模式指定 offset 消费:
public enum StartupMode {
/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
/** Start from the earliest offset possible. */
EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
/** Start from the latest offset. */
LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
/**
* Start from user-supplied timestamp for each partition.
* Since this mode will have specific offsets to start with, we do not need a sentinel value;
* using Long.MIN_VALUE as a placeholder.
*/
TIMESTAMP(Long.MIN_VALUE),
/**
* Start from user-supplied specific offsets for each partition.
* Since this mode will have specific offsets to start with, we do not need a sentinel value;
* using Long.MIN_VALUE as a placeholder.
*/
SPECIFIC_OFFSETS(Long.MIN_VALUE);
}
默认为 GROUP_OFFSETS,表示根据上一次 group id 提交的 offset 位置开始消费。每个枚举的值其实是一个 long 型的负数,根据不同的模式,在每个 partition 初始化的时候会默认将 offset 设置为这个负数。其他的方式和 kafka 本身的语义类似,就不在赘述。
指定 offset
此处只讨论默认的 GROUP_OFFSETS 方式,下文所有分析都是基于这种模式。但是还是需要区分是否开启了 checkpoint。在开始分析之前需要对几个重要的变量进行说明:
- subscribedPartitionsToStartOffsets
- 所属类:FlinkKafkaConsumerBase.java
- 定义:
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToSt
说明:保存订阅 topic 的所有 partition 以及初始消费的 offset。
- subscribedPartitionStates
- 所属类:AbstractFetcher.java
/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<KPH>> subscribedPar
说明:保存了所有订阅的 partition 的 offset 等详细信息,例如:
/** The offset within the Kafka partition that we already processed. */
private volatile long offset;
/** The offset of the Kafka partition that has been committed. */
private volatile long committedOffset;
每次消费完数据之后都会更新这些值,这个变量非常的重要,在做 checkpoint 的时候,保存的 offset 等信息都是来自于这个变量。这个变量的初始化如下:
// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates = createPartitionStateHolders(
seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
watermarksPeriodic,
watermarksPunctuated,
userCodeClassLoader);
消费之后更新相应的 offset 主要在 KafkaFetcher#runFetchLoop
方法 while 循环中调用 emitRecord(value, partition, record.
offset(), record);。
- restoredState
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
* <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
*
* <p>Using a sorted map as the ordering is important when using restored state
* to seed the partition discoverer.
*/
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
说明:如果指定了恢复的 checkpoint 路径,启动时候将会读取这个变量里面的内容获取起始 offset,而不再是使用 StartupMode 中的枚举值作为初始的 offset。
- unionOffsetStates
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
说明:保存了 checkpoint 要持久化存储的内容,例如每个 partition 已经消费的 offset 等信息
■ 非 checkpoint 模式
在没有开启 checkpoint 的时候,消费 kafka 中的数据,其实就是完全依靠 kafka 自身的机制进行消费。
■ checkpoint 模式
开启 checkpoint 模式以后,会将 offset 等信息持久化存储以便恢复时使用。但是作业重启以后如果由于某种原因读不到 checkpoint 的结果,例如 checkpoint 文件丢失或者没有指定恢复路径等。
- 第一种情况,如果读取不到 checkpoint 的内容
subscribedPartitionsToStartOffsets 会初始化所有 partition 的起始 offset为 -915623761773L 这个值就表示了当前为 GROUP_OFFSETS 模式。
default:
for (KafkaTopicPartition seedPartition : allPartitions) {
subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}
第一次消费之前,指定读取 offset 位置的关键方法是 KafkaConsumerThread#reassignPartitions 代码片段如下:
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
// the KafkaConsumer by default will automatically seek the consumer position
// to the committed group offset, so we do not need to do it.
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else {
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
}
}
因为是 GROUP_OFFSET 模式 ,所以会调用 newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1); 需要说明的是,在 state 里面需要存储的是成功消费的最后一条数据的 offset,但是通过 position 这个方法返回的是下一次应该消费的起始 offset,所以需要减1。这里更新这个的目的是为了 checkpoint 的时候可以正确的拿到 offset。
这种情况由于读取不到上次 checkpoint 的结果,所以依旧是依靠 kafka 自身的机制,即根据__consumer_offsets 记录的内容消费。
- 第二种情况,checkpoint 可以读取到
这种情况下, subscribedPartitionsToStartOffsets 初始的 offset 就是具体从checkpoint 中恢复的内容,这样 KafkaConsumerThread#reassignPartitions 实际走的分支就是:
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
这里加1的原理同上,state 保存的是最后一次成功消费数据的 offset,所以加1才是现在需要开始消费的 offset。
小结:本节介绍了程序启动时,如何确定从哪个 offset 开始消费,下文会继续分析 flink kafka sink 的相关源码。
通常添加一个 kafka sink 的代码如下:
input.addSink(
new FlinkKafkaProducer<>(
"bar",
new KafkaSerializationSchemaImpl(),
properties,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");
初始化执行 env.addSink 的时候会创建 StreamSink 对象,即 StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction));这里的 sinkFunction 就是传入的 FlinkKafkaProducer 对象,StreamSink 构造函数中将这个对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量,源码如下:
■ StreamSink.java
public StreamSink(SinkFunction<IN> sinkFunction) {
super(sinkFunction);
chainingStrategy = ChainingStrategy.ALWAYS;
}
■ AbstractUdfStreamOperator.java
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
Task 运行
StreamSink 会调用下面的方法发送数据:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
sinkContext.element = element;
userFunction.invoke(element.getValue(), sinkContext);
}
``
也就是实际调用的是 FlinkKafkaProducer#invoke 方法。在 FlinkKafkaProducer 的构造函数中需要指 FlinkKafkaProducer.Semantic,即:
public enum Semantic {
EXACTLY_ONCE,
AT_LEAST_ONCE,
NONE
}
下面就基于3种语义分别说一下总体的向 kafka 发送数据的流程。
**■ Semantic.NONE**
这种方式不会做任何额外的操作,完全依靠 kafka producer 自身的特性,也就是FlinkKafkaProducer#invoke 里面发送数据之后,Flink 不会再考虑 kafka 是否已经正确的收到数据。
transaction.producer.send(record, callback);
**■ Semantic.AT_LEAST_ONCE**
这种语义下,除了会走上面说到的发送数据的流程外,如果开启了 checkpoint 功能,在 FlinkKafkaProducer#snapshotState 中会首先执行父类的 snapshotState方法,里面最终会执行 FlinkKafkaProducer#preCommit。
@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
flush(transaction);
break;
case NONE:
break;
default:
throw new UnsupportedOperationException("Not implemented semantic");
checkErroneous();
AT_LEAST_ONCE 会执行了 flush 方法,里面执行了:
transaction.producer.flush();
就是将 send 的数据立即发送给 kafka 服务端,详细含义可以参考 KafkaProducer api:http://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> flush()
> Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
**■ Semantic.EXACTLY_ONCE**
EXACTLY_ONCE 语义也会执行 send 和 flush 方法,但是同时会开启 kafka producer 的事务机制。FlinkKafkaProducer 中 beginTransaction 的源码如下,可以看到只有是 EXACTLY_ONCE 模式才会真正开始一个事务。
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
case EXACTLY_ONCE:
FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
producer.beginTransaction();
return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
case NONE:
// Do not create new producer on each beginTransaction() if it is not necessary
final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction != null && currentTransaction.producer != null) {
return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
}
return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
default:
throw new UnsupportedOperationException("Not implemented semantic");
和 AT_LEAST_ONCE 另一个不同的地方在于 checkpoint 的时候,会将事务相关信息保存到变量 nextTransactionalIdHintState 中,这个变量存储的信息会作为 checkpoint 中的一部分进行持久化。
if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
// case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
// scaling up.
if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
getRuntimeContext().getNumberOfParallelSubtasks(),
nextFreeTransactionalId));
> **小结:**本节介绍了 Flink Kafka Producer 的基本实现原理,后续会详细介绍 Flink 在结合 kafka 的时候如何做到端到端的 Exactly Once 语义的。
**作者介绍:**
吴鹏,亚信科技资深工程师,Apache Flink Contributor。先后就职于中兴,IBM,华为。目前在亚信科技负责实时流处理引擎产品的研发。