Spark中每个Stage中的Task会被分配到一个Worker中的 ->Executor容器里面的 ->一个线程池中被执行,Flink称每个Executor为一个TaskManager,每个TaskManager中会有多个slot作为内存隔离:
Spark:Worker ——> Executor ——> 线程池 ——> 线程
Flink: Worker ——> TaskManager ——> Slot ——> 线程
Slot是TaskManager资源粒度的划分,每个Slot都有自己独立的内存。所有Slot平均分配TaskManger的内存,比如TaskManager分配给Solt的内存为8G,两个Slot,每个Slot的内存为4G,四个Slot,每个Slot的内存为2G,值得注意的是,Slot仅划分内存,不涉及cpu的划分。同时Slot是Flink中的任务执行器(类似Storm中Executor),每个Slot可以运行多个task,而且一个task会以单独的线程来运行。Slot主要的好处有以下几点:
可以起到隔离内存的作用,防止多个不同job的task竞争内存。
Slot的个数就代表了一个Flink程序的最高并行度,简化了性能调优的过程
允许多个Task共享Slot,提升了资源利用率,举一个实际的例子,kafka有3个partition,对应flink的source有3个task,而keyBy我们设置的并行度为20,这个时候如果Slot不能共享的话,需要占用23个Slot,如果允许共享的话,那么只需要20个Slot即可(Slot的默认共享规则计算为20个)。
Flink 是一个流处理框架,支持流处理和批处理,特点是流处理有限,可容错,可扩展,高吞吐,低延迟。
流处理是处理一条,立马下一个节点会从缓存中取出,在下一个节点进行计算
批处理是只有处理一批完成后,才会经过网络传输到下一个节点
流处理的优点是低延迟 批处理的优点是高吞吐
flink同时支持两种,flink的网络传输是设计固定的缓存块为单位,用户可以设置缓存块的超时值来决定换存块什么时候进行传输。 数据大于0 进行处理就是流式处理。
如果设置为无限大就是批处理模型。
Flink 集群包括 JobManager 和 TaskManager .
JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包 等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。
TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要 部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。
flink on yarn 是由client 提交 app到 RM 上, 然后RM 分配一个 AppMaster负责运行 Flink JobManager 和 Yarn AppMaster, 然后 AppMaster 分配 容器去运行 Flink TaskManger
SparkStreaming 是将流处理分成微批处理的作业, 最后的处理引擎是spark job
Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块,Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,然后分批次提交job到集群中去运行,运行每个 job的过程和真正的spark 任务没有任何区别。
JobScheduler, 负责 Job的调度通过定时器每隔一段时间根据Dstream的依赖关系生一个一个DAG图
ReceiverTracker负责数据的接收,管理和分配
ReceiverTracker在启动Receiver的时候他有ReceiverSupervisor,其实现是ReceiverSupervisorImpl, ReceiverSupervisor本身启 动的时候会启动Receiver,Receiver不断的接收数据,通过BlockGenerator将数据转换成Block。定时器会不断的把Block数据通会不断的把Block数据通过BlockManager或者WAL进行存储,数据存储之后ReceiverSupervisorlmpl会把存储后的数据的元数据Metadate汇报给ReceiverTracker,其实是汇报给ReceiverTracker中的RPC实体ReceiverTrackerEndpoin
spark on yarn 的cluster模式, Spark client 向RM提交job请求, RM会分配一个 AppMaster, driver 和 运行在AppMAster节点里, AM然后把Receiver作为一个Task提交给Spark Executor 节点, Receive启动接受数据,生成数据块,并通知Spark Appmaster, AM会根据数据块生成相应的Job, 并把Job 提交给空闲的 Executor 去执行。
1:需要关注流数据是否需要进行状态管理
2:At-least-once或者Exectly-once消息投递模式是否有特殊要求
3:对于小型独立的项目,并且需要低延迟的场景,建议使用storm
4:如果你的项目已经使用了spark,并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming
5:要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink
Flink 提供的Api右 DataStream 和 DataSet ,他们都是不可变的数据集合,不可以增加删除中的元素, 通过 Source 创建 DataStream 和 DataSet
在创建运行时有:
Flink的每一个Operator称为一个任务, Operator 的每一个实例称为子任务,每一个任务在JVM线程中执行。可以将多个子任务链接成一个任务,减少上下文切换的开销,降低延迟。
source 和 算子map 如果是 one by one 的关系,他们的数据交换可以通过缓存而不是网络通信
TaskManager 为控制执行任务的数量,将计算资源划分多个slot,每个slot独享计算资源,这种静态分配利于任务资源隔离。
同一个任务可以共享一个slot, 不同作业不可以。
这里因为 Source 和 Map 并行度都是4 采用直连方式,他们的数据通信采用缓存形式
所以一共需要两个TaskManager source,Map 一个,reduce一个, 每个TaskManager 要3个slot
JobManager 将 JobGraph 部署 ExecutionGraph
设置的并行度,可以让一个ExecJobVertex 对应 多个并行的ExecVertex 实例。
Flink通过状态机管理 ExecGraph的作业执行进度。
Flink 将对象序列化为固定数量的预先分配的内存段,而不是直接把对象放在堆内存上。
Flink TaskManager 是由几个内部组件组成的:actor 系统(负责与 Flink master 协调)、IOManager(负责将数据溢出到磁盘并将其读取回来)、MemoryManager(负责协调内存使用。
数据源:
Sink:
时间:
处理时间:取自Operator的机器系统时间
事件时间: 由数据源产生
进入时间: 被Source节点观察时的系统时间
如果数据源没有自己正确创建水印,程序必须自己生成水印来确保基于事件的时间窗口可以正常工作。。
DataStream 提供了 周期性水印,间歇式水印,和递增式水印
用户自定义的函数是一个重要的特性,因为它们展示扩展了查询的表达能力。
在大多数情况下,必须先注册用户定义的函数,然后才能在查询中使用它。 TableEnvironment 通过调用 registerFunction() 方法来注册函数。
如果内置函数中不包含必需的标量函数,则可以为Table API和SQL定义用户自定义的标量函数。用户定义的标量函数将零个,一个或多个标量值映射到新的标量值。
为了定义标量函数,必须扩展 org.apache.flink.Table.function 中的基类 ScalarFunction 和实现(一个或多个)评估方法。标量函数的行为由评估方法决定。必须公开声明一个评估方法并命名为 eval 。评估方法的参数类型和返回类型也决定标量函数的参数和返回类型。还可以通过实现名为eval的多个评估方法来重载评估方法。评估方法也可以支持变量参数,比如 eval(String…str) 。
以下示例展示如何定义标量函数,在 TableEnvironment 中注册它,并在查询中调用它。
默认情况下,评估方法的结果类型由Flink的类型提取工具决定。对于基本类型或简单pojo,这已经足够了。但是对于更复杂的、自定义的或复合类型,这可能是错误的。在这些情况下,可以通过通过重写 ScalarFunction 的 getResultType() 手动定义结果类型的类型信息。
与用户定义的标量函数类似,用户定义的表函数将零个,一个或多个标量值作为输入参数。但是,与标量函数相比,它可以返回任意数量的行作为输出而不是单个值。返回的行可以包含一个或多个列。
为了定义表函数,必须扩展 org.apache.flink.table.function 中的基类 TableFunction 和实现(一个或多个)评估方法。表函数的行为由其评估方法决定。一个评估方法必须声明为public并命名为eval。可以通过实现名为eval的多个评估方法来重载 TableFunction 。评估方法的参数类型决定了表函数的所有有效参数。评估方法也可以支持变量参数,比如 eval(String…str) 。返回的表的类型由 TableFunction 的泛型类型决定。评估方法使用受保护的 collect(T) 方法发出输出行。
在Table API中,表函数与 .join(Table) 或 . leftouterjoin(Table) 一起使用。joinLateral 操作将外部表(操作符左侧的表)中的每一行与表值函数(操作符右侧的表)生成的所有行关联起来。 leftOuterJoin 操作符将外部表(操作符左侧的表)中的每一行与表值函数(操作符右侧的表)生成的所有行连接起来,并保留表函数返回空表的外部行。在SQL中,使用带有CROSS JOIN或LEFT JOIN且具有ON TRUE连接条件的LATERAL TABLE(<Table Function>)。
以下示例展示如何定义表值函数,在 TableEnvironment 中注册它,并在查询中调用它。
默认情况下,评估方法的结果类型由Flink的类型提取工具决定。对于基本类型或简单pojo,这已经足够了。但是对于更复杂的、自定义的或复合类型,这可能是错误的。在这些情况下,可以通过通过重写 TableFunction 的 getResultType() 手动定义结果类型的类型信息。
用户定义的聚合函数将一个表的一个或多个行并且具有一个或多个属性聚合为标量值。
用户定义的聚合函数是通过扩展 AggregateFunction 类实现的。 AggregateFunction 的工作原理如下。首先,需要定义一个累加器,它是保存聚合的中间结果的数据结构。然后是利用 AggregateFunction 的 createAccumulator() 方法创建一个空的累加器。接下来,对每个输入行调用函数的 accumulate() 方法来更新累加器。处理完所有行之后,调用函数的 getValue() 方法来计算并返回最终结果。
除了上述所必须的方法之外,还有一些可选择性实现的约定方法。虽然其中一些方法允许系统更高效地执行查询,但是其他方法对于某些用例则必需的。e.g. retract():在有界OVER窗口上聚合是必需的。merge():许多批处理聚合和会话窗口聚合都需要。resetAccumulator():许多批处理聚合都需要。
AggregateFunction 的所有方法都必须声明为 public 的,而不是 static 。
有时,用户定义的函数可能需要获取全局运行时信息,或者在实际工作之前进行一些设置/清理工作。用户自定义的函数可以通过覆盖open()和close()方法实现。
open()方法在评估方法之前调用一次。在最后一次调用评估方法之后在调用close()方法。
open()方法提供一个FunctionContext,其中包含有关用户定义函数在其中执行的上下文的信息。
欢迎分享,转载请注明来源:夏雨云
评论列表(0条)