A look at the Flink TaskManager's managed memory
Published: 2019-06-28


This article examines the managed memory of the Flink TaskManager.

TaskManagerOptions

flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {

    //......

    /**
     * JVM heap size for the TaskManagers with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
            key("taskmanager.heap.size")
            .defaultValue("1024m")
            .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                    " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                    " YARN container, minus a certain tolerance value.");

    /**
     * Amount of memory to be allocated by the task manager's memory manager. If not
     * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     */
    public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
            key("taskmanager.memory.size")
            .defaultValue("0")
            .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                    " If not set, a relative fraction will be allocated.");

    /**
     * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
     * not set.
     */
    public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
            key("taskmanager.memory.fraction")
            .defaultValue(0.7f)
            .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                    " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                    " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                    " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                    " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
                    " is not set.");

    /**
     * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
     * as well as the network buffers.
     **/
    public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
            key("taskmanager.memory.off-heap")
            .defaultValue(false)
            .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                    " TaskManager as well as the network buffers.");

    /**
     * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
     */
    public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
            key("taskmanager.memory.preallocate")
            .defaultValue(false)
            .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

    //......
}
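  • taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's memory as a whole, covering both the heap and the off-heap part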

TaskManagerServices.calculateHeapSizeMB

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {

    //......

    /**
     * Calculates the amount of heap memory to use (to set via -Xmx and -Xms)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *         overall available memory to use (heap and off-heap)
     * @param config
     *         configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap

        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {

            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }

    //......
}
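  • If taskmanager.memory.size is less than or equal to 0, the managed memory size is derived from taskmanager.memory.fraction, which defaults to 0.7
  • If taskmanager.memory.off-heap is enabled and taskmanager.memory.size is not set, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the offHeapSize managed by the TaskManager's memory manager
  • If taskmanager.memory.off-heap is enabled, the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize; otherwise it is taskmanager.heap.size - networkBufMB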
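
The split described in the bullets above can be sketched with plain arithmetic. This is only an illustration, not Flink's code: the class HeapSplitSketch and its method are hypothetical, and networkBufMB is passed in directly rather than derived from the configuration as calculateNetworkBufferMemory does. The fraction 0.75 is used instead of the default 0.7 only because it is exactly representable in binary, which keeps the numbers round.

```java
// Illustrative sketch only; the class and method names are hypothetical.
final class HeapSplitSketch {

    /**
     * Mirrors the arithmetic of calculateHeapSizeMB.
     *
     * @param totalMB       value of taskmanager.heap.size in megabytes
     * @param networkBufMB  network buffer memory in megabytes (assumed given)
     * @param offHeap       value of taskmanager.memory.off-heap
     * @param managedSizeMB value of taskmanager.memory.size in megabytes, 0 if not set
     * @param fraction      value of taskmanager.memory.fraction
     * @return the heap size in megabytes that would be used for -Xmx and -Xms
     */
    static long heapSizeMB(long totalMB, long networkBufMB, boolean offHeap,
                           long managedSizeMB, double fraction) {
        long remainingMB = totalMB - networkBufMB;
        if (!offHeap) {
            // on-heap managed memory: the whole remainder becomes JVM heap
            return remainingMB;
        }
        long offHeapMB = managedSizeMB;
        if (offHeapMB <= 0) {
            // no explicit size: fall back to the fraction
            offHeapMB = (long) (fraction * remainingMB);
        }
        if (offHeapMB >= remainingMB) {
            throw new IllegalArgumentException(
                "Managed memory size too large: " + offHeapMB + " MB");
        }
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        // 1088 MB total, 64 MB network buffers -> 1024 MB remaining
        System.out.println(heapSizeMB(1088, 64, false, 0, 0.75));  // 1024
        System.out.println(heapSizeMB(1088, 64, true, 0, 0.75));   // 1024 - 768 = 256
        System.out.println(heapSizeMB(1088, 64, true, 512, 0.75)); // 1024 - 512 = 512
    }
}
```

With off-heap enabled, a larger fraction therefore shrinks the JVM heap, because the managed memory is carved out of the same total.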

TaskManagerServices.createMemoryManager

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {

    //......

    /**
     * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
     *
     * @param taskManagerServicesConfiguration to create the memory manager from
     * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
     * @param maxJvmHeapMemory the maximum JVM heap size
     * @return Memory manager
     * @throws Exception
     */
    private static MemoryManager createMemoryManager(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {
        // computing the amount of memory to use depends on how much memory is available
        // it strictly needs to happen AFTER the network stack has been initialized

        // check if a value has been configured
        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();

        MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

        final long memorySize;

        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

        if (configuredMemory > 0) {
            if (preAllocateMemory) {
                LOG.info("Using {} MB for managed memory." , configuredMemory);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
            }
            memorySize = configuredMemory << 20; // megabytes to bytes
        } else {
            // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

            if (memType == MemoryType.HEAP) {
                // network buffers allocated off-heap -> use memoryFraction of the available heap:
                long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
                        memoryFraction , relativeMemSize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                        "memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
                }
                memorySize = relativeMemSize;
            } else if (memType == MemoryType.OFF_HEAP) {
                // The maximum heap memory has been adjusted according to the fraction (see
                // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                // directMemorySize = jvmTotalNoNet * memoryFraction
                long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
                        memoryFraction, directMemorySize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                        " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                }
                memorySize = directMemorySize;
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }

        // now start the memory manager
        final MemoryManager memoryManager;
        try {
            memoryManager = new MemoryManager(
                memorySize,
                taskManagerServicesConfiguration.getNumberOfSlots(),
                taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                memType,
                preAllocateMemory);
        } catch (OutOfMemoryError e) {
            if (memType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
            } else if (memType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager off-heap memory (" + memorySize +
                    " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            } else {
                throw e;
            }
        }
        return memoryManager;
    }

    //......
}
  • TaskManagerServices provides the private static method createMemoryManager, which creates a MemoryManager from the configuration; when no explicit size is configured it recomputes memorySize according to the MemoryType and passes the result to the MemoryManager constructor
  • When memType is MemoryType.HEAP, memorySize is relativeMemSize, where relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction)
  • When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize, where directMemorySize = jvmTotalNoNet * memoryFraction; since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction)
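
To make the two formulas above concrete, here is a small sketch that evaluates them on round numbers. It is not Flink's code: the class ManagedMemorySizeSketch and its method names are hypothetical, and the fraction 0.75 is chosen only because it is exact in binary floating point. The off-heap case works backwards from the already reduced maximum heap to recover jvmTotalNoNet, then takes memoryFraction of it.

```java
// Illustrative sketch only; the class and method names are hypothetical.
final class ManagedMemorySizeSketch {

    /** HEAP case: a fraction of the currently free heap, in bytes. */
    static long heapManagedBytes(long freeHeapBytes, double memoryFraction) {
        return (long) (freeHeapBytes * memoryFraction);
    }

    /**
     * OFF_HEAP case: maxJvmHeap = jvmTotalNoNet * (1 - memoryFraction), so
     * jvmTotalNoNet = maxJvmHeap / (1 - memoryFraction) and the managed
     * (direct) memory is jvmTotalNoNet * memoryFraction.
     */
    static long offHeapManagedBytes(long maxJvmHeapBytes, double memoryFraction) {
        return (long) (maxJvmHeapBytes / (1.0 - memoryFraction) * memoryFraction);
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        // 1024 MB of free heap, fraction 0.75 -> 768 MB managed on-heap
        System.out.println(heapManagedBytes(1024 * mb, 0.75) / mb);
        // 256 MB max heap, fraction 0.75 -> jvmTotalNoNet = 1024 MB -> 768 MB managed off-heap
        System.out.println(offHeapManagedBytes(256 * mb, 0.75) / mb);
    }
}
```

Both cases arrive at the same 768 MB here because a 256 MB heap at fraction 0.75 implies 1024 MB of non-network JVM memory in total.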

TaskManagerServicesConfiguration

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {

    //......

    /**
     * Utility method to extract TaskManager config parameters from the configuration and to
     * sanity check them.
     *
     * @param configuration The configuration.
     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
     * @param localCommunication True, to skip initializing the network stack.
     *                                      Use only in cases where only one task manager runs.
     * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            InetAddress remoteAddress,
            boolean localCommunication) throws Exception {

        // we need this because many configs have been written with a "-1" entry
        int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }

        final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
        String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

        if (localStateRootDir.length == 0) {
            // default to temp dirs.
            localStateRootDir = tmpDirs;
        }

        boolean localRecoveryMode = configuration.getBoolean(
            CheckpointingOptions.LOCAL_RECOVERY.key(),
            CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
            configuration,
            localCommunication,
            remoteAddress,
            slots);

        final QueryableStateConfiguration queryableStateConfig =
                parseQueryableStateConfiguration(configuration);

        // extract memory settings
        long configuredMemory;
        String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
        if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
            try {
                configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException(
                    "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
            }
        } else {
            configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
        }

        checkConfigParameter(
            configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                configuredMemory > 0, configuredMemory,
            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
            "MemoryManager needs at least one MB of memory. " +
                "If you leave this config parameter empty, the system automatically " +
                "pick a fraction of the available memory.");

        // check whether we use heap or off-heap memory
        final MemoryType memType;
        if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            memType = MemoryType.OFF_HEAP;
        } else {
            memType = MemoryType.HEAP;
        }

        boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

        float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
            TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
            "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

        long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

        return new TaskManagerServicesConfiguration(
            remoteAddress,
            tmpDirs,
            localStateRootDir,
            localRecoveryMode,
            networkConfig,
            queryableStateConfig,
            slots,
            configuredMemory,
            memType,
            preAllocateMemory,
            memoryFraction,
            timerServiceShutdownTimeout,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
    }

    //......
}
  • TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration; memType follows the taskmanager.memory.off-heap setting: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise
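
The memory-related checks in fromConfiguration can be condensed into a short sketch. It is a simplification under stated assumptions, not Flink's code: MemoryConfigSketch and its nested MemoryType enum are hypothetical stand-ins, and the managed memory size is read as a plain number of megabytes, whereas Flink parses it with MemorySize.parse and also accepts unit suffixes.

```java
// Illustrative sketch only; the class, enum, and method names are hypothetical.
final class MemoryConfigSketch {

    /** Hypothetical stand-in for Flink's MemoryType. */
    enum MemoryType { HEAP, OFF_HEAP }

    /** Default of taskmanager.memory.size, as declared in TaskManagerOptions. */
    static final String MANAGED_SIZE_DEFAULT = "0";

    /** taskmanager.memory.off-heap decides the memory type. */
    static MemoryType memoryType(boolean offHeapEnabled) {
        return offHeapEnabled ? MemoryType.OFF_HEAP : MemoryType.HEAP;
    }

    /**
     * Returns 0 when the option is left at its default (the fraction is used later),
     * otherwise a size that must be at least 1 MB.
     * Assumption: rawValue is a plain number of megabytes.
     */
    static long configuredMemoryMB(String rawValue) {
        if (rawValue.equals(MANAGED_SIZE_DEFAULT)) {
            return 0L;
        }
        long mb = Long.parseLong(rawValue);
        if (mb <= 0) {
            throw new IllegalArgumentException("MemoryManager needs at least one MB of memory.");
        }
        return mb;
    }

    /** taskmanager.memory.fraction must lie strictly between 0.0 and 1.0. */
    static float checkedFraction(float fraction) {
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException(
                "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
        }
        return fraction;
    }

    public static void main(String[] args) {
        System.out.println(memoryType(true));        // OFF_HEAP
        System.out.println(configuredMemoryMB("0")); // 0
        System.out.println(checkedFraction(0.7f));   // 0.7
    }
}
```

Note that the bounds on the fraction are exclusive, so neither 0.0 nor 1.0 passes the check.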

Summary

  • The TaskManager's managed memory comes in two types, heap and off-heap; taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's memory as a whole, covering both the heap and the off-heap part; if taskmanager.memory.size is less than or equal to 0, the size is derived from taskmanager.memory.fraction, which defaults to 0.7; if taskmanager.memory.off-heap is enabled and taskmanager.memory.size is not set, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the offHeapSize managed by the memory manager; if taskmanager.memory.off-heap is enabled, the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize
  • TaskManagerServices provides the private static method createMemoryManager, which creates a MemoryManager from the configuration; when no explicit size is configured it recomputes memorySize according to the MemoryType and passes the result to the MemoryManager constructor; when memType is MemoryType.HEAP, memorySize is relativeMemSize, where relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction); when memType is MemoryType.OFF_HEAP, memorySize is directMemorySize, where directMemorySize = jvmTotalNoNet * memoryFraction, and since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction)
  • TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration; memType follows the taskmanager.memory.off-heap setting: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise

Reposted from: http://qieel.baihongyu.com/