博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark streaming容错机制
阅读量:7115 次
发布时间:2019-06-28

本文共 5528 字,大约阅读时间需要 18 分钟。

hot3.png

receiver容错

spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件:

1.数据输入需要可靠的sources和可靠的receivers

2.应用metadata必须通过应用driver checkpoint

3.WAL(write ahead log)

可靠的sources和receivers

spark streaming可以通过多种方式作为数据sources(包括kafka),输入数据通过receivers接收,通过replication存储于spark中(为了faultolerance,默认复制到两个spark executors),如果数据复制完成,receivers可以知道(例如kafka中更新offsets到zookeeper中)。这样当receivers在接收数据过程中crash掉,不会有数据丢失,receivers没有复制的数据,当receiver恢复后重新接收。

checkpoint

Spark Streaming 会 checkpoint 两种类型的数据。

  • Metadata(元数据) checkpointing - 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统。用来恢复 driver,元数据包括:
    • 配置 - 用于创建该 streaming application 的所有配置
    • DStream 操作 - DStream 一些列的操作
    • 未完成的 batches - 那些提交了 job 但尚未执行或未完成的 batches
  • Data checkpointing - 保存已生成的RDDs至可靠的存储。这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链

总之,metadata checkpointing 主要用来恢复 driver;而 RDD数据的 checkpointing 对于stateful 转换操作是必要的。

激活checkpoint

启用 checkpoint,需要设置一个支持容错 的、可靠的文件系统(如 HDFS、s3 等)目录来保存 checkpoint 数据。通过调用 streamingContext.checkpoint(checkpointDirectory) 来完成。另外,如果你想让你的 application 能从 driver 失败中恢复,你的 application 要满足:

  • 若 application 为首次重启,将创建一个新的 StreamContext 实例
  • 如果 application 是从失败中重启,将会从 checkpoint 目录导入 checkpoint 数据来重新创建 StreamingContext 实例

通过 StreamingContext.getOrCreate 可以达到目的:

// 通过函数来创建或者从已有的checkpoint里面构建StreamingContext  def functionToCreateContext(): StreamingContext = {    val ssc = new StreamingContext(...)   // new context    val rdds = ssc.socketTextStream(...) // create DStreams    ...    ssc.checkpoint("/spark/kmd/checkpoint")   // 设置在HDFS上的checkpoint目录    //设置通过间隔时间,定时持久checkpoint到hdfs上    rdds.checkpoint(Seconds(batchDuration*5))        rdds.foreachRDD(rdd=>{    //可以针对rdd每次调用checkpoint    //注意上面设置了,定时持久checkpoint下面这个地方可以不用写    rdd.checkpoint()        }    )    //返回ssc    ssc  }    def main(args:Array){  // 创建context  val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)  // 启动流计算  context.start()  context.awaitTermination()  }

如果 checkpointDirectory 存在,那么 context 将导入 checkpoint 数据。如果目录不存在,函数 functionToCreateContext 将被调用并创建新的 context

除调用 getOrCreate 外,还需要你的集群模式支持 driver 挂掉之后重启之。例如,在 yarn 模式下,driver 是运行在 ApplicationMaster 中,若 ApplicationMaster 挂掉,yarn 会自动在另一个节点上启动一个新的 ApplicationMaster。

需要注意的是,随着 streaming application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的5~10倍。

checkpoint 的形式

最终 checkpoint 的形式是将类 Checkpoint的实例序列化后写入外部存储,值得一提的是,有专门的一条线程来做将序列化后的 checkpoint 写入外部存储。类 Checkpoint 包含以下数据

除了 Checkpoint 类,还有 CheckpointWriter 类用来导出 checkpoint,CheckpointReader 用来导入 checkpoint

注意

这里有有两个坑: 

(1)处理的逻辑必须写在functionToCreateContext函数中,你要是直接写在main方法中,在首次启动后,kill关闭,再启动就会报错 
关闭命令 

yarn application -kill application_1482996264071_34284

再次启动后报错信息 

has not been initialized when recovery from checkpoint

解决方案:将逻辑写在函数中,不要写main方法中, 

(2)首次编写Spark Streaming程序中,因为处理逻辑没放在函数中,全部放在main函数中,虽然能正常运行,也能记录checkpoint数据,但是再次启动先报(1)的错误,然后你解决了,打包编译重新上传服务器运行,会发现依旧报错,这次的错误和(1)不一样: 

xxxx classs ClassNotFoundException

但令你疑惑的是明明打的jar包中包含了,这个类,上一次还能正常运行这次为啥就不能了,问题就出在checkpoint上,因为checkpoint的元数据会记录jar的序列化的二进制文件,因为你改动过代码,然后重新编译,新的序列化jar文件,在checkpoint的记录中并不存在,所以就导致了上述错误,如何解决: 

也非常简单,删除checkpoint开头的的文件即可,不影响数据本身的checkpoint 

hadoop fs -rm /spark/kmd/check_point/checkpoint*

然后再次启动,发现一切ok,能从checkpoint恢复数据,然后kill掉又一次启动 

就能正常工作了。 

metadata checkpoint

可靠的sources和receivers,可以使数据在receivers失败后恢复,然而在driver失败后恢复是比较复杂的,一种方法是通过checkpoint metadata到HDFS或者S3。metadata包括:

  • configuration
  • code
  • 一些排队等待处理但没有完成的RDD(仅仅是metadata,而不是data)

这样当driver失败时,可以通过metadata checkpoint,重构应用程序并知道执行到那个地方。

数据可能丢失的场景

可靠的sources和receivers,以及metadata checkpoint也不可以保证数据的不丢失,例如:

  • 两个executor得到计算数据,并保存在他们的内存中
  • receivers知道数据已经输入
  • executors开始计算数据
  • driver突然失败
  • driver失败,那么executors都会被kill掉
  • 因为executor被kill掉,那么他们内存中得数据都会丢失,但是这些数据不再被处理
  • executor中的数据不可恢复

WAL

为了避免上面情景的出现,spark streaming 1.2引入了WAL。所有接收的数据通过receivers写入HDFS或者S3中checkpoint目录,这样当driver失败后,executor中数据丢失后,可以通过checkpoint恢复。

At-Least-Once

尽管WAL可以保证数据零丢失,但是不能保证exactly-once,例如下面场景:

  • Receivers接收完数据并保存到HDFS或S3
  • 在更新offset前,receivers失败了

  • Spark Streaming以为数据接收成功,但是Kafka以为数据没有接收成功,因为offset没有更新到zookeeper

  • 随后receiver恢复了
  • 从WAL可以读取的数据重新消费一次,因为使用的kafka High-Level消费API,从zookeeper中保存的offsets开始消费

WAL的缺点

通过上面描述,WAL有两个缺点:

  • 降低了receivers的性能,因为数据还要存储到HDFS等分布式文件系统
  • 对于一些resources,可能存在重复的数据,比如Kafka,在Kafka中存在一份数据,在Spark Streaming也存在一份(以WAL的形式存储在hadoop API兼容的文件系统中)

启动流程

当一个Spark Streaming应用启动了(例如driver启动), 相应的StreamingContext使用SparkContet去启动receiver,receiver是一个长时间执行的作业,这些接收器接收并保存这些数据到Spark的executor进程的内存中,这些数据的生命周期如下图所示

1:蓝色的箭头表示接收的数据,接收器把数据流打包成块,存储在executor的内存中,如果开启了WAL,将会把数据写入到存在容错文件系统的日志文件中
2:青色的箭头表示提醒driver, 接收到的数据块的元信息发送给driver中的StreamingContext, 这些元数据包括:executor内存中数据块的引用ID和日志文件中数据块的偏移信息
3:红色箭头表示处理数据,每一个批处理间隔,StreamingContext使用块信息用来生成RDD和jobs.  SparkContext执行这些job用于处理executor内存中的数据块
4:黄色箭头表示checkpoint这些计算,以便于恢复。流式处理会周期的被checkpoint到文件中

恢复流程

当一个失败的driver重启以后,恢复流程如下

1:黄色的箭头用于恢复计算,checkpointed的信息是用于重启driver,重新构造上下文和重启所有的receiver
2:  青色箭头恢复块元数据信息,所有的块信息对已恢复计算很重要
3:重新生成未完成的job(红色箭头),会使用到2恢复的元数据信息
4:读取保存在日志中的块(蓝色箭头),当job重新执行的时候,块数据将会直接从日志中读取,
5:重发没有确认的数据(紫色的箭头)。缓冲的数据没有写到WAL中去将会被重新发送。

 

direct API

为了WAL的性能损失和exactly-once,spark streaming1.3中使用Kafka direct API。非常巧妙,Spark driver计算下个batch的offsets,指导executor消费对应的topics和partitions。消费Kafka消息,就像消费文件系统文件一样。

1.不再需要kafka receivers,executor直接通过Kafka API消费数据

2.WAL不再需要,如果从失败恢复,可以重新消费

3.exactly-once得到了保证,不会再从WAL中重复读取数据,但是如果offset更新失败还是会导致数据重复

总结

主要说的是spark streaming通过各种方式来保证数据不丢失,并保证exactly-once,每个版本都是spark streaming越来越稳定,越来越向生产环境使用发展。

 

转载于:https://my.oschina.net/u/2000675/blog/1545323

你可能感兴趣的文章
C#序列化与反序列化以及深拷贝浅拷贝
查看>>
Django学习(一) Django安装配置
查看>>
通过Java代码实现对数据库的数据进行操作:增删改查(JDBC)
查看>>
iOS APP上架过程常见问题
查看>>
配置文件的读取 java
查看>>
【转】class 'org.springframework.orm.hibernate3.LocalSessionFactoryBean' not found解决办法
查看>>
VRTK 学习记录之 Teleport
查看>>
Ambari Server 配置组功能实现分析
查看>>
javascript this的范围理解
查看>>
zencart移站后批量替换数据库中网址、电子邮箱、重置用户密码
查看>>
zencart批量设置热卖商品 best seller、点击最高最受欢迎产品 most popular
查看>>
sql server 复制之找不到该行的非暴力处理方法
查看>>
MySql md5加密 sqlserver md5加密 C# md5加密 java md5加密
查看>>
jQuery介绍
查看>>
[ 转 ] 网页聊天室的原理
查看>>
eclipse中svn提交报错的解决
查看>>
mysql 总结
查看>>
php环境配置和nginx安装配置教程
查看>>
Python学习(21)python操作mysql数据库_操作
查看>>
ASCII
查看>>