天天看点

Flink kafka source & sink 源码解析

Flink kafka source & sink 源码解析

原创 吴鹏 Flink 中文社区 4天前

摘要:本文基于 Flink 1.9.0 和 Kafka 2.3 版本,对 Flink Kafka source 和 sink 端的源码进行解析,主要内容分为以下两部分:

1.Flink-kafka-source 源码解析

  • 流程概述
  • 非 checkpoint 模式 offset 的提交
  • checkpoint 模式下 offset 的提交
  • 指定 offset 消费

2.Flink-kafka-sink 源码解析

  • 初始化
  • Task运行
  • 小结

一般在 Flink 中创建 kafka source 的代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//KafkaEventSchema为自定义的数据字段解析类
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)           

而 Kafka 的 KafkaConsumer API 中消费某个 topic 使用的是 poll 方法如下:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.poll(Duration.ofMillis(100));           

下面将分析这两个流程是如何衔接起来的。

初始化执行 env.addSource 的时候会创建 StreamSource 对象,即 final StreamSource sourceOperator = new StreamSource<>(function);这里的function 就是传入的 FlinkKafkaConsumer 对象,StreamSource 构造函数中将这个对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量,源码如下:

■ StreamSource.java

public StreamSource(SRC sourceFunction) {
    super(sourceFunction);
    this.chainingStrategy = ChainingStrategy.HEAD;
}           

■ AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}           

task 启动后会调用到 SourceStreamTask 中的 performDefaultAction() 方法,这里面会启动一个线程 sourceThread.start();,部分源码如下:

private final LegacySourceFunctionThread sourceThread;

@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    sourceThread.start();
}           

在 LegacySourceFunctionThread 的 run 方法中,通过调用 headOperator.run 方法,最终调用了 StreamSource 中的 run 方法,部分源码如下:

public void run(final Object lockingObject,
                final StreamStatusMaintainer streamStatusMaintainer,
                final Output<StreamRecord<OUT>> collector,
                final OperatorChain<?, ?> operatorChain) throws Exception {

  //省略部分代码
  this.ctx = StreamSourceContexts.getSourceContext(
    timeCharacteristic,
    getProcessingTimeService(),
    lockingObject,
    streamStatusMaintainer,
    collector,
    watermarkInterval,
    -1);

  try {
    userFunction.run(ctx);
    //省略部分代码
  } finally {
    // make sure that the context is closed in any case
    ctx.close();
    if (latencyEmitter != null) {
      latencyEmitter.close();
    }
  }
}           

这里最重要的就是 userFunction.run(ctx);,这个 userFunction 就是在上面初始化的时候传入的 FlinkKafkaConsumer 对象,也就是说这里实际调用了 FlinkKafkaConsumer 中的 run 方法,而具体的方法实现在其父类 FlinkKafkaConsumerBase中,至此,进入了真正的 kafka 消费阶段。

Kafka消费阶段

在 FlinkKafkaConsumerBase#run 中创建了一个 KafkaFetcher 对象,并最终调用了 kafkaFetcher.runFetchLoop(),这个方法的代码片段如下:

/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;

@Override
public void runFetchLoop() throws Exception {
  try {
    final Handover handover = this.handover;

    // kick off the actual Kafka consumer
    consumerThread.start();
    
    //省略部分代码
}           

可以看到实际启动了一个 KafkaConsumerThread 线程。进入到 KafkaConsumerThread#run 中,下面只是列出了这个方法的部分源码,完整代码请参考 KafkaConsumerThread.java。

@Override
public void run() {
  // early exit check
  if (!running) {
    return;
  }
  // This method initializes the KafkaConsumer and guarantees it is torn down properly.
  // This is important, because the consumer has multi-threading issues,
  // including concurrent 'close()' calls.
  try {
    this.consumer = getConsumer(kafkaProperties);
  } catch (Throwable t) {
    handover.reportError(t);
    return;
  }
  try {

    // main fetch loop
    while (running) {
      try {
        if (records == null) {
          try {
            records = consumer.poll(pollTimeout);
          } catch (WakeupException we) {
            continue;
          }
        }
      }
      // end main fetch loop
    }
  } catch (Throwable t) {
    handover.reportError(t);
  } finally {
    handover.close();
    try {
      consumer.close();
    } catch (Throwable t) {
      log.warn("Error while closing Kafka consumer", t);
    }
  }
}           

至此,终于走到了真正从 kafka 拿数据的代码,即 records = consumer.poll(pollTimeout);。因为 KafkaConsumer 不是线程安全的,所以每个线程都需要生成独立的 KafkaConsumer 对象,即 this.consumer = getConsumer(kafkaProperties);。

KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
  return new KafkaConsumer<>(kafkaProperties);
}           
小结:本节只是介绍了 Flink 消费 kafka 数据的关键流程,下面会更详细的介绍在AT_LEAST_ONCE和EXACTLY_ONCE 不同场景下 FlinkKafkaConsumer 管理 offset 的流程。

消费 kafka topic 最为重要的部分就是对 offset 的管理,对于 kafka 提交 offset 的机制,可以参考 kafka 官方网。

而在 flink kafka source 中 offset 的提交模式有3种:

public enum OffsetCommitMode {

   /** Completely disable offset committing. */
   DISABLED,

   /** Commit offsets back to Kafka only when checkpoints are completed. */
   ON_CHECKPOINTS,

   /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
   KAFKA_PERIODIC;
}           

初始化 offsetCommitMode

在 FlinkKafkaConsumerBase#open 方法中初始化 offsetCommitMode:

// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
        ((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());           
  • 方法 getIsAutoCommitEnabled() 的实现如下:
protected boolean getIsAutoCommitEnabled() {
   return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
      PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}           
  • 也就是说只有 enable.auto.commit=true 并且 auto.commit.interval.ms>0 这个方法才会返回 true
  • 变量 enableCommitOnCheckpoints 默认是 true,可以调用 setCommitOffsetsOnCheckpoints 改变这个值
  • 当代码中调用了 env.enableCheckpointing 方法,isCheckpointingEnabled 才会返回 true

通过下面的代码返回真正的提交模式:

/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

   if (enableCheckpointing) {
      // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
      return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
   } else {
      // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
      return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
   }
}           

暂时不考虑 checkpoint 的场景,所以只考虑 (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;。

也就是如果客户端设置了 enable.auto.commit=true 那么就是 KAFKA_PERIODIC,否则就是 KAFKA_DISABLED。

offset 的提交

■ 自动提交

这种方式完全依靠 kafka 自身的特性进行提交,如下方式指定参数即可:

Properties properties = new Properties();
properties.put("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)           

■ 非自动提交

通过上面的分析,如果 enable.auto.commit=false,那么 offsetCommitMode 就是 DISABLED 。

kafka 官方文档中,提到当 enable.auto.commit=false 时候需要手动提交 offset,也就是需要调用 consumer.commitSync(); 方法提交。

但是在 flink 中,非 checkpoint 模式下,不会调用 consumer.commitSync();, 一旦关闭自动提交,意味着 kafka 不知道当前的 consumer group 每次消费到了哪。

可以从两方面证实这个问题:

  • 源码

    KafkaConsumerThread#run 方法中是有 consumer.commitSync();,但是只有当 commitOffsetsAndCallback != null 的时候才会调用。只有开启了checkpoint 功能才会不为 null,这个变量会在后续的文章中详细分析。

  • 测试

    可以通过消费 __consumer_offsets 观察是否有 offset 的提交

重启程序,还是会重复消费之前消费过的数据

小结:本节介绍了在非 checkpoint 模式下,Flink kafka source 提交 offset 的方式,下文会重点介绍 checkpoint 模式下提交 offset 的流程。

上面介绍了在没有开启 checkpoint 的时候,offset 的提交方式,下面将重点介绍开启 checkpoint 后,Flink kafka consumer 提交 offset 的方式。

通过上文可以知道,当调用了 env.enableCheckpointing 方法后 offsetCommitMode 的值就是 ON_CHECKPOINTS,而且会通过下面方法强制关闭 kafka 自动提交功能,这个值很重要,后续很多地方都是根据这个值去判断如何操作的。

/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
   if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
      properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
   }
}           

保存 offset

在做 checkpoint 的时候会调用 FlinkKafkaConsumerBase#snapshotState 方法,其中 pendingOffsetsToCommit 会保存要提交的 offset。

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
   // the map cannot be asynchronously updated, because only one checkpoint call can happen
   // on this function at a time: either snapshotState() or notifyCheckpointComplete()
   pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}           

同时,下面的变量会作为 checkpoint 的一部分保存下来,以便恢复时使用。

/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

在 snapshotState 方法中会同时保存 offset:

for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}           

提交 offset

在 checkpoint 完成以后,task 会调用 notifyCheckpointComplete 方法,里面判断 offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS 的时候,调用fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback); 方法,最终会将要提交的 offset 通过 KafkaFetcher#doCommitInternalOffsetsToKafka 方法中的 consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback); 保存到 KafkaConsumerThread.java 中的 nextOffsetsToCommit 成员变量里面。

这样就会保证当有需要提交的 offset 的时候,下面代码会执行 consumer.commitAsync,从而完成了手动提交 offset 到 kafka。

final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);

if (commitOffsetsAndCallback != null) {
  log.debug("Sending async offset commit request to Kafka broker");

  // also record that a commit is already in progress
  // the order here matters! first set the flag, then send the commit command.
  commitInProgress = true;
  consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}           
小结:本节介绍了在 checkpoint 模式下,Flink kafka source 提交 offset 的方式,后续会介绍 consumer 读取 offset 的流程。

消费模式

在 Flink 的 kafka source 中有以下5种模式指定 offset 消费:

public enum StartupMode {

   /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
   GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

   /** Start from the earliest offset possible. */
   EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

   /** Start from the latest offset. */
   LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

   /**
    * Start from user-supplied timestamp for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   TIMESTAMP(Long.MIN_VALUE),

   /**
    * Start from user-supplied specific offsets for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   SPECIFIC_OFFSETS(Long.MIN_VALUE);
}           

默认为 GROUP_OFFSETS,表示根据上一次 group id 提交的 offset 位置开始消费。每个枚举的值其实是一个 long 型的负数,根据不同的模式,在每个 partition 初始化的时候会默认将 offset 设置为这个负数。其他的方式和 kafka 本身的语义类似,就不在赘述。

指定 offset

此处只讨论默认的 GROUP_OFFSETS 方式,下文所有分析都是基于这种模式。但是还是需要区分是否开启了 checkpoint。在开始分析之前需要对几个重要的变量进行说明:

  • subscribedPartitionsToStartOffsets
    • 所属类:FlinkKafkaConsumerBase.java
    • 定义:
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToSt           

说明:保存订阅 topic 的所有 partition 以及初始消费的 offset。

  • subscribedPartitionStates
    • 所属类:AbstractFetcher.java
/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<KPH>> subscribedPar           

说明:保存了所有订阅的 partition 的 offset 等详细信息,例如:

/** The offset within the Kafka partition that we already processed. */
private volatile long offset;
/** The offset of the Kafka partition that has been committed. */
private volatile long committedOffset;           

每次消费完数据之后都会更新这些值,这个变量非常的重要,在做 checkpoint 的时候,保存的 offset 等信息都是来自于这个变量。这个变量的初始化如下:

// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates = createPartitionStateHolders(
  seedPartitionsWithInitialOffsets,
  timestampWatermarkMode,
  watermarksPeriodic,
  watermarksPunctuated,
  userCodeClassLoader);           

消费之后更新相应的 offset 主要在 KafkaFetcher#runFetchLoop

方法 while 循环中调用 emitRecord(value, partition, record.

offset(), record);。

  • restoredState
/**
     * The offsets to restore to, if the consumer restores state from a checkpoint.
     *
     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
     *
     * <p>Using a sorted map as the ordering is important when using restored state
     * to seed the partition discoverer.
     */
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;           

说明:如果指定了恢复的 checkpoint 路径,启动时候将会读取这个变量里面的内容获取起始 offset,而不再是使用 StartupMode 中的枚举值作为初始的 offset。

  • unionOffsetStates
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;           

说明:保存了 checkpoint 要持久化存储的内容,例如每个 partition 已经消费的 offset 等信息

■ 非 checkpoint 模式

在没有开启 checkpoint 的时候,消费 kafka 中的数据,其实就是完全依靠 kafka 自身的机制进行消费。

■ checkpoint 模式

开启 checkpoint 模式以后,会将 offset 等信息持久化存储以便恢复时使用。但是作业重启以后如果由于某种原因读不到 checkpoint 的结果,例如 checkpoint 文件丢失或者没有指定恢复路径等。

  • 第一种情况,如果读取不到 checkpoint 的内容

subscribedPartitionsToStartOffsets 会初始化所有 partition 的起始 offset为 -915623761773L 这个值就表示了当前为 GROUP_OFFSETS 模式。

default:
   for (KafkaTopicPartition seedPartition : allPartitions) {
      subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
   }           

第一次消费之前,指定读取 offset 位置的关键方法是 KafkaConsumerThread#reassignPartitions 代码片段如下:

for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
  if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
    consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
    consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
    // the KafkaConsumer by default will automatically seek the consumer position
    // to the committed group offset, so we do not need to do it.
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else {
    consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
  }
}           

因为是 GROUP_OFFSET 模式 ,所以会调用 newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1); 需要说明的是,在 state 里面需要存储的是成功消费的最后一条数据的 offset,但是通过 position 这个方法返回的是下一次应该消费的起始 offset,所以需要减1。这里更新这个的目的是为了 checkpoint 的时候可以正确的拿到 offset。

这种情况由于读取不到上次 checkpoint 的结果,所以依旧是依靠 kafka 自身的机制,即根据__consumer_offsets 记录的内容消费。

  • 第二种情况,checkpoint 可以读取到

这种情况下, subscribedPartitionsToStartOffsets 初始的 offset 就是具体从checkpoint 中恢复的内容,这样 KafkaConsumerThread#reassignPartitions 实际走的分支就是:

consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);           

这里加1的原理同上,state 保存的是最后一次成功消费数据的 offset,所以加1才是现在需要开始消费的 offset。

小结:本节介绍了程序启动时,如何确定从哪个 offset 开始消费,下文会继续分析 flink kafka sink 的相关源码。

通常添加一个 kafka sink 的代码如下:

input.addSink(
   new FlinkKafkaProducer<>(
      "bar",
      new KafkaSerializationSchemaImpl(),
         properties,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");           

初始化执行 env.addSink 的时候会创建 StreamSink 对象,即 StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction));这里的 sinkFunction 就是传入的 FlinkKafkaProducer 对象,StreamSink 构造函数中将这个对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量,源码如下:

■ StreamSink.java

public StreamSink(SinkFunction<IN> sinkFunction) {
  super(sinkFunction);
  chainingStrategy = ChainingStrategy.ALWAYS;
}           

■ AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}           

Task 运行

StreamSink 会调用下面的方法发送数据:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   sinkContext.element = element;
   userFunction.invoke(element.getValue(), sinkContext);
}
``

也就是实际调用的是 FlinkKafkaProducer#invoke 方法。在 FlinkKafkaProducer 的构造函数中需要指 FlinkKafkaProducer.Semantic,即:
           

public enum Semantic {

EXACTLY_ONCE,

AT_LEAST_ONCE,

NONE

}

下面就基于3种语义分别说一下总体的向 kafka 发送数据的流程。

**■ Semantic.NONE**

这种方式不会做任何额外的操作,完全依靠 kafka producer 自身的特性,也就是FlinkKafkaProducer#invoke 里面发送数据之后,Flink 不会再考虑 kafka 是否已经正确的收到数据。
           

transaction.producer.send(record, callback);

**■ Semantic.AT_LEAST_ONCE**

这种语义下,除了会走上面说到的发送数据的流程外,如果开启了 checkpoint 功能,在 FlinkKafkaProducer#snapshotState 中会首先执行父类的 snapshotState方法,里面最终会执行 FlinkKafkaProducer#preCommit。
           

@Override

protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {

switch (semantic) {

case EXACTLY_ONCE:
  case AT_LEAST_ONCE:
     flush(transaction);
     break;
  case NONE:
     break;
  default:
     throw new UnsupportedOperationException("Not implemented semantic");           

checkErroneous();

AT_LEAST_ONCE 会执行了 flush 方法,里面执行了:
           

transaction.producer.flush();

就是将 send 的数据立即发送给 kafka 服务端,详细含义可以参考 KafkaProducer api:http://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

> flush()  
> Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.  


**■ Semantic.EXACTLY_ONCE**

EXACTLY_ONCE 语义也会执行 send 和 flush 方法,但是同时会开启 kafka producer 的事务机制。FlinkKafkaProducer 中 beginTransaction 的源码如下,可以看到只有是 EXACTLY_ONCE 模式才会真正开始一个事务。
           

protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {

case EXACTLY_ONCE:
     FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
     producer.beginTransaction();
     return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
  case AT_LEAST_ONCE:
  case NONE:
     // Do not create new producer on each beginTransaction() if it is not necessary
     final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
     if (currentTransaction != null && currentTransaction.producer != null) {
        return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
     }
     return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
  default:
     throw new UnsupportedOperationException("Not implemented semantic");           
和 AT_LEAST_ONCE 另一个不同的地方在于 checkpoint 的时候,会将事务相关信息保存到变量 nextTransactionalIdHintState 中,这个变量存储的信息会作为 checkpoint 中的一部分进行持久化。
           

if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {

checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");

long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that

// case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this

// scaling up.

if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {

nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;           

nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(

getRuntimeContext().getNumberOfParallelSubtasks(),
  nextFreeTransactionalId));           
> **小结:**本节介绍了 Flink Kafka Producer 的基本实现原理,后续会详细介绍 Flink 在结合 kafka 的时候如何做到端到端的 Exactly Once 语义的。  


**作者介绍:**

吴鹏,亚信科技资深工程师,Apache Flink Contributor。先后就职于中兴,IBM,华为。目前在亚信科技负责实时流处理引擎产品的研发。