The Impact of Restarting HDFS and Yarn Simultaneously on Flink on Yarn Jobs

Phenomenon

Some of the consumers' topic partitions started consuming from the earliest offset.

From the official documentation:

1. If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.

2. Flink Kafka Consumer offset committing behaviour:
The Flink Kafka Consumer allows configuring how offsets are committed back to the Kafka brokers (or to ZooKeeper for Kafka 0.8). Note that the Flink Kafka Consumer does not rely on these committed offsets for its fault-tolerance guarantees; the committed offsets are only a means of exposing the consumer's progress for monitoring purposes.

There are several ways to configure offset committing, depending mainly on whether checkpointing is enabled for the job.
  1) Checkpointing disabled: the Flink Kafka Consumer relies on the periodic auto-commit capability of the internally used Kafka client. To enable or disable offset committing, simply set enable.auto.commit (auto.commit.enable for Kafka 0.8) and auto.commit.interval.ms to appropriate values in the provided Properties.
  2) Checkpointing enabled: the Flink Kafka Consumer commits the offsets stored in the checkpointed state once a checkpoint completes, which guarantees that the committed offsets in the Kafka brokers are consistent with the offsets in the checkpointed state. Users can enable or disable offset committing by calling setCommitOffsetsOnCheckpoints(boolean) on the consumer (it is enabled by default). Note that in this case the periodic auto-commit settings in the Properties are completely ignored.
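
To make the two cases concrete, here is a minimal configuration sketch; the broker address, group id and topic name are placeholders for illustration, not values from this incident:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class OffsetCommitConfigSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "broker:9092"); // hypothetical broker
		props.setProperty("group.id", "my-group");              // hypothetical group id

		// 1) checkpointing disabled: the Kafka client's periodic auto-commit applies
		props.setProperty("enable.auto.commit", "true");
		props.setProperty("auto.commit.interval.ms", "5000");

		FlinkKafkaConsumer011<String> consumer =
			new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props); // hypothetical topic

		// 2) checkpointing enabled: offsets are committed when a checkpoint completes
		//    (enabled by default); the auto-commit settings above are then ignored
		consumer.setCommitOffsetsOnCheckpoints(true);
	}
}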

Analysis

When Flink consumes a Kafka topic, offset management is usually based on the checkpoint mechanism in order to guarantee fault tolerance.

For details on how offsets are stored in checkpoints, refer to this article (a Chinese write-up is also available).

The checkpoint state is stored on HDFS. While HDFS is restarting, checkpoint state may fail to be persisted. After Yarn restarts, it automatically restarts the Flink job, which then recovers from the checkpoint; recovering from such a faulty checkpoint leads to the problem described above ("If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used").
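
The setup described above roughly corresponds to the following sketch (the HDFS path, checkpoint interval, broker, group id and topic are assumptions for illustration). auto.offset.reset is what takes effect once a partition's offset cannot be found after restore:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class CheckpointedKafkaJobSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(60_000); // checkpoint every 60s
		env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // hypothetical HDFS path

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "broker:9092"); // hypothetical broker
		props.setProperty("group.id", "my-group");              // hypothetical group id
		// applied only when no offset can be found for a partition after restore
		props.setProperty("auto.offset.reset", "latest");

		env.addSource(new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props)) // hypothetical topic
			.print();

		env.execute("kafka-checkpoint-sketch");
	}
}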

Source Code Analysis

flink-connector-kafka currently provides implementations for Kafka 0.8, 0.9, 0.10 and 0.11; this article analyses the FlinkKafkaConsumer011 code.

The inheritance chain of FlinkKafkaConsumer011 is shown below; FlinkKafkaConsumerBase contains most of the implementation.

FlinkKafkaConsumer011 extends FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
		CheckpointListener,
		ResultTypeQueryable<T>,
		CheckpointedFunction {

Analysis of the internal implementation of FlinkKafkaConsumerBase:

  1. The initializeState method is the first one invoked when the FlinkKafkaConsumer is initialized.
The method obtains the state objects held in the checkpoint by calling getOperatorStateStore and getSerializableListState on the runtime context (FunctionInitializationContext).
If this task is recovering, e.g. after a failure, context.isRestored() evaluates to true,
and the program then tries to retrieve from the Flink checkpoint the Kafka partitions that were originally assigned, together with the offsets recorded by the last completed checkpoint.

@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {

	OperatorStateStore stateStore = context.getOperatorStateStore();

	ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
		stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

	this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
		OFFSETS_STATE_NAME,
		TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));

	if (context.isRestored() && !restoredFromOldState) {
		restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

		// migrate from 1.2 state, if there is any
		for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
			restoredFromOldState = true;
			unionOffsetStates.add(kafkaOffset);
		}
		oldRoundRobinListState.clear();

		if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
			throw new IllegalArgumentException(
				"Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
		}

		// populate actual holder for restored state
		for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
			restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
		}

		LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
	} else {
		LOG.info("No restore state for FlinkKafkaConsumer.");
	}
}
  2. The open method is called after initializeState completes. Its main logic breaks down into a few steps:
  • Determine the offsetCommitMode. Based on the combination of three parameters: Kafka's auto commit setting, the value of setCommitOffsetsOnCheckpoints() (true by default), and whether checkpointing is enabled at runtime, offsetCommitMode takes one of three modes (a rough sketch of this mapping follows the list):
    • ON_CHECKPOINTS: commit offsets after a checkpoint completes;
    • KAFKA_PERIODIC: use the Kafka consumer's built-in periodic committing;
    • DISABLED: do not commit.
  • Create the partition discoverer.
  • Determine whether the job is restoring from checkpointed state. If so, read each partition's offset from the state; if not, set the start offsets according to the startup mode:
    • SPECIFIC_OFFSETS and TIMESTAMP set the offsets right away;
    • For the other modes (EARLIEST, LATEST and GROUP_OFFSETS), the offset is determined lazily when the partition is actually read.
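
As referenced above, the mapping from those three flags to a commit mode roughly amounts to the sketch below. This is a simplified illustration, not the actual implementation of Flink's OffsetCommitModes.fromConfiguration; the names are local to the sketch. The full open() source from FlinkKafkaConsumerBase follows it.

enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

class OffsetCommitModeSketch {
	// simplified sketch of the decision described in the list above
	static OffsetCommitMode resolve(
			boolean autoCommitEnabled,      // Kafka's enable.auto.commit
			boolean commitOnCheckpoints,    // setCommitOffsetsOnCheckpoints(), true by default
			boolean checkpointingEnabled) { // whether the job runs with checkpointing
		if (checkpointingEnabled) {
			// commit only when a checkpoint completes, if committing is enabled at all
			return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
		}
		// no checkpointing: fall back to the Kafka client's periodic auto-commit
		return autoCommitEnabled ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
	}
}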
@Override
public void open(Configuration configuration) throws Exception {
	// determine the offset commit mode
	this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
		getIsAutoCommitEnabled(),
		enableCommitOnCheckpoints,
		((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

	// create the partition discoverer
	this.partitionDiscoverer = createPartitionDiscoverer(
		topicsDescriptor,
		getRuntimeContext().getIndexOfThisSubtask(),
		getRuntimeContext().getNumberOfParallelSubtasks());
	this.partitionDiscoverer.open();

	subscribedPartitionsToStartOffsets = new HashMap<>();

	List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

	// check whether we are restoring from checkpointed state
	if (restoredState != null) { // yes: restore from state
		for (KafkaTopicPartition partition : allPartitions) {
			if (!restoredState.containsKey(partition)) { // partition not in state: consume it from EARLIEST
				restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
			}
		}

		for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
			if (!restoredFromOldState) {
				// seed the partition discoverer with the union state while filtering out
				// restored partitions that should not be subscribed by this subtask
				if (KafkaTopicPartitionAssigner.assign(
					restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
						== getRuntimeContext().getIndexOfThisSubtask()) {
					subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
				}
			} else {
				// when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
				// in this case, just use the restored state as the subscribed partitions
				subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
			}
		}

		LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
			getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
	} else { // no restored state
		// use the partition discoverer to fetch the initial seed partitions,
		// and set their initial offsets depending on the startup mode.
		// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
		// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
		// when the partition is actually read.
		switch (startupMode) { // startup mode
			case SPECIFIC_OFFSETS: // start from user-specified offsets
				if (specificStartupOffsets == null) {
					throw new IllegalStateException(
						"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
							", but no specific offsets were specified.");
				}

				for (KafkaTopicPartition seedPartition : allPartitions) {
					Long specificOffset = specificStartupOffsets.get(seedPartition);
					if (specificOffset != null) {
						// since the specified offsets represent the next record to read, we subtract
						// it by one so that the initial state of the consumer will be correct
						subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
					} else {
						// default to group offset behaviour if the user-provided specific offsets
						// do not contain a value for this partition
						subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
					}
				}

				break;
			case TIMESTAMP: // start from offsets matching a given produce timestamp
				if (startupOffsetsTimestamp == null) {
					throw new IllegalStateException(
						"Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
							", but no startup timestamp was specified.");
				}

				for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
						: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
					subscribedPartitionsToStartOffsets.put(
						partitionToOffset.getKey(),
						(partitionToOffset.getValue() == null)
							// if an offset cannot be retrieved for a partition with the given timestamp,
							// we default to using the latest offset for the partition
							? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
							// since the specified offsets represent the next record to read, we subtract
							// it by one so that the initial state of the consumer will be correct
							: partitionToOffset.getValue() - 1);
				}

				break;
			default:
				for (KafkaTopicPartition seedPartition : allPartitions) {
					subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
				}
		}

		if (!subscribedPartitionsToStartOffsets.isEmpty()) {
			switch (startupMode) {
				case EARLIEST:
					LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
						getRuntimeContext().getIndexOfThisSubtask(),
						subscribedPartitionsToStartOffsets.size(),
						subscribedPartitionsToStartOffsets.keySet());
					break;
				case LATEST:
					LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
						getRuntimeContext().getIndexOfThisSubtask(),
						subscribedPartitionsToStartOffsets.size(),
						subscribedPartitionsToStartOffsets.keySet());
					break;
				case TIMESTAMP:
					LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
						getRuntimeContext().getIndexOfThisSubtask(),
						subscribedPartitionsToStartOffsets.size(),
						startupOffsetsTimestamp,
						subscribedPartitionsToStartOffsets.keySet());
					break;
				case SPECIFIC_OFFSETS:
					LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
						getRuntimeContext().getIndexOfThisSubtask(),
						subscribedPartitionsToStartOffsets.size(),
						specificStartupOffsets,
						subscribedPartitionsToStartOffsets.keySet());

					List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
					for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
						if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
							partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
						}
					}

					if (partitionsDefaultedToGroupOffsets.size() > 0) {
						LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
								"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
							getRuntimeContext().getIndexOfThisSubtask(),
							partitionsDefaultedToGroupOffsets.size(),
							partitionsDefaultedToGroupOffsets);
					}
					break;
				default:
				case GROUP_OFFSETS:
					LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
						getRuntimeContext().getIndexOfThisSubtask(),
						subscribedPartitionsToStartOffsets.size(),
						subscribedPartitionsToStartOffsets.keySet());
			}
		} else {
			LOG.info("Consumer subtask {} initially has no partitions to read from.",
				getRuntimeContext().getIndexOfThisSubtask());
		}
	}
}