原創,允許轉載,我的目的就是給大家節省時間
先說一下為什麼手動維護offset,因為環境問題,目前讀的是kafka0.8版本,推動更新比較吃力
手動維護offset的好處,你可以記錄每個時間點的offset,如果上遊日志異常,你可以把你記錄的offset和時間戳拿出來,找出對應時間點的offset,去修複曆史資料
不廢話,寫過spark的,看了我的代碼就知道如何實作了,這個是
FlinkKafkaConsumer08是連結flink的類,咱們重點看
KeyedDeserializationSchema,這個是擷取kafka的partitions喝offset的,實作了他就可以了,直接貼代碼了兄弟們,不想碼字了
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
private static final long serialVersionUID = -6272159445203409112L;
public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
private final Properties kafkaProperties;
public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
this(Collections.singletonList(topic), valueDeserializer, props);
}
public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this((List)topics, (KeyedDeserializationSchema)(new KeyedDeserializationSchemaWrapper(deserializer)), props);
}
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
this(topics, (Pattern)null, deserializer, props);
}
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this((Pattern)subscriptionPattern, (KeyedDeserializationSchema)(new KeyedDeserializationSchemaWrapper(valueDeserializer)), props);
}
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
this((List)null, subscriptionPattern, deserializer, props);
}
private FlinkKafkaConsumer08(List<String> topics, Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(topics, subscriptionPattern, deserializer, PropertiesUtil.getLong((Properties)Preconditions.checkNotNull(props, "props"), "flink.partition-discovery.interval-millis", -9223372036854775808L), !PropertiesUtil.getBoolean(props, "flink.disable-metrics", false));
this.kafkaProperties = props;
validateZooKeeperConfig(props);
validateAutoOffsetResetValue(props);
}
protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
long autoCommitInterval = offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC ? PropertiesUtil.getLong(this.kafkaProperties, "auto.commit.interval.ms", 60000L) : -1L;
return new Kafka08Fetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext, this.deserializer, this.kafkaProperties, autoCommitInterval, consumerMetricGroup, useMetrics);
}
protected AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor topicsDescriptor, int indexOfThisSubtask, int numParallelSubtasks) {
return new Kafka08PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, this.kafkaProperties);
}
protected boolean getIsAutoCommitEnabled() {
return PropertiesUtil.getBoolean(this.kafkaProperties, "auto.commit.enable", true) && PropertiesUtil.getLong(this.kafkaProperties, "auto.commit.interval.ms", 60000L) > 0L;
}
protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> partitions, long timestamp) {
throw new UnsupportedOperationException("Fetching partition offsets using timestamps is only supported in Kafka versions 0.10 and above.");
}
protected static void validateZooKeeperConfig(Properties props) {
if (props.getProperty("zookeeper.connect") == null) {
throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
} else if (props.getProperty("group.id") == null) {
throw new IllegalArgumentException("Required property 'group.id' has not been set in the properties");
} else {
try {
Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
} catch (NumberFormatException var3) {
throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
}
try {
Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
} catch (NumberFormatException var2) {
throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
}
}
}
private static void validateAutoOffsetResetValue(Properties config) {
String val = config.getProperty("auto.offset.reset", "largest");
if (!val.equals("largest") && !val.equals("latest") && !val.equals("earliest") && !val.equals("smallest")) {
throw new IllegalArgumentException("Cannot use 'auto.offset.reset' value '" + val + "'. Possible values: 'latest', 'largest', 'earliest', or 'smallest'.");
}
}
}
下面開始直接上代碼,這樣就可以了:
import java.text.SimpleDateFormat
import java.util
import com.alibaba.fastjson.{JSON, JSONObject}
import java.util.{Date, Properties}
import java.util.Date
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.types.Row
object FlinkDemo {
case class KafkaMsg(key: String, value: String, topic: String, partiton: Int, offset: Long)
class TypedKeyedDeserializationSchema extends KeyedDeserializationSchema[KafkaMsg] {
def deserialize(key: Array[Byte],
value: Array[Byte],
topic: String,
partition: Int,
offset: Long
): KafkaMsg =
KafkaMsg(new String(key),
new String(value),
topic,
partition,
offset
)
def isEndOfStream(e: KafkaMsg): Boolean = false
def getProducedType(): TypeInformation[KafkaMsg] = createTypeInformation
}
def main(args: Array[String]): Unit = {
//System.setProperties()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
//kafka位置 老版本的 kafka是配置zookeeper位址
properties.setProperty("bootstrap.servers","localhost:9092")
properties.setProperty("zookeeper.connect","localhost:2181")
val topic = "click"
properties.setProperty("group.id", "test-flink")
val consumer = new FlinkKafkaConsumer08(topic,new TypedKeyedDeserializationSchema(),properties)
val text = env.addSource(consumer).print()
env.execute()
}
}
記得關注我的微信公衆号,後邊有實戰的東西和源碼都會在上面更新,減少你們走彎路的時間
我微信二維碼可以加一下也行