Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图: 对数据的操作也是按照RDD为单位来进行的 Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。 它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果。
DStream上的操作与RDD的类似,分为:
Transformations(转换)Output Operations(输出)两种 此外转换操作中还有一些比较特殊的操作,如:updateStateByKey()、transform()以及各种Window相关的操作。常见Transformation—无状态转换:每个批次的处理不依赖于之前批次的数据
TransformationMeaningmap(func)对DStream中的各个元素进行func函数操作,然后返回一个新的DStreamflatMap(func)与map方法类似,只不过各个输入项可以被输出为零个或多个输出项filter(func)过滤出所有函数func返回值为true的DStream元素并返回一个新的DStreamrepartition(numPartitions)增加或减少DStream中的分区数,从而改变DStream的并行度union(otherStream)将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.count()通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStreamreduce(func)对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.countByValue()对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数reduceByKey(func, [numTasks])利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStreamjoin(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W))类型的DStreamcogroup(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStreamtransform(func)通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDDupdateStateByKey(func)根据key的之前状态值和key的新值,对key进行更新,返回一个新状态的DStream 特殊的Transformations–有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。 有状态转换包括基于追踪状态变化的转换(updateStateByKey)和滑动窗口的转换 (1)UpdateStateByKey Operation UpdateStateByKey用于记录历史记录,保存上次的状态 (2)Window Operations(开窗函数)滑动窗口转换操作: 滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。
(1)红色的矩形就是一个窗口,窗口框住的是一段时间内的数据流。 (2)这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。 所以基于窗口的操作,需要指定2个参数:
window length - The duration of the window (3 in the figure)slide interval - The interval at which the window-based operation is performed (2 in the figure).a.窗口大小,一段时间内数据的容器。 b.滑动间隔,每隔多久计算一次。
通过源码认识transform函数,有两个方法重载,声明如下:
Output Operations可以将DStream的数据输出到外部的数据库或文件系统,当某个Output Operations被调用时(与RDD的Action相同),spark streaming程序才会开始真正的计算过程。
Output OperationMeaningprint()打印到控制台saveAsTextFiles(prefix, [suffix])保存流的内容为文本文件,文件名为"prefix-TIME_IN_MS[.suffix]".saveAsObjectFiles(prefix, [suffix])保存流的内容为SequenceFile,文件名为 “prefix-TIME_IN_MS[.suffix]”.saveAsHadoopFiles(prefix, [suffix])保存流的内容为hadoop文件,文件名为 “prefix-TIME_IN_MS[.suffix]”.foreachRDD(func)对Dstream里面的每个RDD执行funcforeachRDD函数属于将DStream中结果数据RDD输出的操作,类似transform函数,针对每批次RDD数据操作,源码声明如下:
回顾SparkCore和SparkSQL及SparkStreaming处理数据时编程:
1)、SparkCore 数据结构:RDDSparkContext:上下文实例对象 2)、SparkSQL 数据结构:Dataset/DataFrame = RDD + SchemaSparkSession:会话实例对象,在Spark 1.x中SQLContext/HiveContext 3)、SparkStreaming 数据结构:DStream = Seq[RDD]StreamingContext:流式上下文实例对象, 底层还是SparkContext参数:划分流式数据时间间隔BatchInterval:1s,5s(演示)官方文档:streaming-programming-guidel#initializing-streamingcontext
从官方文档可知,提供两种方式构建StreamingContext实例对象,截图如下:
第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象针对SparkStreaming流式应用来说,代码逻辑大致如下五个步骤:
1、Define the input sources by creating input DStreams. 定义从哪个数据源接收流式数据,封装到DStream中 2、Define the streaming computations by applying transformation and output operations to DStreams. 针对业务调用DStream中函数,进行数据处理和输出 3、Start receiving data and processing it using streamingContext.start(). 4、Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination(). 5、The processing can be manually stopped using streamingContext.stop(). 启动流式应用,并且一直等待程序终止(人为或异常),最后停止运行