首页 热点资讯 义务教育 高等教育 出国留学 考研考公
您的当前位置:首页正文

flink使用checkpoint等知识

2024-12-19 来源:化拓教育网

开启

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);
        env.setStateBackend(new FsStateBackend("hdfs://home/wangxiaotong/data/flink/checkpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

上面调用enableExternalizedCheckpoints设置为ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。上面代码配置了执行Checkpointing的时间间隔为1分钟。

保存多个Checkpoint

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前。
Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:
state.checkpoints.num-retained: 20
在HDFS的相应文件夹下面会产生多个checkpoint文件。

从Checkpoint进行恢复

Flink Savepoint

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中。

Flink程序中包含两种状态数据,一种是用户定义的状态(User-defined State),他们是基于Flink的Transformation函数来创建或者修改得到的状态数据;另一种是系统状态(System State),他们是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录。为了能够在创建Savepoint过程中,唯一识别对应的Operator的状态数据,Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定Operator ID,Flink也会我们自动生成对应的Operator状态ID。
而且,强烈建议手动为每个Operator设置ID,即使未来Flink应用程序可能会改动很大,比如替换原来的Operator实现、增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。另外,保存的Savepoint状态数据,毕竟是基于当时程序及其内存数据结构生成的,所以如果未来Flink程序改动比较大,尤其是对应的需要操作的内存数据结构都变化了,可能根本就无法从原来旧的Savepoint正确地恢复。

下面,我们以Flink官网文档中给定的例子,来看下如何设置Operator ID,代码如下所示:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

创建Savepoint

从Savepoint恢复

现在,我们可以停掉Job 40dcc6d2ba90f13930abce295de8d038,然后通过Savepoint命令来恢复Job运行,命令格式如下所示:
bin/flink run -s :savepointPath [:runArgs]
以上面保存的Savepoint为例,恢复Job运行,执行如下命令:

Savepoint目录结构

下面,我们看一下Savepoint目录下面存储内容的结构,如下所示:

hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r--   3 hadoop supergroup       4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r--   3 hadoop supergroup       4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r--   3 hadoop supergroup       4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r--   3 hadoop supergroup       4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r--   3 hadoop supergroup       4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a

如上面列出的HDFS路径中,11bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串,然后savepoint-11bbc5-bd967f90709b作为存储此次Savepoint数据的根目录,最后savepoint-11bbc5-bd967f90709b目录下面_metadata文件包含了Savepoint的元数据信息,其中序列化包含了savepoint-11bbc5-bd967f90709b目录下面其它文件的路径,这些文件内容都是序列化的状态信息。

使用EventTime与WaterMark

WaterMark通过数据源或水印生成器插入到流中。
下面,我们通过实际编程实践,来说明一些需要遵守的基本原则,以便在开发中进行合理设置。
在开发Flink流数据处理程序时,需要指定Time Notion,Flink API提供了TimeCharacteristic枚举类,内部定义了3种Time Notion(参考上面说明)。设置Time Notion的示例代码,如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

上面,我们指定了基于TimeCharacteristic.EventTime来进行数据处理。如果我们没有显式指定TimeCharacteristic,默认使用TimeCharacteristic.ProcessTime。
基于EventTime的数据处理,需要对进入的数据元素指派时间戳,并且指定如何生成WaterMark,这样才能通过WaterMark来机制控制输入数据的完整性(事件到达),以便触发对指定Window进行计算。有两种方式实现时间戳指派和生成WaterMark:

  • 在Flink程序一开始调用assignTimestampsAndWatermarks()进行指派
  • 在Source Operator中直接指派
    下面,我们会基于这两种方式进行编码实现:

调用assignTimestampsAndWatermarks()进行指派

TimeWindow的大小设置为1分钟(60000ms),允许延迟到达时间设置为50秒(50000ms),并且为了模拟流数据元素事件时间早于当前处理系统的系统时间,设置延迟时间为2分钟(120000ms)。
我们自定义实现了一个用来模拟的Source Operator,代码如下所示:

class StringLineEventSource(val latenessMillis: Long) extends RichParallelSourceFunction[String] {
 
  val LOG = LoggerFactory.getLogger(classOf[StringLineEventSource])
  @volatile private var running = true
  val channelSet = Seq("a", "b", "c", "d")
  val behaviorTypes = Seq(
    "INSTALL", "OPEN", "BROWSE", "CLICK",
    "PURCHASE", "CLOSE", "UNINSTALL")
  val rand = Random
 
  override def run(ctx: SourceContext[String]): Unit = {
    val numElements = Long.MaxValue
    var count = 0L
 
    while (running && count < numElements) {
      val channel = channelSet(rand.nextInt(channelSet.size))
      val event = generateEvent()
      LOG.debug("Event: " + event)
      val ts = event(0)
      val id = event(1)
      val behaviorType = event(2)
      ctx.collect(Seq(ts, channel, id, behaviorType).mkString("\t"))
      count += 1
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }
 
  private def generateEvent(): Seq[String] = {
    // simulate 10 seconds lateness
    val ts = Instant.ofEpochMilli(System.currentTimeMillis)
      .minusMillis(latenessMillis)
      .toEpochMilli
 
    val id = UUID.randomUUID().toString
    val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
    // (ts, id, behaviorType)
    Seq(ts.toString, id, behaviorType)
  }
 
  override def cancel(): Unit = running = false
}

流数据中的数据元素为字符串记录行的格式,包含字段:事件时间、渠道、用户编号、用户行为类型。这里,我们直接调用SourceContext.collect()方法,将数据元素发送到下游进行处理。
在Flink程序中,通过调用stream: DataStream[T]的assignTimestampsAndWatermarks()进行时间戳的指派,并生成WaterMark。然后,基于Keyed Window生成Tumbling Window(不存在Window重叠)来操作数据记录。最后,将计算结果输出到Kafka中去。
对应的实现代码,如下所示:

def main(args: Array[String]): Unit = {
 
  val params = ParameterTool.fromArgs(args)
  checkParams(params)
  val sourceLatenessMillis = params.getRequired("source-lateness-millis").toLong
  maxLaggedTimeMillis = params.getLong("window-lagged-millis", DEFAULT_MAX_LAGGED_TIME)
  val windowSizeMillis = params.getRequired("window-size-millis").toLong
 
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 设置为TimeCharacteristic.EventTime
 
  val stream: DataStream[String] = env.addSource(new StringLineEventSource(sourceLatenessMillis))
 
  // create a Kafka producer for Kafka 0.9.x
  val kafkaProducer = new FlinkKafkaProducer09(
    params.getRequired("window-result-topic"),
    new SimpleStringSchema, params.getProperties
  )
 
  stream
    .setParallelism(1)
    .assignTimestampsAndWatermarks( // 指派时间戳,并生成WaterMark
       new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(maxLaggedTimeMillis)) {
         override def extractTimestamp(element: String): Long = {
           element.split("\t")(0).toLong
         }
       })
    .setParallelism(2)
    .map(line => {
      // ts, channel, id, behaviorType
      val a = line.split("\t")
      val channel = a(1)
      ((channel, a(3)), 1L)
    })
    .setParallelism(3)
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis))) // 使用Keyed Window
    .process(new EventTimeWindowReduceFunction())
    .setParallelism(4)
    .map(t => {
      val windowStart = t._1
      val windowEnd = t._2
      val channel = t._3
      val behaviorType = t._4
      val count = t._5
      Seq(windowStart, windowEnd, channel, behaviorType, count).mkString("\t")
    })
    .setParallelism(3)
    .addSink(kafkaProducer)
    .setParallelism(3)
 
  env.execute(getClass.getSimpleName)
}

上面,我们使用了Flink内建实现的BoundedOutOfOrdernessTimestampExtractor来指派时间戳和生成WaterMark。这里,我们实现了从事件记录中提取时间戳的逻辑,实际生成WaterMark的逻辑使用BoundedOutOfOrdernessTimestampExtractor提供的默认逻辑,在getCurrentWatermark()方法中。我们来看下BoundedOutOfOrdernessTimestampExtractor的实现,代码如下所示:

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
 
    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private final long maxOutOfOrderness;
 
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; // 初始设置当前最大事件时间戳
    }
 
    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }
 
    public abstract long extractTimestamp(T element);
 
    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // 当前最大事件时间戳,减去允许最大延迟到达时间
        if (potentialWM >= lastEmittedWatermark) { // 检查上一次emit的WaterMark时间戳,如果比lastEmittedWatermark大则更新其值
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }
 
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) { // 检查新到达的数据元素的事件时间,用currentMaxTimestamp记录下当前最大的
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}

可以看到,在getCurrentWatermark()和extractTimestamp()方法中,lastEmittedWatermark是WaterMark中的时间戳,计算它时,总是根据当前进入Flink处理系统的数据元素的最大的事件时间currentMaxTimestamp,然后再减去一个maxOutOfOrderness(外部配置的支持最大延迟到达的时间),也就说,这里面实现的WaterMark中的时间戳序列是非严格单调递增的。
我们实现的Flink程序为EventTimeTumblingWindowAnalytics,提交到Flink集群运行,执行如下命令:

bin/flink run --class org.shirdrn.flink.windowing.EventTimeTumblingWindowAnalytics flink-demo-assembly-0.0.1-SNAPSHOT.jar \
  --window-result-topic windowed-result-topic \
  --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
  --bootstrap.servers  \
  --source-lateness-millis 120000 \
  --window-lagged-millis 50000 \
  --window-size-millis 60000

在Source Operator中直接指派

和上面我们最终期望的逻辑基本保持一致,我们把指派时间戳和生成WaterMark的逻辑,提取出来放到Source Operator实现中,对应的关键代码片段,如下所示:

  var lastEmittedWatermark = Long.MinValue
  var currentMaxTimestamp = Long.MinValue + maxLaggedTimeMillis
... ...
      ctx.collectWithTimestamp(Seq(ts, channel, id, behaviorType).mkString("\t"), ts.toLong)
      ctx.emitWatermark(getCurrentWatermark(ts.toLong))
... ...
  private def getCurrentWatermark(ts: Long): Watermark = {
    if (ts > currentMaxTimestamp) {
      currentMaxTimestamp = ts
    }
    val watermarkTs = currentMaxTimestamp - maxLaggedTimeMillis
    if (watermarkTs >= lastEmittedWatermark) {
      lastEmittedWatermark = watermarkTs
    }
    new Watermark(lastEmittedWatermark)
  }

需要在Flink程序的main方法中,将外部配置的与WaterMark生成相关的参数值,传到Source Operator实现类中,如下所示:

val stream: DataStream[String] = env.addSource(
  new StringLineEventSourceWithTsAndWaterMark(sourceLatenessMillis, maxLaggedTimeMillis))

同时,把前面调用assignTimestampsAndWatermarks()的方法去掉即可。
编译后,提交到Flink集群运行,可以查看输出结果,和前面类似,输出结果正是我们所期望的。

Keyed Window与Non-Keyed Window

sate

keyed state.png keyed state 多种数据结构 operator state.png

note:Operator States的数据结构不像Keyed States丰富,现在只支持List

多种保存点
多种后端保存

用户可以根据自己的需求选择,如果数据量较小,可以存放到MemoryStateBackend和FsStateBackend中,如果数据量较大,可以放到RockDB中。

WindowFunction (Legacy)

窗口的处理函数,但是获取的contextual信息少,也没有一些先进的特征,比如per-window keyed state。未来将会被抛弃。

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param window The window that is being evaluated.
   * @param input The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   *
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

时间窗口的机制

Tumbling windows are aligned to epoch, i.e, 1970-01-01T00:00:00. So with a tumbling window of 10 seconds 2017-07-03T20:19:35Z would to into the window from [2017-07-03T20:19:30Z, 2017-07-03T20:19:40Z) and 2017-07-03T20:22:30Z would into [2017-07-03T20:22:30Z, 2017-07-03T20:22:40Z).
35的事件触发的窗口是[30-40)的窗口。

MapStateDescriptor

public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, Word> descriptor =
                new MapStateDescriptor<>(
                        "words", // the state name
                        BasicTypeInfo.STRING_TYPE_INFO, // type information
                        TypeInformation.of(new TypeHint<Word>() {})
                        ); // default value of the state, if nothing was set
       // sum = getRuntimeContext().getState(descriptor);

    }
public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, Word> descriptor =
                new MapStateDescriptor<>(
                        "words", // the state name
                        String.class, // type information
                        Word.class
                        ); // default value of the state, if nothing was set
       // sum = getRuntimeContext().getState(descriptor);

    }

设计草稿

时间、窗口、水印

state、fault tolence

目录

Streams and Operations on Streams

TaskManager故障恢复

JobManager可以通过配置高可用的zookeeper来保证故障恢复,TaskManager怎么故障恢复能?


image.png

Keep in mind that in In stand alone mode a TM process that has exited
won't be automatically restarted though.

实践所得:在standalone集群模式下,杀死一个TaskManager,那么机器上的这个TaskManager不会重启,会使用其他机器剩余可用的TaskManager中slots来运行失败的task。

Slots

image.png

A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. No need to calculate how many tasks (with varying parallelism) a program contains in total.

flink使用的slot数目是最高的并行度数目。

alignment

alignment的影响


image.png

Asynchronous State Snapshots

只有RockDB里面才有


image.png

各种state的例子

operator state.png 优点.png
image.png

state backend

image.png

同步和异步的checkpoint区别

image.png
image.png
image.png
image.png

异步

Checkpoint UI信息

image.png
image.png
显示全文