全国网站集约化建设试点,网站软文制作,关于加强机关网站建设,商标设计logo网站TaskManager在Flink中运行用户代码。根据需要配置内存使用#xff0c;可以极大地减少Flink的资源占用#xff0c;提高作业的稳定性。 注意下面的讲解适用于TaskManager 1.10之后的版本。与JobManager进程的内存模型相比#xff0c;TaskManager内存组件具有类似但更复杂的结构…TaskManager在Flink中运行用户代码。根据需要配置内存使用可以极大地减少Flink的资源占用提高作业的稳定性。 注意下面的讲解适用于TaskManager 1.10之后的版本。与JobManager进程的内存模型相比TaskManager内存组件具有类似但更复杂的结构。
1. 配置 Total Memory
Flink JVM 进程的总内存Total Process Memory由 Flink 应用程序Total Flink MemoryFlink 总内存和 JVM 运行进程所消耗的内存组成。Flink 总内存 Total Flink Memory包括 JVM Heap、Managed Memory托管内存由 Flink 管理和其他 Direct Memory或 Native Memory的使用量。如下结构 如果在本地运行Flink例如在IDE中而没有创建集群那么只有一部分内存配置选项是相关的详情见下面的本地执行章节。 否则为TaskManagers 设置内存的最简单方法是配置总内存。下面将更详细地描述一种更细粒度的方法。 其余的内存组件将根据默认值或额外配置的选项自动调整。有关其他内存组件的更多细节请参阅后面的章节。
2. 配置 Heap and Managed Memory
前面提到过在Flink中设置内存的另一种方法是显式指定Task Heap Memory 和 Managed Memory。它让Flink能够更好地控制可用的JVM Heap 和 Managed Memory。
其余的内存组件将根据默认值或额外配置的选项自动调整。下面是关于其他内存组件的更多细节。
如果已明确配置了任务 Task Heap Memory 和 Managed Memory建议既不要设置进程总内存Total Process Memory也不要设置 Flink 总内存Total Flink Memory。否则很容易导致内存配置冲突。
2.1. Task (Operator) Heap Memory
如果你想保证用户代码有一定量的JVM堆可用可以显式设置Task Heap Memorytaskmanager.memory.task.heap.size。它会被添加到JVM的堆大小中并专门用于运行用户代码的 Flink Operator。
2.2. Managed Memory
Managed memory托管内存 由Flink管理并作为本地内存堆外off-heap分配。以下工作负载使用托管内存
流式作业可以将其用于RocksDB状态后端。流和批处理作业都可以使用它来排序、散列表和缓存中间结果。流和批处理作业都可以使用它在Python进程中执行用户定义的函数。
托管内存大小可以由以下配置
可通过 taskmanager.memory.managed.size 进行显式配置。或通过 taskmanager.memory.managed.fraction乘以Flink 总内存Total Flink Memory来计算出来。
如果同时设置了大小和比例则大小优先于比例。如果既没有明确配置大小也没有配置比例则将使用默认比例。
如果你的作业包含多种类型的托管内存使用者你可以控制如何在这些类型之间共享托管内存。配置选项 taskmanager.memory.managed.consumer-weights 可以为每种类型设置权重Flink会按比例分配托管内存。有效的消费者类型有
OPERATOR用于内置算法。STATE_BACKEND用于流式传输中的 RocksDB 状态后端。PYTHON用于PYTHON进程。
例如如果一个流作业同时使用RocksDB状态后端和Python udf并且消费者权重配置为STATE_BACKEND:70 Python:30 Flink将为RocksDB状态后端保留70%的总管理内存为Python进程保留30%。
对于每种类型Flink只在作业中包含该类型的托管内存消费者时才分配托管内存。例如如果一个流作业使用堆状态后端和Python udf并且消费者权重配置为STATE_BACKEND:70 Python:30 Flink将为Python进程使用其所有的托管内存因为堆状态后端不使用托管内存。
注意对于未包含在consumer权重中的consumer类型Flink不会为其分配托管内存。如果作业确实需要缺失的类型则可能导致内存分配失败。默认情况下包含所有消费者类型。只有当权重被显式配置/覆盖时才会发生这种情况。
3. 配置 Off-Heap MemoryDirect or Native
由用户代码分配的堆外内存应计入任务堆外内存taskmanager.memory.task.off-heap.size。
你也可以调整框架的堆外内存Framework Off-Heap Memory。但是只有在确定 Flink 框架需要更多内存时才应该去更改该值。
Flink 将框架堆外内存Framework Off-Heap Memory和任务堆外内存Task Off-Heap Memory纳入 JVM 的直接内存Direct Memory限制。
注意虽然本地非直接内存Native Non-Direct Memory的使用可以作为框架堆外内存或任务堆外内存的一部分但在这种情况下会导致更高的 JVM 直接内存限制。
注意网络内存Network Memory也是 JVM 直接内存的一部分但它由 Flink 管理并保证永远不会超过其配置大小。因此调整网络内存大小对这种情况没有帮助。 详见下方的内存详细模型。
4. 详细内存模型 下表列出了如上所述的所有内存组件以及影响各个组件大小的Flink配置选项。
组件配置项描述Framework Heap Memorytaskmanager.memory.framework.heap.size专用于Flink框架的JVM堆内存Task Heap Memorytaskmanager.memory.task.heap.sizeJVM堆内存专用于Flink应用程序用于运行flink operators和用户代码Managed memorytaskmanager.memory.managed.size taskmanager.memory.managed.fraction由Flink管理的本地内存保留被用于排序哈希表缓存中间结果和RocksDB状态后端Framework Off-heap Memorytaskmanager.memory.framework.off-heap.size专用于Flink框架的堆外直接或本地内存Task Off-heap Memorytaskmanager.memory.task.off-heap.size专用于Flink应用程序以运行操作符的堆外直接或本地内存Network Memorytaskmanager.memory.network.min taskmanager.memory.network.max taskmanager.memory.network.fraction为 Task 之间的数据交换预留的直接内存例如通过网络传输的缓冲是Flink总内存Total Flink Size的一个fractionated部分。这些内存用于分配网络缓冲区JVM metaspacetaskmanager.memory.jvm-metaspace.sizeFlink JVM进程的元空间大小JVM Overheadtaskmanager.memory.jvm-overhead.min taskmanager.memory.jvm-overhead.max taskmanager.memory.jvm-overhead.fraction本地内存是为其他JVM开销保留的例如线程堆栈、代码缓存、垃圾收集空间等它是总进程内存的一个受限制的分块组件 如你所见一些内存组件的大小可以通过相应的选项简单地设置。其他组件可以使用多个选项进行调优。
5. Framework Memory
如果没有充分的理由不应该更改框架的堆内存Framework Heap Memory和框架的堆外内存Framework Off-Heap Memory。只有在确定Flink需要为某些内部数据结构或操作增加内存时才需要调整内存大小。它可能与特定的部署环境或作业结构相关如高度并行性。此外在某些情况下Flink的依赖如Hadoop可能会消耗更多的直接内存或本地内存。
注意Flink 目前既没有隔离 Framework Heap Memory 和 Task Heap Memory也没有隔离 Framework Off-Heap Memory 和 Task Off-Heap Memory。框架和任务内存的分离可以在未来的版本中用于进一步的优化。
6. 本地执行Local Execution
如果你在本地机器上启动Flink作为单个java程序而不创建集群例如从你的IDE那么所有组件都被忽略除了以下组件
内存组件相关选项默认值Task heaptaskmanager.memory.task.heap.size无限制Task off-heaptaskmanager.memory.task.off-heap.size无限制Managed memorytaskmanager.memory.managed.size128MBNetwork memory taskmanager.memory.network.min taskmanager.memory.network.max 64MB
上面列出的所有组件都可以但不必为本地执行显式配置。如果没有配置则将其设置为默认值。任务堆内存Task Heap和任务堆外内存Task Off-Heap被认为是无限大的Long.MAX_VALUE 字节。托管内存Managed Memory在本地执行模式下的默认值为128MB。
注意在这种情况下任务堆大小与实际堆大小没有任何关系。它可以与下一个版本带来的未来优化相关。已启动的本地进程的实际JVM堆大小并不受Flink控制而是取决于如何启动进程。如果您想控制JVM堆大小则必须显式传递相应的JVM参数例如-Xmx、-Xms。
7. 实际配置效果展示
注意和上图的详细内存模型组成和各个配置项在不配置的时候的默认值结合着分析。即
Total Process Size Total Flink Size JVM Metaspace JVM OverheadTotal Flink Size Total Heap Size Total Off-Heap SizeTotal JVM Heap Size Task Heap Framework HeapTotal Off-Heap Managed Memory Direct MemoryTask Off-Heap Framework Off-Heap Network MemoryJVM Metaspace taskmanager.memory.jvm-metaspace.size默认值256mJVM Overhead Total Process Size * taskmanager.memory.jvm-overhead.fraction默认值0.1Task Heap taskmanager.memory.task.heap.sizeFramework Heap taskmanager.memory.framework.heap.size默认值128mManaged Memory taskmanager.memory.managed.size没配置的话就是 Total Flink Size * taskmanager.memory.managed.fraction默认值0.4Task Off-Heap taskmanager.memory.task.off-heap.size默认值0Framework Off-Heap taskmanager.memory.framework.off-heap.size默认值128mNetwork Memory Total Flink Size * taskmanager.memory.network.fraction默认值0.1其他配置值都可以在官网配置页面查看注意有些 fraction会有对应的最大最小值限制范围。
情况1只配置 Total Process Size
这里配置 Total Process Size 为 2048m即 -Dtaskmanager.memory.process.size2048m
这里选择使用application mode 运行在yarn上 可以看下内存是如何计算出以上配置值的下面值乘以系数的做了四舍五入处理为了和图对应
首先只配置了 Total Process Size 2048m其他就都是取默认值或推导计算出。JVM Metaspace 256mJVM Overhead 2048m * 0.1 205mTotal Flink Size 2048m - 256m - 205m 1587mTask Heap没指定所以先计算其他的值最终减去其他值得到Framework Heap 128mManaged Memory没有配置taskmanager.memory.managed.size所以取值1587m * 0.4 635mFramework Off-Heap 128mTask Off-Heap 0Network Memory 1587m * 0.1 159m而且没超过最大最小值限制Direct Memory 0 128m 159m 287mTask Heap Total Flink Size - Framework Heap - Framework Off-Heap - Task Off-Heap - Managed Memory - Network Memory即 1587m - 128m - 128m - 0 - 635m - 159m 538mTotal JVM Heap Size 538m 128m 666m
从jobmanager日志和taskmanager日志上也能看到内存分配值 情况2只配置 Total Flink Size
这里配置 Total Flink Size 为 2048m即 -Dtaskmanager.memory.flink.size2048m
这里选择使用Session Mode 运行在standalone上注意使用哪种部署方式无关紧要这里运行在session模式下只是看到内存分配日志打印的更为清晰
测试环境的准备可以使用配置 conf/flink-conf.yaml里配置 taskmanager.memory.flink.size: 2048m
然后执行 bash bin/start-cluster.sh 然后看taskmanager的信息 可以看下内存是如何计算出以上配置值的下面值乘以系数的做了四舍五入处理为了和图对应
首先只配置了 Total Flink Size 2048m其他就都是取默认值或推导计算出。Task Heap没指定所以先计算其他的值最终减去其他值得到Framework Heap 128mManaged Memory没有配置taskmanager.memory.managed.size所以取值2048m * 0.4 819mFramework Off-Heap 128mTask Off-Heap 0Network Memory 2048m * 0.1 205m而且没超过最大最小值限制Direct Memory 0 128m 205m 333mTask Heap Total Flink Size - Framework Heap - Framework Off-Heap - Task Off-Heap - Managed Memory - Network Memory即 2048m - 128m - 128m - 0 - 819m - 205m 768mTotal JVM Heap Size 768m 128m 896mJVM Metaspace 256m2048m 256m JVM Overhead*0.1 JVM Overhead得到JVM Overhead 256mTotal Process Size 2048m 256m 256m 2560m
taskmanager日志上也能看到内存分配值 情况3分配各个组件具体值
这种情况和以上两种并没区别都是按公式一步步导出即可这里不再举例。