All configuration can be set in the Flink configuration file in the conf/ directory (see Flink Configuration File).
The configuration is parsed and evaluated when the Flink processes are started. Changes to the configuration file require restarting the relevant processes.
The out-of-the-box configuration uses your default Java installation. To override the Java runtime, set the environment variable JAVA_HOME or the configuration key env.java.home in the Flink configuration file. Note that the configuration key env.java.home must be specified in a flattened format (i.e. one-line key-value format) in the configuration file.
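As a minimal sketch, the override could look like this in the configuration file. The JDK path is a hypothetical example and not a value taken from the Flink distribution:

```yaml
# Flattened, one-line form, as required for env.java.home.
# The path below is a hypothetical example; point it at your own JDK.
env.java.home: /usr/lib/jvm/java-17-openjdk
```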
You can specify a different configuration directory location by defining the FLINK_CONF_DIR environment variable. For resource providers which provide non-session deployments, you can specify per-job configurations this way. Make a copy of the conf directory from the Flink distribution and modify the settings on a per-job basis. Note that this is not supported in Docker or standalone Kubernetes deployments. On Docker-based deployments, you can use the FLINK_PROPERTIES environment variable for passing configuration values.
On session clusters, the provided configuration will only be used for configuring execution parameters, e.g. configuration parameters affecting the job, not the underlying cluster.
Flink Configuration File
Starting with version 1.19, Flink officially supports the standard YAML 1.2 syntax. Compared to previous versions, which only supported simple key-value pairs, this update provides users with more flexible and powerful configuration capabilities. To take advantage of this feature, users need to use the newly introduced config.yaml configuration file. The existing flink-conf.yaml configuration file is deprecated and will no longer work in the upcoming version 2.0. To ensure a smooth transition, users are advised to migrate their existing Flink configuration to the new configuration file as soon as possible.
This section will help users understand how to configure the Flink cluster and jobs through the config.yaml configuration file, as well as how to migrate old configuration to the new configuration file.
Starting from Flink 1.19, the default configuration file has been changed to config.yaml and placed in the conf/ directory. Users should directly modify this file to configure Flink.
To continue using the legacy configuration file flink-conf.yaml, users just need to copy this file into the conf/ directory. Once the legacy configuration file flink-conf.yaml is detected, Flink will prioritize using it as the configuration file.
The usage for config.yaml is as follows:
restart-strategy:
  type: failure-rate
  failure-rate:
    delay: 1 s
    failure-rate-interval: 1 min
    max-failures-per-interval: 1
Users can also organize Config Keys in a flattened format, such as:
restart-strategy.type: failure-rate
restart-strategy.failure-rate.delay: 1 s
restart-strategy.failure-rate.failure-rate-interval: 1 min
restart-strategy.failure-rate.max-failures-per-interval: 1
Config Value
The config.yaml configuration file allows users to configure values following the YAML 1.2 core schema.
Users can configure the values corresponding to the Config Type in the following format:
Float
Regular expression: [-+]? ( \. [0-9]+ | [0-9]+ ( \. [0-9]* )? ) ( [eE] [-+]? [0-9]+ )?
Example: 100.1
Double
Regular expression: [-+]? ( \. [0-9]+ | [0-9]+ ( \. [0-9]* )? ) ( [eE] [-+]? [0-9]+ )?
Example: 100.1
Boolean
Regular expression: true | True | TRUE | false | False | FALSE
Example: true
String
Any character
Note: If the value contains special characters used in YAML, it is necessary to use single or double quotes to escape these characters.
Map<String, String>
Flow Style:
Description: Enclosed with curly braces "{}" and pairs are separated by a comma ",". Within the mapping, key and value are separated by a colon ":" followed by a space.
Example: {k1: v1, k2: v2}
Block Style:
Description: Uses indentation to represent the hierarchical structure of the data, with keys and values separated by a colon ":" and a space.
Example:
k1: v1
k2: v2
Flink Legacy Map Pattern:
Description: Pairs are separated by a comma ",". Within the mapping, keys and values are separated by a colon ":".
Example: k1:v1,k2:v2
Note: For values containing special characters, consider escaping them; details can be found in the description of Config Type: String.
List<String>
Flow Style:
Description: Enclosed within square brackets "[]" and list items are separated by a comma ",".
Example: [a, b, c]
Block Style:
Description: Uses indentation and a dash "-" to denote list items.
Example:
- a
- b
- c
Flink Legacy List Pattern:
Description: List items are separated by a semicolon ";".
Example: a;b;c
Note: For values containing special characters, consider escaping them; details can be found in the description of Config Type: String.
MemorySize
Regular expression: [0-9]+ (b | kb | kibibytes | m | mb | mebibytes | g | gb | gibibytes | t | tb | tebibytes)?
Example: 100 mb
Duration
Regular expression: [0-9]+ (d | day | h | hour | m | min | minute | s | sec | second | ms | milli | millisecond | us | micro | microsecond | ns | nano | nanosecond)?
Example: 10 s
Enum Constants
Additionally, users can configure the value for all Config Types as strings by simply enclosing the original value in single quotes or double quotes.
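The formats above can be put side by side in a short sketch. All values are illustrative. pipeline.global-job-parameters and pipeline.jars are not named in this section and are assumed here to be Map- and List-typed options; the directory name and jar paths are hypothetical.

```yaml
# Integer and Float/Double values are written as plain numbers.
parallelism.default: 4
jobmanager.memory.jvm-overhead.fraction: 0.1

# Boolean
execution.checkpointing.incremental: true

# MemorySize and Duration: a number followed by a unit.
taskmanager.memory.process.size: 2 gb
execution.checkpointing.interval: 30 s

# String containing YAML special characters (": " and " #"), so it is quoted.
# The directory name is hypothetical.
web.upload.dir: '/tmp/flink uploads: team #1'

# Map<String, String> in flow style (assumed Map-typed option, hypothetical entries).
pipeline.global-job-parameters: {environment: staging, owner: data-team}

# List<String> in block style (assumed List-typed option, hypothetical paths).
pipeline.jars:
  - file:///opt/flink/usrlib/job-a.jar
  - file:///opt/flink/usrlib/job-b.jar

# Any typed value may also be given as a quoted string.
taskmanager.numberOfTaskSlots: '2'
```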
Migrate from flink-conf.yaml to config.yaml
Behavior Changes
config.yaml strictly follows the YAML 1.2 syntax and is compatible with flink-conf.yaml in most cases, except for the following behavior changes:
Null value:
flink-conf.yaml: Only supports leaving the value blank.
config.yaml: Supports leaving it blank, or explicitly setting it to null, Null, NULL, or ~.
Comment:
flink-conf.yaml: Everything after the first occurrence of # in each line is considered a comment.
config.yaml: When there is at least one space between the # and the preceding content, or when the # is at the beginning of a line, the subsequent content is considered a comment.
Special character escaping in strings:
flink-conf.yaml: Only needs to escape elements in Lists and Maps.
List elements containing a semicolon “;” require escaping.
Map elements containing a comma “,” or colon “:” require escaping.
config.yaml: Requires escaping special characters as defined in the YAML 1.2 specification; see the definition of special characters.
Duplicated keys:
flink-conf.yaml: Allows duplicated keys and takes the last key-value pair for the corresponding key that appears in the file.
config.yaml: Does not allow duplicated keys, and an error will be reported when loading the configuration.
Handling of invalid configuration:
flink-conf.yaml: Invalid key-value pairs will be ignored.
config.yaml: An error will be reported when loading the configuration.
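A few of these differences can be illustrated with a small config.yaml fragment. This is only a sketch; the path values are hypothetical and chosen to trigger the rules described above.

```yaml
# Comment rule: '#' starts a comment only at the beginning of a line
# or when it is preceded by at least one space.
rest.path: /flink#ui     # the value stays "/flink#ui"; only this trailing text is a comment

# Null value: a blank value and the explicit forms null, Null, NULL, ~ are accepted.
execution.checkpointing.savepoint-dir: ~

# Special characters follow the YAML 1.2 rules, so a value containing ": " is quoted.
web.upload.dir: "/tmp/uploads: staging"

# Duplicated keys are rejected: repeating rest.path further down in this file
# would cause an error when the configuration is loaded.
```

In the legacy flink-conf.yaml, by contrast, the first line would be cut at the first #, leaving /flink as the value.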
To facilitate user migration, Flink provides a configuration file migration script that can automate the migration process. The usage is as follows:
Place the old configuration file flink-conf.yaml in the conf/ directory.
Execute the following command in the $FLINK_HOME/ directory:
bin/migrate-config-file.sh
After running the command above, the migration script will automatically read the old configuration file flink-conf.yaml from the conf/ directory and output the migrated results to the new configuration file config.yaml in the conf/ directory. Note that due to the limitation of the legacy configuration parser, all values in flink-conf.yaml will be recognized as String type, so the values in the generated config.yaml file will also be of String type, which means some values will be enclosed in quotes. However, Flink will convert them to the actual types defined using ConfigOption during subsequent configuration parsing.
Additionally, users need to delete the flink-conf.yaml file in the conf/ directory after migration to make the config.yaml file take effect.
Basic Setup
The default configuration supports starting a single-node Flink session cluster without any changes.
The options in this section are the ones most commonly needed for a basic distributed Flink setup.
Hostnames / Ports
These options are only necessary for standalone application- or session deployments (simple standalone or Kubernetes).
If you use Flink with Yarn or the active Kubernetes integration, the hostnames and ports are automatically discovered.
rest.address, rest.port: These are used by the client to connect to Flink. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes) service in front of the JobManager’s REST interface.
The jobmanager.rpc.address (defaults to “localhost”) and jobmanager.rpc.port (defaults to 6123) config entries are used by the TaskManager to connect to the JobManager/ResourceManager. Set this to the hostname where the JobManager runs, or to the hostname of the (Kubernetes internal) service for the JobManager. This option is ignored on setups with high-availability where the leader election mechanism is used to discover this automatically.
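For a simple standalone cluster without high-availability, these entries might be set as in the following sketch. The hostname is hypothetical, and the REST port is an illustrative value.

```yaml
# Address and port that TaskManagers use to reach the JobManager.
# jobmanager.example.internal is a hypothetical hostname.
jobmanager.rpc.address: jobmanager.example.internal
jobmanager.rpc.port: 6123

# Address and port that clients use to reach the REST interface.
rest.address: jobmanager.example.internal
rest.port: 8081
```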
Memory Sizes
The default memory sizes support simple streaming/batch applications, but are too low to yield good performance for more complex applications.
jobmanager.memory.process.size: Total size of the JobManager (JobMaster / ResourceManager / Dispatcher) process.
taskmanager.memory.process.size: Total size of the TaskManager process.
The total sizes include everything. Flink will subtract some memory for the JVM’s own memory requirements (metaspace and others), and divide and configure the rest automatically between its components (JVM Heap, Off-Heap, for Task Managers also network, managed memory etc.).
These values are configured as memory sizes, for example 1536m or 2g.
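A minimal sketch of the two settings, using the memory-size notation mentioned above (the sizes themselves are illustrative, not recommendations):

```yaml
# Total process sizes; Flink derives the component sizes from these.
jobmanager.memory.process.size: 1536m
taskmanager.memory.process.size: 2g
```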
Parallelism
taskmanager.numberOfTaskSlots: The number of slots that a TaskManager offers (default: 1). Each slot can take one task or pipeline.
Having multiple slots in a TaskManager can help amortize certain constant overheads (of the JVM, application libraries, or network connections) across parallel tasks or pipelines. See the Task Slots and Resources concepts section for details.
Running more, smaller TaskManagers with one slot each is a good starting point and leads to the best isolation between tasks. Dedicating the same resources to fewer, larger TaskManagers with more slots can help to increase resource utilization, at the cost of weaker isolation between the tasks (more tasks share the same JVM).
parallelism.default: The default parallelism used when no parallelism is specified anywhere (default: 1).
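As a sketch of the second variant described above (fewer, larger TaskManagers), the two options could be combined like this. The numbers are illustrative only.

```yaml
# Each TaskManager offers four slots, trading some isolation for utilization.
taskmanager.numberOfTaskSlots: 4

# Used only when neither the job nor the client specifies a parallelism.
parallelism.default: 4
```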
Checkpointing
You can configure checkpointing directly in code within your Flink job or application. Putting these values here in the configuration defines them as defaults in case the application does not configure anything.
state.backend.type: The state backend to use. This defines the data structure mechanism for taking snapshots. Common values are hashmap or rocksdb.
execution.checkpointing.dir: The directory to write checkpoints to. This takes a path URI like s3://mybucket/flink-app/checkpoints or hdfs://namenode:port/flink/checkpoints.
execution.checkpointing.savepoint-dir: The default directory for savepoints. Takes a path URI, similar to execution.checkpointing.dir.
execution.checkpointing.interval: The base interval setting. To enable checkpointing, you need to set this value larger than 0.
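Put together, cluster-wide checkpointing defaults might look like the sketch below. The bucket name follows the example URI above, the savepoint path is a hypothetical sibling of it, and the interval is an illustrative value.

```yaml
state.backend.type: rocksdb
execution.checkpointing.dir: s3://mybucket/flink-app/checkpoints
# Hypothetical sibling directory for savepoints.
execution.checkpointing.savepoint-dir: s3://mybucket/flink-app/savepoints
# Any value larger than 0 enables checkpointing.
execution.checkpointing.interval: 1 min
```

Jobs that configure checkpointing in code are not affected by these defaults.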
Web UI
web.submit.enable: Enables uploading and starting jobs through the Flink UI (true by default). Please note that even when this is disabled, session clusters still accept jobs through REST requests (HTTP calls). This flag only guards the feature to upload jobs in the UI.
web.cancel.enable: Enables canceling jobs through the Flink UI (true by default). Please note that even when this is disabled, session clusters still cancel jobs through REST requests (HTTP calls). This flag only guards the feature to cancel jobs in the UI.
web.upload.dir: The directory in which to store uploaded jobs. Only used when web.submit.enable is true.
web.exception-history-size: Sets the size of the exception history that prints the most recent failures that were handled by Flink for a job.
Other
io.tmp.dirs: The directories where Flink puts local data, defaults to the system temp directory (java.io.tmpdir property). If a list of directories is configured, Flink will rotate files across the directories.
The data put in these directories include by default the files created by RocksDB, spilled intermediate results (batch algorithms), and cached jar files.
This data is NOT relied upon for persistence/recovery, but if this data gets deleted, it typically causes a heavyweight recovery operation. It is hence recommended to set this to a directory that is not automatically periodically purged.
Yarn and Kubernetes setups automatically configure this value to the local working directories by default.
Options to configure hostnames and ports for the different Flink components.
The JobManager hostname and port are only relevant for standalone setups without high-availability.
In that setup, the config values are used by the TaskManagers to find (and connect to) the JobManager.
In all highly-available setups, the TaskManagers discover the JobManager via the High-Availability-Service (for example ZooKeeper).
Setups using resource orchestration frameworks (K8s, Yarn) typically use the framework’s service discovery facilities.
You do not need to configure any TaskManager hosts and ports, unless the setup requires the use of specific port ranges or specific network interfaces to bind to.
Key
Default
Type
Description
jobmanager.bind-host
(none)
String
The local address of the network interface that the job manager binds to. If not configured, '0.0.0.0' will be used.
jobmanager.rpc.address
(none)
String
The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobmanager.rpc.bind-port
(none)
Integer
The local RPC port that the JobManager binds to. If not configured, the external port (configured by 'jobmanager.rpc.port') will be used.
jobmanager.rpc.port
Integer
The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
metrics.internal.query-service.port
String
The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.
rest.address
(none)
String
The address that should be used by clients to connect to the server. Attention: This option is respected only if the high-availability configuration is NONE.
rest.bind-address
(none)
String
The address that the server binds itself.
rest.bind-port
"8081"
String
The port that the server binds itself. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Rest servers are running on the same machine.
rest.path
(none)
String
The path that should be used by clients to interact to the server which is accessible via URL.
rest.port
Integer
The port that the client connects to. If rest.bind-port has not been specified, then the REST server will bind to this port. Attention: This option is respected only if the high-availability configuration is NONE.
taskmanager.bind-host
(none)
String
The local address of the network interface that the task manager binds to. If not configured, '0.0.0.0' will be used.
taskmanager.collect-sink.port
Integer
The port used for the client to retrieve query results from the TaskManager. The default value is 0, which corresponds to a random port assignment.
taskmanager.data.bind-port
(none)
String
The task manager's bind port used for data exchange operations. Also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. If not configured, 'taskmanager.data.port' will be used.
taskmanager.data.port
Integer
The task manager’s external port used for data exchange operations.
taskmanager.host
(none)
String
The external address of the network interface where the TaskManager is exposed. Because different TaskManagers need different values for this option, usually it is specified in an additional non-shared TaskManager-specific config file.
taskmanager.rpc.bind-port
(none)
Integer
The local RPC port that the TaskManager binds to. If not configured, the external port (configured by 'taskmanager.rpc.port') will be used.
taskmanager.rpc.port
String
The external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.
These configuration options control Flink’s restart behaviour in case of failures during the execution.
By configuring these options in your Flink configuration file, you define the cluster’s default restart strategy.
The default restart strategy will only take effect if no job specific restart strategy has been configured via the ExecutionConfig.
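A cluster-wide default based on the exponential-delay strategy could be sketched as follows. The option names are the ones described in this section; the values are illustrative and are not claimed to be the defaults.

```yaml
restart-strategy:
  type: exponential-delay
  exponential-delay:
    initial-backoff: 1 s          # delay before the first restart
    backoff-multiplier: 2.0       # delay doubles after every failure
    max-backoff: 1 min            # upper bound for the delay
    jitter-factor: 0.1            # random variation of up to 10 % of the backoff
    reset-backoff-threshold: 1 h  # failure-free runtime after which the backoff resets
```

With these values the delay between restarts grows as 1 s, 2 s, 4 s, 8 s, 16 s, 32 s and is then capped at 1 min, each varied by the jitter. After the job has run for 1 h without failure, the backoff returns to 1 s.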
Key
Default
Type
Description
restart-strategy.type
(none)
String
Defines the restart strategy to use in case of job failures.
Accepted values are:
disable, off, none: No restart strategy.
fixed-delay, fixeddelay: Fixed delay restart strategy.
failure-rate, failurerate: Failure rate restart strategy.
exponential-delay, exponentialdelay: Exponential delay restart strategy.
If checkpointing is disabled, the default value is disable. If checkpointing is enabled, the default value is exponential-delay, and the default values of exponential-delay related config options will be used.
restart-strategy.fixed-delay.attempts
Integer
The number of times that Flink retries the execution before the job is declared as failed if restart-strategy.type has been set to fixed-delay.
restart-strategy.fixed-delay.delay
Duration
Delay between two consecutive restart attempts if restart-strategy.type has been set to fixed-delay. Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted. It can be specified using notation: "1 min", "20 s".
restart-strategy.exponential-delay.attempts-before-reset-backoff
infinite
Integer
The number of times that Flink retries the execution before failing the job if restart-strategy.type has been set to exponential-delay. The number will be reset once the backoff is reset to its initial value.
restart-strategy.exponential-delay.backoff-multiplier
Double
The backoff value is multiplied by this value after every failure, until the max backoff is reached, if restart-strategy.type has been set to exponential-delay.
restart-strategy.exponential-delay.initial-backoff
Duration
Starting duration between restarts if restart-strategy.type has been set to exponential-delay. It can be specified using notation: "1 min", "20 s".
restart-strategy.exponential-delay.jitter-factor
Double
Jitter specified as a portion of the backoff if restart-strategy.type has been set to exponential-delay. It represents how large a random value will be added to or subtracted from the backoff. Useful when you want to avoid restarting multiple jobs at the same time.
restart-strategy.exponential-delay.max-backoff
1 min
Duration
The highest possible duration between restarts if restart-strategy.type has been set to exponential-delay. It can be specified using notation: "1 min", "20 s".
restart-strategy.exponential-delay.reset-backoff-threshold
Duration
Threshold when the backoff is reset to its initial value if restart-strategy.type has been set to exponential-delay. It specifies how long the job must be running without failure to reset the exponentially increasing backoff to its initial value. It can be specified using notation: "1 min", "20 s".
restart-strategy.failure-rate.delay
Duration
Delay between two consecutive restart attempts if restart-strategy.type has been set to failure-rate. It can be specified using notation: "1 min", "20 s".
restart-strategy.failure-rate.failure-rate-interval
1 min
Duration
Time interval for measuring failure rate if restart-strategy.type has been set to failure-rate. It can be specified using notation: "1 min", "20 s".
restart-strategy.failure-rate.max-failures-per-interval
Integer
Maximum number of restarts in the given time interval before failing a job if restart-strategy.type has been set to failure-rate.
cleanup-strategy.type
"exponential-delay"
String
Defines the cleanup strategy to use in case of cleanup failures.
Accepted values are:
none, disable, off: Cleanup is only performed once. No retry will be initiated in case of failure. The job artifacts (and the job's JobResultStore entry) have to be cleaned up manually in case of a failure.
fixed-delay, fixeddelay: Cleanup attempts will be separated by a fixed interval up to the point where the cleanup is considered successful or a set amount of retries is reached. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually.
exponential-delay, exponentialdelay: The exponential delay strategy triggers the cleanup with an exponentially increasing delay up to the point where the cleanup succeeded or a set amount of retries is reached. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually.
The default configuration relies on an exponentially delayed retry strategy with the given default values.
cleanup-strategy.fixed-delay.attempts
infinite
Integer
The number of times that Flink retries the cleanup before giving up if cleanup-strategy.type has been set to fixed-delay. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually.
cleanup-strategy.fixed-delay.delay
1 min
Duration
Amount of time that Flink waits before re-triggering the cleanup after a failed attempt if cleanup-strategy.type is set to fixed-delay. It can be specified using the following notation: "1 min", "20 s".
cleanup-strategy.exponential-delay.attempts
infinite
Integer
The number of times a failed cleanup is retried if cleanup-strategy.type has been set to exponential-delay. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually.
cleanup-strategy.exponential-delay.initial-backoff
Duration
Starting duration between cleanup retries if cleanup-strategy.type has been set to exponential-delay. It can be specified using the following notation: "1 min", "20 s".
cleanup-strategy.exponential-delay.max-backoff
Duration
The highest possible duration between cleanup retries if cleanup-strategy.type has been set to exponential-delay. It can be specified using the following notation: "1 min", "20 s".
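The cleanup retry options follow the same nesting pattern as the restart strategy options. A minimal sketch, with illustrative backoff values that are not claimed to be the defaults:

```yaml
cleanup-strategy:
  type: exponential-delay
  exponential-delay:
    initial-backoff: 1 s   # wait before the first cleanup retry
    max-backoff: 1 h       # upper bound for the wait between retries
```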
These options control the basic setup of state backends and checkpointing behavior.
The options are only relevant for jobs/applications executing in a continuous streaming fashion.
Jobs/applications executing in a batch fashion do not use state backends and checkpoints, but different internal data structures that are optimized for batch processing.
State Backends
Key
Default
Type
Description
state.backend.type
"hashmap"
String
The state backend to be used to store state.
The implementation can be specified either via its shortcut name, or via the class name of a StateBackendFactory. If a factory is specified, it is instantiated via its zero-argument constructor and its StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.
Recognized shortcut names are 'hashmap' and 'rocksdb'.
execution.checkpointing.storage
(none)
String
The checkpoint storage implementation to be used to checkpoint state.
The implementation can be specified either via its shortcut name, or via the class name of a CheckpointStorageFactory. If a factory is specified, it is instantiated via its zero-argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.
Recognized shortcut names are 'jobmanager' and 'filesystem'.
'execution.checkpointing.storage' and 'execution.checkpointing.dir' are usually combined to configure the checkpoint location. By default, the checkpoint meta data and actual program state will be stored in the JobManager's memory directly. When 'execution.checkpointing.storage' is set to 'jobmanager', if 'execution.checkpointing.dir' is configured, the meta data of checkpoints will be persisted to the path specified by 'execution.checkpointing.dir'. Otherwise, the meta data will be stored in the JobManager's memory. When 'execution.checkpointing.storage' is set to 'filesystem', a valid path must be configured to 'execution.checkpointing.dir', and the checkpoint meta data and actual program state will both be persisted to the path.
execution.checkpointing.dir
(none)
String
The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'execution.checkpointing.storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.
execution.checkpointing.savepoint-dir
(none)
String
The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).
execution.checkpointing.cleaner.parallel-mode
Boolean
Option whether to discard a checkpoint's states in parallel using the ExecutorService passed into the cleaner
execution.checkpointing.incremental
false
Boolean
Option whether to create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option.
execution.checkpointing.local-backup.dirs
(none)
String
The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. If not configured it will default to <WORKING_DIR>/localState. The <WORKING_DIR> can be configured via process.taskmanager.working-dir
execution.checkpointing.local-backup.enabled
false
Boolean
This option configures local backup for the state backend, which indicates whether to make backup checkpoint on local disk. If not configured, fallback to execution.state-recovery.from-local. By default, local backup is deactivated. Local backup currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).
execution.checkpointing.num-retained
Integer
The maximum number of completed checkpoints to retain.
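Several of the options in this table can be combined in nested form, as in the sketch below. The HDFS host and port are hypothetical and the remaining values are illustrative.

```yaml
state:
  backend:
    type: rocksdb   # incremental checkpoints may be ignored by backends that do not support them
execution:
  checkpointing:
    storage: filesystem   # requires a valid execution.checkpointing.dir
    dir: hdfs://namenode:8020/flink/checkpoints   # hypothetical host and port
    incremental: true
    num-retained: 3
    local-backup:
      enabled: true
```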
High-availability here refers to the ability of the JobManager process to recover from failures.
The JobManager ensures consistency during recovery across TaskManagers. For the JobManager itself to recover consistently, an external service must store a minimal amount of recovery metadata (like “ID of last committed checkpoint”), as well as help to elect and lock which JobManager is the leader (to avoid split-brain situations).
Key
Default
Type
Description
high-availability.type
"NONE"
String
Defines high-availability mode used for cluster execution. To enable high-availability, set this mode to "ZOOKEEPER", "KUBERNETES", or specify the fully qualified name of the factory class.
high-availability.cluster-id
"/default"
String
The ID of the Flink cluster, used to separate multiple Flink clusters from each other. Needs to be set for standalone clusters but is automatically inferred in YARN.
high-availability.storageDir
(none)
String
File system path (URI) where Flink persists metadata in high-availability setups.
job-result-store.delete-on-commit
Boolean
Determines whether job results should be automatically removed from the underlying job result store when the corresponding entity transitions into a clean state. If false, the cleaned job results are, instead, marked as clean to indicate their state. In this case, Flink no longer has ownership and the resources need to be cleaned up by the user.
job-result-store.storage-path
(none)
String
Defines where job results should be stored. This should be an underlying file-system that provides read-after-write consistency. By default, this is {high-availability.storageDir}/job-result-store/{high-availability.cluster-id}.
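A sketch of the options above for a ZooKeeper-based setup follows. It assumes that the mode is set through the key high-availability.type; the cluster ID and storage path are hypothetical. Service-specific settings, such as how to reach ZooKeeper, are not covered in this section and are left out.

```yaml
# Assumption: high-availability.type is the key that selects the mode.
high-availability.type: ZOOKEEPER
# Hypothetical ID that separates this cluster from others.
high-availability.cluster-id: /my-flink-cluster
# Hypothetical URI where recovery metadata is persisted.
high-availability.storageDir: hdfs:///flink/ha
```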
These configuration values control the way that TaskManagers and JobManagers use memory.
Flink tries to shield users as much as possible from the complexity of configuring the JVM for data-intensive processing.
In most cases, users should only need to set the values taskmanager.memory.process.size or taskmanager.memory.flink.size (depending on the setup), and possibly adjust the ratio of JVM heap and Managed Memory via taskmanager.memory.managed.fraction. The other options below can be used for performance tuning and fixing memory related errors.
For a detailed explanation of how these options interact, see the documentation on TaskManager and JobManager memory configurations.
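In the common case this comes down to two entries, as in the following sketch. The values are illustrative, and only one of the two total-size options is set.

```yaml
# Total memory of the TaskManager process, e.g. the container size.
taskmanager.memory.process.size: 4g
# Share of Flink memory given to Managed Memory; the rest stays available for the JVM heap and other components.
taskmanager.memory.managed.fraction: 0.3
```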
Key
Default
Type
Description
jobmanager.memory.enable-jvm-direct-memory-limit
false
Boolean
Whether to enable the JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize). The limit will be set to the value of 'jobmanager.memory.off-heap.size' option.
jobmanager.memory.flink.size
(none)
MemorySize
Total Flink Memory size for the JobManager. This includes all the memory that a JobManager consumes, except for JVM Metaspace and JVM Overhead. It consists of JVM Heap Memory and Off-heap Memory. See also 'jobmanager.memory.process.size' for total process memory size configuration.
jobmanager.memory.heap.size
(none)
MemorySize
JVM Heap Memory size for JobManager. The minimum recommended JVM Heap size is 128.000mb (134217728 bytes).
jobmanager.memory.jvm-metaspace.size
256 mb
MemorySize
JVM Metaspace Size for the JobManager.
jobmanager.memory.jvm-overhead.fraction
Float
Fraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.
jobmanager.memory.jvm-overhead.max
MemorySize
Max JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.
jobmanager.memory.jvm-overhead.min
192 mb
MemorySize
Min JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.
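The derivation described for the three jvm-overhead options can be sketched as a simple clamp. This is a minimal illustration, not Flink's implementation: the function name is hypothetical, and the fraction (0.1) and max (1024 mb) used below are assumed values for the example. Only the 192 mb minimum is taken from the table.

```python
def derive_jvm_overhead_mb(process_mb, fraction, min_mb, max_mb):
    """Derive JVM Overhead from Total Process Memory, clamped to [min_mb, max_mb]."""
    derived = process_mb * fraction
    return min(max(derived, min_mb), max_mb)


# fraction=0.1 and max_mb=1024 are assumed values for illustration only.
small = derive_jvm_overhead_mb(1600, 0.1, 192, 1024)   # derived size below min: min is used
large = derive_jvm_overhead_mb(20480, 0.1, 192, 1024)  # derived size above max: max is used
fixed = derive_jvm_overhead_mb(4096, 0.1, 256, 256)    # min == max pins the exact size
print(small, large, fixed)
```

Setting min and max to the same value, as in the last call, makes the fraction irrelevant.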
jobmanager.memory.off-heap.size
128 mb
MemorySize
Off-heap Memory size for JobManager. This option covers all off-heap memory usage including direct and native memory allocation. The JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) will be set to this value if the limit is enabled by 'jobmanager.memory.enable-jvm-direct-memory-limit'.
jobmanager.memory.process.size
(none)
MemorySize
Total Process Memory size for the JobManager. This includes all the memory that a JobManager JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. In containerized setups, this should be set to the container memory. See also 'jobmanager.memory.flink.size' for Total Flink Memory size configuration.
taskmanager.memory.flink.size
(none)
MemorySize
Total Flink Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory, Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Network Memory. See also 'taskmanager.memory.process.size' for total process memory size configuration.
taskmanager.memory.framework.heap.size
128 mb
MemorySize
Framework Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for TaskExecutor framework, which will not be allocated to task slots.
taskmanager.memory.framework.off-heap.batch-shuffle.size
64 mb
MemorySize
Size of memory used by batch shuffle for shuffle data read (currently only used by sort-shuffle and hybrid shuffle). Notes: 1) The memory is cut from 'taskmanager.memory.framework.off-heap.size' so must be smaller than that, which means you may also need to increase 'taskmanager.memory.framework.off-heap.size' after you increase this config value; 2) This memory size can influence the shuffle performance and you can increase this config value for large-scale batch jobs (for example, to 128M or 256M).
taskmanager.memory.framework.off-heap.size
128 mb
MemorySize
Framework Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for TaskExecutor framework, which will not be allocated to task slots. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.
taskmanager.memory.jvm-metaspace.size
256 mb
MemorySize
JVM Metaspace Size for the TaskExecutors.
taskmanager.memory.jvm-overhead.fraction
Float
Fraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.jvm-overhead.max
MemorySize
Max JVM Overhead size for the TaskExecutors. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.jvm-overhead.min
192 mb
MemorySize
Min JVM Overhead size for the TaskExecutors. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.managed.consumer-weights
OPERATOR:70,STATE_BACKEND:70,PYTHON:30
Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are OPERATOR (for built-in algorithms), STATE_BACKEND (for RocksDB state backend) and PYTHON (for Python processes).
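As a rough sketch of how the weights translate into shares, the snippet below parses the default value and splits a slot's managed memory among the consumer kinds present in that slot. The function names are hypothetical and the logic only mirrors the description above; it is not Flink's code.

```python
from fractions import Fraction


def parse_weights(value):
    """Parse 'KIND:weight,KIND:weight' into a dict of integer weights."""
    weights = {}
    for item in value.split(","):
        kind, weight = item.split(":")
        weights[kind.strip()] = int(weight)
    return weights


def managed_memory_shares(weights, kinds_in_slot):
    """Share of a slot's managed memory for each consumer kind present in the slot."""
    total = sum(weights[kind] for kind in kinds_in_slot)
    return {kind: Fraction(weights[kind], total) for kind in kinds_in_slot}


weights = parse_weights("OPERATOR:70,STATE_BACKEND:70,PYTHON:30")
# A slot containing only a state backend and Python processes:
shares = managed_memory_shares(weights, {"STATE_BACKEND", "PYTHON"})
print(shares)
```

The number of consumers of each kind does not appear anywhere in the computation, which matches the "regardless of the number of consumers from each kind" wording.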
taskmanager.memory.managed.fraction
Float
Fraction of Total Flink Memory to be used as Managed Memory, if Managed Memory size is not explicitly specified.
taskmanager.memory.managed.size
(none)
MemorySize
Managed Memory size for TaskExecutors. This is the size of off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory.
taskmanager.memory.network.fraction
Float
Fraction of Total Flink Memory to be used as Network Memory. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.network.max
infinite
MemorySize
Max Network Memory size for TaskExecutors. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. By default, the max limit of Network Memory is Long.MAX_VALUE. The exact size of Network Memory can be explicitly specified by setting the min/max to the same value.
taskmanager.memory.network.min
64 mb
MemorySize
Min Network Memory size for TaskExecutors. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max to the same value.
taskmanager.memory.process.size
(none)
MemorySize
Total Process Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On containerized setups, this should be set to the container memory. See also 'taskmanager.memory.flink.size' for total Flink memory size configuration.
taskmanager.memory.task.heap.size
(none)
MemorySize
Task Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for tasks. If not specified, it will be derived as Total Flink Memory minus Framework Heap Memory, Framework Off-Heap Memory, Task Off-Heap Memory, Managed Memory and Network Memory.
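The subtraction described above can be worked through with a small sketch. The helper names are hypothetical, and the managed and network fractions (0.4 and 0.1) are assumed values for the example because the table does not show them. The framework heap, framework off-heap, task off-heap, and network min values are the defaults listed in the table.

```python
from fractions import Fraction


def clamp(value, lower, upper=None):
    """Clamp value to [lower, upper]; upper=None means no upper limit."""
    value = max(value, lower)
    return value if upper is None else min(value, upper)


def derive_task_heap_mb(flink_mb, managed_fraction, network_fraction,
                        framework_heap_mb=128, framework_off_heap_mb=128,
                        task_off_heap_mb=0, network_min_mb=64, network_max_mb=None):
    """Task Heap = Total Flink Memory minus all other Flink memory components."""
    managed_mb = Fraction(managed_fraction) * flink_mb
    network_mb = clamp(Fraction(network_fraction) * flink_mb,
                       network_min_mb, network_max_mb)
    return (flink_mb - framework_heap_mb - framework_off_heap_mb
            - task_off_heap_mb - managed_mb - network_mb)


# Fractions are passed as strings so the arithmetic stays exact.
task_heap = derive_task_heap_mb(1280, "0.4", "0.1")
print(task_heap)
```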
taskmanager.memory.task.off-heap.size
0 bytes
MemorySize
Task Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for tasks. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.
(none)
String
A (semicolon-separated) list of file schemes, for which Hadoop can be used instead of an appropriate Flink plugin. (example: s3;wasb)
fs.default-scheme
(none)
String
The default filesystem scheme, used for paths that do not declare a scheme explicitly. May contain an authority, e.g. host:port in case of an HDFS NameNode.
io.tmp.dirs
'LOCAL_DIRS' on Yarn. System.getProperty("java.io.tmpdir") in standalone.
String
Directories for temporary files, separated by ",", "|", or the system's java.io.File.pathSeparator.
Options for configuring Flink’s security and secure interaction with external systems.
Flink’s network connections can be secured via SSL. Please refer to the SSL Setup Docs for detailed setup guide and background.
Default
Description
"TLS_RSA_WITH_AES_128_CBC_SHA"
String
The comma separated list of standard SSL algorithms to be supported. Read more here
security.ssl.internal.cert.fingerprint
(none)
String
The SHA-1 fingerprint of the internal certificate. This further protects internal communication by requiring peers to present the exact certificate used by Flink. This is necessary where one cannot use a private (self-signed) CA or where an internal firm-wide CA is required.
security.ssl.internal.enabled
false
Boolean
Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).
security.ssl.internal.key-password
(none)
String
The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore
(none)
String
The Java keystore file with SSL Key and Certificate, to be used for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore-password
(none)
String
The secret to decrypt the keystore file for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore-type
JVM default keystore type
String
The type of keystore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.truststore
(none)
String
The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.truststore-password
(none)
String
The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.truststore-type
JVM default keystore type
String
The type of truststore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.protocol
"TLSv1.2"
String
The SSL protocol version to be supported for the SSL transport. Note that it doesn’t support a comma-separated list.
security.ssl.rest.authentication-enabled
false
Boolean
Turns on mutual SSL authentication for external communication via the REST endpoints.
security.ssl.rest.cert.fingerprint
(none)
String
The SHA-1 fingerprint of the REST certificate. This further protects the REST endpoints by requiring them to present a certificate that is used only by the proxy server. This is necessary where one uses a public CA or an internal firm-wide CA.
security.ssl.rest.enabled
false
Boolean
Turns on SSL for external communication via the REST endpoints.
security.ssl.rest.key-password
(none)
String
The secret to decrypt the key in the keystore for Flink's external REST endpoints.
security.ssl.rest.keystore
(none)
String
The Java keystore file with SSL Key and Certificate, to be used for Flink's external REST endpoints.
security.ssl.rest.keystore-password
(none)
String
The secret to decrypt the keystore file for Flink's external REST endpoints.
security.ssl.rest.keystore-type
JVM default keystore type
String
The type of the keystore for Flink's external REST endpoints.
security.ssl.rest.truststore
(none)
String
The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints.
security.ssl.rest.truststore-password
(none)
String
The password to decrypt the truststore for Flink's external REST endpoints.
security.ssl.rest.truststore-type
JVM default keystore type
String
The type of the truststore for Flink's external REST endpoints.
security.ssl.verify-hostname
Boolean
Flag to enable peer’s hostname verification during ssl handshake.
Delegation token
Flink has a pluggable, authentication-protocol-agnostic delegation token framework.
Please refer to the Flink and Delegation Token Docs for further details.
Default
Description
Duration
How long to wait before retrying to obtain new delegation tokens after a failure.
security.delegation.tokens.renewal.time-ratio
Double
Ratio of the tokens' expiration time when new credentials should be re-obtained.
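One way to read the ratio is as the fraction of the tokens' remaining lifetime after which re-obtaining is scheduled. The sketch below illustrates that reading; the function name is hypothetical and the ratio of 0.75 is an assumed value for the example, since the table does not show a default.

```python
def renewal_delay_ms(now_ms, expiration_ms, time_ratio):
    """Delay before re-obtaining tokens: a fraction of the remaining token lifetime."""
    return int((expiration_ms - now_ms) * time_ratio)


# Tokens that expire in one hour, with an assumed ratio of 0.75:
delay = renewal_delay_ms(now_ms=0, expiration_ms=3_600_000, time_ratio=0.75)
print(delay)  # milliseconds until new credentials are requested
```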
security.delegation.token.provider.<serviceName>.enabled
Boolean
Controls whether to obtain credentials for services when security is enabled. By default, credentials for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run.
(none)
List<String>
A semicolon-separated list of Kerberos-secured Hadoop filesystems Flink is going to access. For example, security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002;hdfs://namenode3:9003. The JobManager needs to have access to these filesystems to retrieve the security tokens.
security.kerberos.login.contexts
(none)
String
A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication)
security.kerberos.login.keytab
(none)
String
Absolute path to a Kerberos keytab file that contains the user credentials.
security.kerberos.login.principal
(none)
String
Kerberos principal name associated with the keytab.
security.kerberos.login.use-ticket-cache
Boolean
Indicates whether to read from your Kerberos ticket cache.
security.kerberos.relogin.period
1 min
Duration
The time period when keytab login happens automatically in order to always have a valid TGT.
This section contains options related to integrating Flink with resource orchestration frameworks, like Kubernetes, Yarn, etc.
Note that it is not always necessary to integrate Flink with the resource orchestration framework.
For example, you can easily deploy Flink applications on Kubernetes without Flink knowing that it runs on Kubernetes (and without specifying any of the Kubernetes config options here). See this setup guide for an example.
The options in this section are necessary for setups where Flink itself actively requests and releases resources from the orchestrators.
(none)
String
If configured, Flink will add this key to the resource profile of container request to Yarn. The value will be set to the value of external-resource.<resource_name>.amount.
flink.hadoop.<key>
(none)
String
A general option to probe Hadoop configuration through the prefix 'flink.hadoop.'. Flink will remove the prefix to get <key> (from core-default.xml and hdfs-default.xml) and then set <key> and its value in the Hadoop configuration. For example, flink.hadoop.dfs.replication=5 in the Flink configuration is converted to dfs.replication=5 in the Hadoop configuration.
flink.yarn.<key>
(none)
String
A general option to probe Yarn configuration through the prefix 'flink.yarn.'. Flink will remove the prefix 'flink.' to get yarn.<key> (from yarn-default.xml) and then set yarn.<key> and its value in the Yarn configuration. For example, flink.yarn.resourcemanager.container.liveness-monitor.interval-ms=300000 in the Flink configuration is converted to yarn.resourcemanager.container.liveness-monitor.interval-ms=300000 in the Yarn configuration.
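The two prefix rules differ in how much of the key is stripped, which the following sketch makes explicit. It is an illustration of the rules as described, with a hypothetical function name, and not Flink's own code.

```python
def to_hadoop_and_yarn_conf(flink_conf):
    """Split prefixed Flink options into Hadoop and Yarn configuration entries."""
    hadoop_conf, yarn_conf = {}, {}
    for key, value in flink_conf.items():
        if key.startswith("flink.hadoop."):
            # The whole 'flink.hadoop.' prefix is removed.
            hadoop_conf[key[len("flink.hadoop."):]] = value
        elif key.startswith("flink.yarn."):
            # Only 'flink.' is removed, so the key keeps its 'yarn.' prefix.
            yarn_conf[key[len("flink."):]] = value
    return hadoop_conf, yarn_conf


hadoop_conf, yarn_conf = to_hadoop_and_yarn_conf({
    "flink.hadoop.dfs.replication": "5",
    "flink.yarn.resourcemanager.container.liveness-monitor.interval-ms": "300000",
    "taskmanager.memory.process.size": "1728m",  # unrelated keys are ignored
})
print(hadoop_conf, yarn_conf)
```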
yarn.application-attempt-failures-validity-interval
10000
Time window in milliseconds which defines the number of application attempt failures when restarting the AM. Failures which fall outside of this window are not considered. Set this value to -1 in order to count globally. See here for more information.
yarn.application-attempts
(none)
Integer
Number of ApplicationMaster restarts. By default, the value will be set to 1. If high availability is enabled, then the default value will be 2. The restart number is also limited by YARN (configured via yarn.resourcemanager.am.max-attempts). Note that the entire Flink cluster will restart and the YARN Client will lose the connection.
yarn.application-master.port
String
With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
yarn.application.id
(none)
String
The YARN application id of the running yarn cluster. This is the YARN cluster where the pipeline is going to be executed.
yarn.application.name
(none)
String
A custom name for your YARN application.
yarn.application.node-label
(none)
String
Specify YARN node label for the YARN application.
yarn.application.priority
Integer
A non-negative integer indicating the priority for submitting a Flink YARN application. It will only take effect if the YARN priority scheduling setting is enabled. A larger integer corresponds to a higher priority. If the priority is negative or set to '-1' (default), Flink will unset the YARN priority setting and use the cluster default priority. Please refer to YARN's official documentation for the specific settings required to enable priority scheduling for the targeted YARN version.
yarn.application.queue
(none)
String
The YARN queue on which to put the current pipeline.
yarn.application.type
(none)
String
A custom type for your YARN application.
yarn.appmaster.vcores
Integer
The number of virtual cores (vcores) used by YARN application master.
yarn.classpath.include-user-jar
ORDER
Defines whether user-jars are included in the system class path as well as their positioning in the path.
Possible values:
- "DISABLED": Exclude user jars from the system class path
- "FIRST": Position at the beginning
- "LAST": Position at the end
- "ORDER": Position based on the name of the jar
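The four values can be pictured with a small sketch that assembles a class path from jar names. The function and the jar names are hypothetical, and sorting the jars within each group is an assumption made for the example; only the relative positioning follows the descriptions above.

```python
def build_classpath(system_jars, user_jars, mode="ORDER"):
    """Illustrate where user jars land on the class path for each mode."""
    if mode == "DISABLED":
        return sorted(system_jars)                      # user jars are excluded
    if mode == "FIRST":
        return sorted(user_jars) + sorted(system_jars)  # user jars at the beginning
    if mode == "LAST":
        return sorted(system_jars) + sorted(user_jars)  # user jars at the end
    if mode == "ORDER":
        return sorted(system_jars + user_jars)          # positioned by jar name
    raise ValueError(f"unknown mode: {mode}")


system = ["flink-dist.jar", "log4j.jar"]  # hypothetical jar names
user = ["job.jar"]
print(build_classpath(system, user, "ORDER"))
```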
yarn.container-start-command-template
"%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"
String
This configuration parameter allows users to pass custom settings (such as JVM paths, arguments etc.) to start the YARN. The following placeholders will be replaced:
- %java%: Path to the Java executable
- %jvmmem%: JVM memory limits and tweaks
- %jvmopts%: Options for the Java VM
- %logging%: Logging-related configuration settings
- %class%: Main class to execute
- %args%: Arguments for the main class
- %redirects%: Output redirects
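Placeholder replacement in the template can be sketched as a plain string substitution. The function name and all placeholder values below are hypothetical, and the whitespace collapsing is a convenience of this sketch rather than documented behavior.

```python
import re

TEMPLATE = "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"


def render_start_command(template, values):
    """Replace each %name% placeholder with its value; missing ones become empty."""
    rendered = re.sub(r"%(\w+)%", lambda m: values.get(m.group(1), ""), template)
    return " ".join(rendered.split())  # collapse gaps left by empty placeholders


# Hypothetical values, for illustration only.
command = render_start_command(TEMPLATE, {
    "java": "$JAVA_HOME/bin/java",
    "jvmmem": "-Xmx424m",
    "class": "com.example.Main",
    "redirects": "1> out.log 2> err.log",
})
print(command)
```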
yarn.containers.vcores
Integer
The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting the org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.
yarn.file-replication
Integer
Replication factor for each local resource file. If it is not configured, Flink will use the default replication value in the Hadoop configuration.
yarn.flink-dist-jar
(none)
String
The location of the Flink dist jar.
yarn.heartbeat.container-request-interval
500 ms
Duration
Time between heartbeats with the ResourceManager if Flink requests containers:
- The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats.
- The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn.
If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See this link for more information.
yarn.heartbeat.interval
Integer
Time between heartbeats with the ResourceManager in seconds.
yarn.modify.acls
(none)
String
Users and groups to give MODIFY access. The ACLs are of the form comma-separated-users<space>comma-separated-groups. Wildcard ACL is also supported. The only valid wildcard ACL is *, which grants permission to all users and groups.
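The ACL format (users, one space, then groups) can be illustrated with a short parser. This is a sketch with a hypothetical function name and made-up user and group names, and it does not cover every edge case of YARN's own ACL handling.

```python
def parse_acl(value):
    """Parse 'user1,user2 group1,group2' into (users, groups); '*' means everyone."""
    if value.strip() == "*":
        return ["*"], ["*"]
    users_part, _, groups_part = value.partition(" ")
    users = [u for u in users_part.split(",") if u]
    groups = [g for g in groups_part.strip().split(",") if g]
    return users, groups


# Hypothetical users and groups:
print(parse_acl("alice,bob admins,ops"))
print(parse_acl("*"))
```

The same format applies to yarn.view.acls.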
yarn.properties-file.location
(none)
String
When a Flink job is submitted to YARN, the JobManager’s host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users).
yarn.provided.lib.dirs
(none)
List<String>
A semicolon-separated list of provided lib directories. They should be pre-uploaded and world-readable. Flink will use them to avoid uploading the local Flink jars (e.g. flink-dist, lib/, plugins/), which accelerates the job submission process. YARN will also cache them on the nodes so that they don't need to be downloaded every time for each application. An example could be hdfs://$namenode_address/path/of/flink/lib
yarn.provided.usrlib.dir
(none)
String
The provided remote usrlib directory. It should be pre-uploaded and world-readable. Flink will use it to exclude the local usrlib directory (i.e. usrlib/ under the parent directory of FLINK_LIB_DIR). Unlike yarn.provided.lib.dirs, YARN will not cache it on the nodes as it is for each application. An example could be hdfs://$namenode_address/path/of/flink/usrlib
yarn.security.appmaster.delegation.token.services
"hadoopfs"
List<String>
The delegation token provider services that are allowed to pass obtained tokens to the YARN application master. For backward compatibility, and to make log aggregation work, tokens obtained by the `hadoopfs` provider are added to the AM by default.
yarn.security.kerberos.localized-keytab-path
"krb5.keytab"
String
Local (on NodeManager) path where the Kerberos keytab file will be localized to. If yarn.security.kerberos.ship-local-keytab is set to true, Flink will ship the keytab file as a YARN local resource. In this case, the path is relative to the local resource directory. If set to false, Flink will try to directly locate the keytab from the path itself.
yarn.security.kerberos.ship-local-keytab
Boolean
When this is true Flink will ship the keytab file configured via security.kerberos.login.keytab as a localized YARN resource.
yarn.ship-archives
(none)
List<String>
A semicolon-separated list of archives to be shipped to the YARN cluster. These archives can come from the local path of flink client or HDFS. They will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip". For example, "/path/to/local/archive.jar;hdfs://$namenode_address/path/to/archive.jar"
yarn.ship-files
(none)
List<String>
A semicolon-separated list of files and/or directories to be shipped to the YARN cluster. These files/directories can come from the local path of flink client or HDFS. For example, "/path/to/local/file;/path/to/local/directory;hdfs://$namenode_address/path/of/file;hdfs://$namenode_address/path/of/directory"
yarn.staging-directory
(none)
String
Staging directory used to store YARN files while submitting applications. Per default, it uses the home directory of the configured file system.
yarn.tags
(none)
String
A comma-separated list of tags to apply to the Flink YARN application.
yarn.taskmanager.node-label
(none)
String
Specify YARN node label for the Flink TaskManagers, it will override the yarn.application.node-label for TaskManagers if both are set.
yarn.view.acls
(none)
String
Users and groups to give VIEW access. The ACLs are of the form comma-separated-users<space>comma-separated-groups. Wildcard ACL is also supported. The only valid wildcard ACL is *, which grants permission to all users and groups.
(none)
String
If configured, Flink will add "resources.limits.<config-key>" and "resources.requests.<config-key>" to the main container of TaskExecutor and set the value to the value of external-resource.<resource_name>.amount.
kubernetes.artifacts.local-upload-enabled
false
Boolean
Enables uploading 'local://' schemed artifacts to DFS before the application cluster deployment.
kubernetes.artifacts.local-upload-overwrite
false
Boolean
If enabled, overwrites any existing artifact on the remote target. Disabled by default.
kubernetes.artifacts.local-upload-target
(none)
String
The target remote DFS directory to upload local artifacts.
kubernetes.client.io-pool.size
Integer
The size of the IO executor pool used by the Kubernetes client to execute blocking IO operations (e.g. start/stop TaskManager pods, update leader related ConfigMaps, etc.). Increasing the pool size allows more IO operations to run concurrently.
kubernetes.client.user-agent
"flink"
String
The user agent to be used when contacting the Kubernetes API server.
kubernetes.cluster-id
(none)
String
The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. The id must only contain lowercase alphanumeric characters and "-". The required format is [a-z]([-a-z0-9]*[a-z0-9]). If not set, the client will automatically generate it with a random ID.
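The cluster-id constraints can be checked up front with a few lines of code. The sketch below applies the length limit and the format exactly as written in the description; the function and constant names are hypothetical, and Flink's own validation may differ in detail.

```python
import re

# Pattern copied from the description above.
CLUSTER_ID_PATTERN = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])")
MAX_CLUSTER_ID_LENGTH = 45


def is_valid_cluster_id(cluster_id):
    """Check the documented length limit and character format of a cluster-id."""
    return (len(cluster_id) <= MAX_CLUSTER_ID_LENGTH
            and CLUSTER_ID_PATTERN.fullmatch(cluster_id) is not None)


print(is_valid_cluster_id("my-flink-cluster"))  # lowercase letters, digits and "-"
print(is_valid_cluster_id("My-Cluster"))        # uppercase letters are rejected
```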
kubernetes.config.file
(none)
String
The Kubernetes config file used to create the client. The default is located at ~/.kube/config.
kubernetes.container.image.pull-policy
IfNotPresent
The Kubernetes container image pull policy. The default policy is IfNotPresent to avoid putting pressure on the image repository.
Possible values:
- "IfNotPresent"
- "Always"
- "Never"
kubernetes.container.image.pull-secrets
(none)
List<String>
A semicolon-separated list of the Kubernetes secrets used to access private image registries.
kubernetes.container.image.ref
The default value depends on the actually running version. In general it looks like "flink:<FLINK_VERSION>-scala_<SCALA_VERSION>"
String
Image to use for Flink containers. The specified image must be based upon the same Apache Flink and Scala versions as used by the application. Visit here for the official docker images provided by the Flink project. The Flink project also publishes docker images to apache/flink DockerHub repository.
kubernetes.context
(none)
String
The desired context from your Kubernetes config file used to configure the Kubernetes client for interacting with the cluster. This could be helpful if one has multiple contexts configured and wants to administrate different Flink clusters on different Kubernetes clusters/contexts.
kubernetes.decorator.hadoop-conf-mount.enabled
Boolean
Whether to enable Hadoop configuration mount decorator. This must be set to false when Hadoop config is mounted outside of Flink. A typical use-case is when one uses Flink Kubernetes Operator.
kubernetes.decorator.kerberos-mount.enabled
Boolean
Whether to enable Kerberos mount decorator. This must be set to false when Kerberos config and keytab is mounted outside of Flink. A typical use-case is when one uses Flink Kubernetes Operator.
kubernetes.entry.path
"/docker-entrypoint.sh"
String
The entrypoint script of kubernetes in the image. It will be used as command for jobmanager and taskmanager container.
kubernetes.env.secretKeyRef
(none)
List<Map>
The user-specified secrets to set env variables in Flink container. The value should be in the form of env:FOO_ENV,secret:foo_secret,key:foo_key;env:BAR_ENV,secret:bar_secret,key:bar_key.
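Options of type List<Map>, such as this one, use semicolons between entries and commas between the key:value pairs of one entry. A minimal parsing sketch (the function name is hypothetical, and this is not Flink's parser) shows how the example value breaks down:

```python
def parse_list_of_maps(value):
    """Parse 'k:v,k:v;k:v,k:v' into a list of dicts, one dict per entry."""
    entries = []
    for entry in value.split(";"):
        if not entry.strip():
            continue  # tolerate a trailing semicolon
        pairs = (pair.split(":", 1) for pair in entry.split(","))
        entries.append({key.strip(): val.strip() for key, val in pairs})
    return entries


refs = parse_list_of_maps(
    "env:FOO_ENV,secret:foo_secret,key:foo_key;"
    "env:BAR_ENV,secret:bar_secret,key:bar_key"
)
print(refs)
```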
kubernetes.flink.conf.dir
"/opt/flink/conf"
String
The flink conf directory that will be mounted in pod. The config.yaml, log4j.properties, logback.xml in this path will be overwritten from config map.
kubernetes.flink.log.dir
(none)
String
The directory in the pod where the logs of the jobmanager and taskmanager are saved. The default value is $FLINK_HOME/log.
kubernetes.hadoop.conf.config-map.name
(none)
String
Specify the name of an existing ConfigMap that contains custom Hadoop configuration to be mounted on the JobManager(s) and TaskManagers.
kubernetes.hostnetwork.enabled
false
Boolean
Whether to enable HostNetwork mode. HostNetwork allows the pod to use the node network namespace instead of the individual pod network namespace. Please note that the JobManager service account should have the permission to update Kubernetes service.
kubernetes.internal-service.annotations
(none)
The user-specified annotations that are set to the internal Service. The value should be in the form of a1:v1,a2:v2
kubernetes.jobmanager.annotations
(none)
The user-specified annotations that are set to the JobManager pod. The value could be in the form of a1:v1,a2:v2
kubernetes.jobmanager.cpu.amount
Double
The number of CPUs used by the job manager.
kubernetes.jobmanager.cpu.limit-factor
Double
The limit factor of cpu used by job manager. The resources limit cpu will be set to cpu * limit-factor.
kubernetes.jobmanager.entrypoint.args
(none)
String
Extra arguments used when starting the job manager.
kubernetes.jobmanager.labels
(none)
The labels to be set for JobManager pod. Specified as key:value pairs separated by commas. For example, version:alphav1,deploy:test.
kubernetes.jobmanager.memory.limit-factor
Double
The limit factor of memory used by job manager. The resources limit memory will be set to memory * limit-factor.
kubernetes.jobmanager.node-selector
(none)
The node selector to be set for JobManager pod. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd.
kubernetes.jobmanager.owner.reference
(none)
List<Map>
The user-specified Owner References to be set to the JobManager Deployment. When all the owner resources are deleted, the JobManager Deployment will be deleted automatically, which also deletes all the resources created by this Flink cluster. The value should be formatted as a semicolon-separated list of owner references, where each owner reference is a comma-separated list of `key:value` pairs. E.g., apiVersion:v1,blockOwnerDeletion:true,controller:true,kind:FlinkApplication,name:flink-app-name,uid:flink-app-uid;apiVersion:v1,kind:Deployment,name:deploy-name,uid:deploy-uid
kubernetes.jobmanager.replicas
Integer
Specify how many JobManager pods will be started simultaneously. Configure the value to greater than 1 to start standby JobManagers. It will help to achieve faster recovery. Notice that high availability should be enabled when starting standby JobManagers.
kubernetes.jobmanager.service-account
"default"
String
Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server. If not explicitly configured, config option 'kubernetes.service-account' will be used.
kubernetes.jobmanager.tolerations
(none)
List<Map>
The user-specified tolerations to be set to the JobManager pod. The value should be in the form of key:key1,operator:Equal,value:value1,effect:NoSchedule;key:key2,operator:Exists,effect:NoExecute,tolerationSeconds:6000
kubernetes.namespace
"default"
String
The namespace that will be used for running the jobmanager and taskmanager pods.
kubernetes.pod-template-file.default
(none)
String
Specify a local file that contains the pod template definition. It will be used to initialize the jobmanager and taskmanager pod. The main container should be defined with name 'flink-main-container'. Notice that this can be overwritten by config options 'kubernetes.pod-template-file.jobmanager' and 'kubernetes.pod-template-file.taskmanager' for jobmanager and taskmanager respectively.
kubernetes.pod-template-file.jobmanager
(none)
String
Specify a local file that contains the jobmanager pod template definition. It will be used to initialize the jobmanager pod. The main container should be defined with name 'flink-main-container'. If not explicitly configured, config option 'kubernetes.pod-template-file.default' will be used.
kubernetes.pod-template-file.taskmanager
(none)
String
Specify a local file that contains the taskmanager pod template definition. It will be used to initialize the taskmanager pod. The main container should be defined with name 'flink-main-container'. If not explicitly configured, config option 'kubernetes.pod-template-file.default' will be used.
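A minimal sketch of how the three pod template options can be combined, assuming hypothetical file paths: the default template applies to both pod types, and the TaskManager-specific option overrides it for TaskManager pods only.

```yaml
# Sketch only: both file paths are hypothetical.
# Shared template used for JobManager and TaskManager pods.
kubernetes.pod-template-file.default: /opt/flink/conf/pod-template.yaml
# Takes precedence over the default template for TaskManager pods.
kubernetes.pod-template-file.taskmanager: /opt/flink/conf/taskmanager-pod-template.yaml
```

Each referenced template file is expected to define its main container with the name 'flink-main-container', as stated in the option descriptions.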
kubernetes.rest-service.annotations
(none)
The user-specified annotations that are set to the rest Service. The value should be in the form of a1:v1,a2:v2
kubernetes.rest-service.exposed.node-port-address-type
InternalIP
The user-specified address type that is used for filtering node IPs when constructing a node port connection string. This option is only considered when 'kubernetes.rest-service.exposed.type' is set to 'NodePort'.
Possible values:
- "InternalIP"
- "ExternalIP"
kubernetes.rest-service.exposed.type
ClusterIP
The exposed type of the rest service. The exposed rest service can be used to access Flink’s Web UI and REST endpoint.
Possible values:
- "ClusterIP"
- "NodePort"
- "LoadBalancer"
- "Headless_ClusterIP"
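For example, a setup that exposes the rest service through a node port and builds the connection string from external node IPs might look like the following sketch. The chosen values are illustrative; the address type is only considered because the exposed type is 'NodePort'.

```yaml
# Sketch only: illustrative values.
kubernetes.rest-service.exposed.type: NodePort
# Only evaluated when the exposed type above is NodePort.
kubernetes.rest-service.exposed.node-port-address-type: ExternalIP
```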
kubernetes.secrets
(none)
The user-specified secrets that will be mounted into the Flink container. The value should be in the form of foo:/opt/secrets-foo,bar:/opt/secrets-bar.
kubernetes.service-account
"default"
String
Service account that is used by jobmanager and taskmanager within kubernetes cluster. Notice that this can be overwritten by config options 'kubernetes.jobmanager.service-account' and 'kubernetes.taskmanager.service-account' for jobmanager and taskmanager respectively.
kubernetes.taskmanager.annotations
(none)
The user-specified annotations that are set to the TaskManager pod. The value should be in the form of a1:v1,a2:v2
kubernetes.taskmanager.cpu.amount
Double
The number of CPUs used by the task manager. By default, the CPU is set to the number of slots per TaskManager.
kubernetes.taskmanager.cpu.limit-factor
Double
The limit factor of cpu used by task manager. The resources limit cpu will be set to cpu * limit-factor.
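The limit-factor arithmetic can be sketched with a small config.yaml fragment. The numbers below are illustrative, not defaults; the comment spells out the multiplication described above.

```yaml
# Sketch only: illustrative values, not defaults.
kubernetes.taskmanager.cpu.amount: 2.0
# Resulting CPU limit = cpu * limit-factor = 2.0 * 1.5 = 3.0
kubernetes.taskmanager.cpu.limit-factor: 1.5
```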
kubernetes.taskmanager.entrypoint.args
(none)
String
Extra arguments used when starting the task manager.
kubernetes.taskmanager.labels
(none)
The labels to be set for TaskManager pods. Specified as key:value pairs separated by commas. For example, version:alphav1,deploy:test.
kubernetes.taskmanager.memory.limit-factor
Double
The limit factor of memory used by task manager. The resources limit memory will be set to memory * limit-factor.
kubernetes.taskmanager.node-selector
(none)
The node selector to be set for TaskManager pods. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd.
kubernetes.taskmanager.service-account
"default"
String
Service account that is used by taskmanager within kubernetes cluster. The task manager uses this service account when watching config maps on the API server to retrieve leader address of jobmanager and resourcemanager. If not explicitly configured, config option 'kubernetes.service-account' will be used.
kubernetes.taskmanager.tolerations
(none)
List<Map>
The user-specified tolerations to be set to the TaskManager pod. The value should be in the form of key:key1,operator:Equal,value:value1,effect:NoSchedule;key:key2,operator:Exists,effect:NoExecute,tolerationSeconds:6000
kubernetes.transactional-operation.initial-retry-delay
50 ms
Duration
Defines the initial delay before a Kubernetes transactional operation is retried after a failure.
kubernetes.transactional-operation.max-retries
Integer
Defines the number of Kubernetes transactional operation retries before the client gives up. For example, FlinkKubeClient#checkAndUpdateConfigMap.
kubernetes.transactional-operation.max-retry-delay
1 min
Duration
Defines the maximum delay between retries of a Kubernetes transactional operation after a failure.
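The three retry options are typically set together. The fragment below is a sketch: the two delays repeat the defaults listed above, while the retry count is an illustrative value chosen for this example.

```yaml
# Sketch only: max-retries is an illustrative value.
kubernetes.transactional-operation.max-retries: 10
# Delay before the first retry.
kubernetes.transactional-operation.initial-retry-delay: 50 ms
# Upper bound for the delay between retries.
kubernetes.transactional-operation.max-retry-delay: 1 min
```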
Please refer to the State Backend Documentation for background on State Backends.
RocksDB State Backend
These are the options commonly needed to configure the RocksDB state backend. See the Advanced RocksDB Backend Section for options necessary for advanced low level configurations and trouble-shooting.
Default
Description
state.backend.rocksdb.memory.fixed-per-slot
(none)
MemorySize
The fixed total amount of memory, shared among all RocksDB instances per slot. This option overrides the 'state.backend.rocksdb.memory.managed' option when configured.
state.backend.rocksdb.memory.fixed-per-tm
(none)
MemorySize
The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' is configured. If none is configured, each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (config.yaml). Note that this feature breaks resource isolation between the slots.
state.backend.rocksdb.memory.high-prio-pool-ratio
Double
The fraction of cache memory that is reserved for high-priority data like index, filter, and compression dictionary blocks. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.
state.backend.rocksdb.memory.managed
Boolean
If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.
state.backend.rocksdb.memory.partitioned-index-filters
false
Boolean
With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load on demand into the block cache the partitions that are required to perform the index/filter query. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.
state.backend.rocksdb.memory.write-buffer-ratio
Double
The maximum amount of memory that write buffers may take, as a fraction of the total shared memory. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.
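A sketch of a fixed-per-slot memory setup, assuming illustrative sizes and ratios: the fixed budget overrides the managed-memory option, and the ratio options then split that shared budget.

```yaml
# Sketch only: sizes and ratios are illustrative, not recommendations.
# Fixed budget shared by all RocksDB instances in one slot;
# overrides 'state.backend.rocksdb.memory.managed' when configured.
state.backend.rocksdb.memory.fixed-per-slot: 512 mb
# Upper bound for write buffers, as a fraction of the shared memory.
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
# Fraction of cache memory reserved for index, filter, and
# compression dictionary blocks.
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
state.backend.rocksdb.memory.partitioned-index-filters: true
```

The last three options only have an effect because a shared memory budget (managed or fixed-per-slot) is configured.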
state.backend.rocksdb.timer-service.cache-size
Integer
The cache size per keyGroup of rocksdb timer service factory. This option only has an effect when 'state.backend.rocksdb.timer-service.factory' is configured to 'ROCKSDB'. Increasing this value can improve the performance of rocksdb timer service, but consumes more heap memory at the same time.
state.backend.rocksdb.timer-service.factory
ROCKSDB
This determines the factory for timer service state implementation.
Possible values:
- "HEAP": Heap-based
- "ROCKSDB": Implementation based on RocksDB
Duration
Update interval for the metric fetcher used by the web UI. Decrease this value for faster updating metrics. Increase this value if the metric fetcher causes too much load. Setting this value to 0 disables the metric fetching completely.
metrics.internal.query-service.port
String
The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. By default, Flink will pick a random port.
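For instance, restricting the query service to a port range could be written as in this sketch, which reuses the sample range from the description.

```yaml
# Sketch only: the range is the sample value from the description.
metrics.internal.query-service.port: "50100-50200"
# A list, or a mix of list and range, uses the same option, e.g.:
# metrics.internal.query-service.port: "50100,50101,50200-50300"
```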
metrics.internal.query-service.thread-priority
Integer
The thread priority used for Flink's internal metric query service. The thread is created by Pekko's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.
metrics.job.status.enable
CURRENT_TIME
List<Enum>
The selection of job status metrics that should be reported.
Possible values:
- "STATE": For a given state, return 1 if the job is currently in that state, otherwise return 0.
- "CURRENT_TIME": For a given state, if the job is currently in that state, return the time since the job transitioned into that state, otherwise return 0.
- "TOTAL_TIME": For a given state, return how much time the job has spent in that state in total.
metrics.latency.granularity
"operator"
String
Defines the granularity of latency metrics. Accepted values are:
- single - Track latency without differentiating between sources and subtasks.
- operator - Track latency while differentiating between sources, but not subtasks.
- subtask - Track latency while differentiating between sources and subtasks.
metrics.latency.history-size
Integer
Defines the number of measured latencies to maintain at each operator.
metrics.latency.interval
Duration
Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster.
metrics.reporter.<name>.<parameter>
(none)
String
Configures the parameter <parameter> for the reporter named <name>.
metrics.reporter.<name>.factory.class
(none)
String
The reporter factory class to use for the reporter named <name>.
metrics.reporter.<name>.filter.excludes
List<String>
The metrics that should be excluded for the reporter named <name>. The format is identical to filter.includes
metrics.reporter.<name>.filter.includes
"*:*:*"
List<String>
The metrics that should be included for the reporter named <name>. Filters are specified as a list, with each filter following this format:
<scope>[:<name>[,<name>][:<type>[,<type>]]]
A metric matches a filter if the scope pattern and at least one of the name patterns and at least one of the types match.
- scope: Filters based on the logical scope. Specified as a pattern where * matches any sequence of characters and . separates scope components. For example: "jobmanager.job" matches any job-related metrics on the JobManager, "*.job" matches all job-related metrics and "*.job.*" matches all metrics below the job-level (i.e., task/operator metrics etc.).
- name: Filters based on the metric name. Specified as a comma-separated list of patterns where * matches any sequence of characters. For example, "*Records*,*Bytes*" matches any metrics where the name contains "Records" or "Bytes".
- type: Filters based on the metric type. Specified as a comma-separated list of metric types: [counter, meter, gauge, histogram]
Examples:
- "*:numRecords*" matches metrics like numRecordsIn.
- "*.job.task.operator:numRecords*" matches metrics like numRecordsIn on the operator level.
- "*.job.task.operator:numRecords*:meter" matches meter metrics like numRecordsInPerSecond on the operator level.
- "*:numRecords*,numBytes*:counter,meter" matches all counter/meter metrics like numRecordsInPerSecond.
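Putting the filter syntax to use, a reporter that only exports operator-level meter metrics whose name starts with numRecords could be configured roughly as follows. This is a sketch: 'my_reporter' is a hypothetical reporter name, and the Prometheus factory class assumes that the corresponding reporter plugin is available in the distribution.

```yaml
# Sketch only: "my_reporter" is a hypothetical name; the factory class
# assumes the Prometheus reporter plugin is installed.
metrics.reporters: my_reporter
metrics.reporter.my_reporter.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
# <scope>:<name>:<type>, quoted because the value starts with '*'.
metrics.reporter.my_reporter.filter.includes: "*.job.task.operator:numRecords*:meter"
```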
metrics.reporter.<name>.interval
Duration
The reporter interval to use for the reporter named <name>. Only applicable to push-based reporters.
metrics.reporter.<name>.scope.delimiter
String
The delimiter used to assemble the metric identifier for the reporter named <name>.
metrics.reporter.<name>.scope.variables.additional
The map of additional variables that should be included for the reporter named <name>. Only applicable to tag-based reporters.
metrics.reporter.<name>.scope.variables.excludes
String
The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters.
metrics.reporters
(none)
String
An optional list of reporter names. If configured, only reporters whose name matches any of the names in the list will be started. Otherwise, all reporters that could be found in the configuration will be started.
metrics.scope.delimiter
String
Delimiter used to assemble the metric identifier.
metrics.scope.jm
"<host>.jobmanager"
String
Defines the scope format string that is applied to all metrics scoped to a JobManager. Only effective when an identifier-based reporter is configured.
metrics.scope.jm-job
"<host>.jobmanager.<job_name>"
String
Defines the scope format string that is applied to all metrics scoped to a job on a JobManager. Only effective when an identifier-based reporter is configured.
metrics.scope.jm-operator
"<host>.jobmanager.<job_name>.<operator_name>"
String
Defines the scope format string that is applied to all metrics scoped to the components running on a JobManager of an Operator, like OperatorCoordinator for Source Enumerator metrics.
metrics.scope.operator
"<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"
String
Defines the scope format string that is applied to all metrics scoped to an operator. Only effective when an identifier-based reporter is configured.
metrics.scope.task
"<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"
String
Defines the scope format string that is applied to all metrics scoped to a task. Only effective when an identifier-based reporter is configured.
metrics.scope.tm
"<host>.taskmanager.<tm_id>"
String
Defines the scope format string that is applied to all metrics scoped to a TaskManager. Only effective when an identifier-based reporter is configured.
metrics.scope.tm-job
"<host>.taskmanager.<tm_id>.<job_name>"
String
Defines the scope format string that is applied to all metrics scoped to a job on a TaskManager. Only effective when an identifier-based reporter is configured.
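Scope formats are plain strings in which the <...> variables are substituted at runtime. As a sketch, the fragment below prepends a constant prefix to the JobManager and TaskManager scopes; the prefix 'prod' is a hypothetical value chosen for this example.

```yaml
# Sketch only: the "prod" prefix is hypothetical.
metrics.scope.jm: "prod.<host>.jobmanager"
metrics.scope.tm: "prod.<host>.taskmanager.<tm_id>"
```

The remaining scope options keep their defaults unless they are overridden in the same way.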
metrics.system-resource
false
Boolean
Flag indicating whether Flink should report system resource metrics such as machine's CPU, memory or network usage.
metrics.system-resource-probing-interval
Duration
Interval between probes of system resource metrics. Has an effect only when 'metrics.system-resource' is enabled.
Flink can report metrics from RocksDB’s native code, for applications using the RocksDB state backend.
The metrics here are scoped to the operators, are reported as unsigned longs, and come in two kinds:
- RocksDB property-based metrics, which are broken down by column family, e.g. the number of currently running compactions of one specific column family.
- RocksDB statistics-based metrics, which hold at the database level, e.g. the total block cache hit count within the DB.
Enabling RocksDB’s native metrics may cause degraded performance and should be set carefully.
false
Boolean
Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT).
state.backend.rocksdb.metrics.block-cache-miss
false
Boolean
Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS).
state.backend.rocksdb.metrics.block-cache-pinned-usage
false
Boolean
Monitor the memory size for the entries being pinned in block cache.
state.backend.rocksdb.metrics.block-cache-usage
false
Boolean
Monitor the memory size for the entries residing in block cache.
state.backend.rocksdb.metrics.bloom-filter-full-positive
false
Boolean
Monitor the total count of reads not avoided by bloom full filter.
state.backend.rocksdb.metrics.bloom-filter-full-true-positive
false
Boolean
Monitor the total count of reads not avoided by bloom full filter and the data actually exists in RocksDB.
state.backend.rocksdb.metrics.bloom-filter-useful
false
Boolean
Monitor the total count of reads avoided by bloom filter.
state.backend.rocksdb.metrics.bytes-read
false
Boolean
Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB.
state.backend.rocksdb.metrics.bytes-written
false
Boolean
Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB.
state.backend.rocksdb.metrics.column-family-as-variable
false
Boolean
Whether to expose the column family as a variable for RocksDB property based metrics.
state.backend.rocksdb.metrics.compaction-pending
false
Boolean
Track pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise.
state.backend.rocksdb.metrics.compaction-read-bytes
false
Boolean
Monitor the bytes read during compaction in RocksDB.
state.backend.rocksdb.metrics.compaction-write-bytes
false
Boolean
Monitor the bytes written during compaction in RocksDB.
state.backend.rocksdb.metrics.cur-size-active-mem-table
false
Boolean
Monitor the approximate size of the active memtable in bytes.
state.backend.rocksdb.metrics.cur-size-all-mem-tables
false
Boolean
Monitor the approximate size of the active and unflushed immutable memtables in bytes.
state.backend.rocksdb.metrics.estimate-live-data-size
false
Boolean
Estimate of the amount of live data in bytes (usually smaller than sst files size due to space amplification).
state.backend.rocksdb.metrics.estimate-num-keys
false
Boolean
Estimate the number of keys in RocksDB.
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes
false
Boolean
Estimated total number of bytes compaction needs to rewrite to get all levels down to under target size. Not valid for other compactions than level-based.
state.backend.rocksdb.metrics.estimate-table-readers-mem
false
Boolean
Estimate the memory used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks) in bytes.
state.backend.rocksdb.metrics.is-write-stopped
false
Boolean
Track whether write has been stopped in RocksDB. Returns 1 if write has been stopped, 0 otherwise.
state.backend.rocksdb.metrics.iter-bytes-read
false
Boolean
Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB.
state.backend.rocksdb.metrics.live-sst-files-size
false
Boolean
Monitor the total size (bytes) of all SST files belonging to the latest version. WARNING: may slow down online queries if there are too many files.
state.backend.rocksdb.metrics.mem-table-flush-pending
false
Boolean
Monitor the number of pending memtable flushes in RocksDB.
state.backend.rocksdb.metrics.num-deletes-active-mem-table
false
Boolean
Monitor the total number of delete entries in the active memtable.
state.backend.rocksdb.metrics.num-deletes-imm-mem-tables
false
Boolean
Monitor the total number of delete entries in the unflushed immutable memtables.
state.backend.rocksdb.metrics.num-entries-active-mem-table
false
Boolean
Monitor the total number of entries in the active memtable.
state.backend.rocksdb.metrics.num-entries-imm-mem-tables
false
Boolean
Monitor the total number of entries in the unflushed immutable memtables.
state.backend.rocksdb.metrics.num-files-at-level
false
Boolean
Monitor the number of files at each level.
state.backend.rocksdb.metrics.num-immutable-mem-table
false
Boolean
Monitor the number of immutable memtables in RocksDB.
state.backend.rocksdb.metrics.num-live-versions
false
Boolean
Monitor number of live versions. Version is an internal data structure. See RocksDB file version_set.h for details. More live versions often mean more SST files are held from being deleted, by iterators or unfinished compactions.
state.backend.rocksdb.metrics.num-running-compactions
false
Boolean
Monitor the number of currently running compactions.
state.backend.rocksdb.metrics.num-running-flushes
false
Boolean
Monitor the number of currently running flushes.
state.backend.rocksdb.metrics.num-snapshots
false
Boolean
Monitor the number of unreleased snapshots of the database.
state.backend.rocksdb.metrics.size-all-mem-tables
false
Boolean
Monitor the approximate size of the active, unflushed immutable, and pinned immutable memtables in bytes.
state.backend.rocksdb.metrics.stall-micros
false
Boolean
Monitor the duration for which writers have to wait for compaction or flush to finish in RocksDB.
state.backend.rocksdb.metrics.total-sst-files-size
false
Boolean
Monitor the total size (bytes) of all SST files of all versions. WARNING: may slow down online queries if there are too many files.
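Since every native metric is disabled by default and may degrade performance, a cautious setup enables only a handful of them. The selection below is a sketch for illustration, not a recommended set.

```yaml
# Sketch only: an arbitrary small selection of native metrics.
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-running-compactions: true
# Expose the column family as a variable for property-based metrics.
state.backend.rocksdb.metrics.column-family-as-variable: true
```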
The history server keeps the information of completed jobs (graphs, runtimes, statistics). To enable it, you have to enable “job archiving” in the JobManager (jobmanager.archive.fs.dir).
See the History Server Docs for details.
Default
Description
false
Boolean
Whether HistoryServer should clean up jobs that are no longer present in `historyserver.archive.fs.dir`.
historyserver.archive.fs.dir
(none)
String
Comma separated list of directories to fetch archived jobs from. The history server will monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a directory via `jobmanager.archive.fs.dir`.
historyserver.archive.fs.refresh-interval
Duration
Interval for refreshing the archived job directories.
historyserver.archive.retained-jobs
Integer
The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. If set to `-1` (default), there is no limit to the number of archives. If set to `0` or less than `-1`, HistoryServer will throw an IllegalConfigurationException.
historyserver.log.jobmanager.url-pattern
(none)
String
Pattern of the log URL of the JobManager. The HistoryServer generates actual URLs from it by replacing the special placeholder `<jobid>` with the ID of the job. Only http / https schemes are supported.
historyserver.log.taskmanager.url-pattern
(none)
String
Pattern of the log URL of the TaskManager. The HistoryServer generates actual URLs from it by replacing the special placeholders `<jobid>` and `<tmid>` with the IDs of the job and the TaskManager respectively. Only http / https schemes are supported.
historyserver.web.address
(none)
String
Address of the HistoryServer's web interface.
historyserver.web.port
Integer
Port of the HistoryServer's web interface.
historyserver.web.refresh-interval
Duration
The refresh interval for the HistoryServer web-frontend.
historyserver.web.ssl.enabled
false
Boolean
Enable HTTPS access to the HistoryServer web frontend. This is applicable only when the global SSL flag security.ssl.enabled is set to true.
historyserver.web.tmpdir
(none)
String
Local directory that is used by the history server REST API for temporary files.
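A minimal sketch of a history server setup, assuming a hypothetical archive directory: the JobManager archives completed jobs to a directory, and the history server monitors the same directory.

```yaml
# Sketch only: the directory and numeric values are hypothetical.
# JobManager side: where completed jobs are archived.
jobmanager.archive.fs.dir: hdfs:///completed-jobs/
# History server side: monitor the same directory.
historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.refresh-interval: 10 s
# Must be -1 (no limit) or a positive number.
historyserver.archive.retained-jobs: 100
```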
Duration
Time interval at which the job client reports its heartbeat when 'execution.attached' and 'execution.shutdown-on-attached-exit' are both true. The job is cancelled if no heartbeat arrives within the timeout configured by 'client.heartbeat.timeout'.
client.heartbeat.timeout
3 min
Duration
Cancel the job if the dispatcher hasn't received the client's heartbeat after timeout when 'execution.attached' and 'execution.shutdown-on-attached-exit' are both true.
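The heartbeat options only matter when both attached-mode flags are set. The following sketch shows that combination; the timeout repeats the default listed above.

```yaml
# Sketch only: heartbeats are evaluated because both flags are true.
execution.attached: true
execution.shutdown-on-attached-exit: true
# The job is cancelled if no client heartbeat arrives within this time.
client.heartbeat.timeout: 3 min
```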
client.retry-period
Duration
The interval (in ms) between consecutive retries of failed attempts to execute commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).
client.timeout
1 min
Duration
Timeout on the client side.
Flink can upload and fetch local user artifacts in Application Mode. An artifact can be the actual job archive, a UDF that is packaged separately, etc.
Uploading local artifacts to a DFS is a Kubernetes specific feature, see the Kubernetes section and look for kubernetes.artifacts.* prefixed options.
Fetching remote artifacts on the deployed application cluster is supported from DFS or an HTTP(S) endpoint.
Note: Artifact Fetching is supported in Standalone Application Mode and Native Kubernetes Application Mode.
(none)
List<String>
A semicolon-separated list of the additional artifacts to fetch for the job before setting up the application cluster. All given elements have to be valid URIs. Example: s3://sandbox-bucket/format.jar;http://sandbox-server:1234/udf.jar
user.artifacts.base-dir
"/opt/flink/artifacts"
String
The base dir to put the application job artifacts.
user.artifacts.http-headers
(none)
Custom HTTP header(s) for the HTTP artifact fetcher. The header(s) will be applied when getting the application job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.
user.artifacts.raw-http-enabled
false
Boolean
Enables artifact fetching from raw HTTP endpoints.
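As an illustration of the artifact fetching options, the fragment below allows raw HTTP endpoints and attaches custom headers. The header names and values are the placeholders from the description, not real credentials.

```yaml
# Sketch only: header names and values are placeholders.
user.artifacts.base-dir: /opt/flink/artifacts
user.artifacts.raw-http-enabled: true
user.artifacts.http-headers: "headerKey1:headerValue1,headerKey2:headerValue2"
```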
(none)
List<String>
Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.
execution.job-status-changed-listeners
(none)
List<String>
When a job is created or its status changes, Flink generates a job event and notifies the job status changed listeners.
execution.program-config.enabled
Boolean
Determines whether configurations in the user program are allowed. By default, configuration can be set both on a cluster-level (via options) or within the user program (i.e. programmatic via environment setters). If disabled, all configuration must be defined on a cluster-level and programmatic setters in the user program are prohibited.
Depending on your deployment mode failing the job might have different implications. Either your client that is trying to submit the job to an external cluster (session cluster deployment) throws the exception or the job manager (application mode deployment).
The 'execution.program-config.wildcards' option lists configuration keys that are allowed to be set in user programs regardless of this setting.
execution.program-config.wildcards
List<String>
List of configuration keys that are allowed to be set in a user program regardless of whether program configuration is enabled or not.
Currently changes that are not backed by the Configuration class are always allowed.
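A sketch of a locked-down cluster that still lets user programs change one setting. The allowed key is an arbitrary example taken from the pipeline options in this reference.

```yaml
# Sketch only: the allowed key is an arbitrary example.
# Prohibit programmatic configuration in user programs...
execution.program-config.enabled: false
# ...except for the keys listed here.
execution.program-config.wildcards: pipeline.auto-watermark-interval
```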
execution.shutdown-on-application-finish
Boolean
Whether a Flink Application cluster should shut down automatically after its application finishes (either successfully or as result of a failure). Has no effect for other deployment modes.
execution.shutdown-on-attached-exit
false
Boolean
If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
execution.submit-failed-job-on-application-error
false
Boolean
If a failed job should be submitted (in the application mode) when there is an error in the application driver before an actual job submission. This is intended for providing a clean way of reporting failures back to the user and is especially useful in combination with 'execution.shutdown-on-application-finish'. This option only works when the single job submission is enforced ('high-availability.type' is enabled). Please note that this is an experimental option and may be changed in the future.
execution.target
(none)
String
The deployment target for the execution. This can take one of the following values when calling bin/flink run:
- remote
- local
- yarn-application
- yarn-per-job (deprecated)
- yarn-session
- kubernetes-application
- kubernetes-session
ALL_EXCHANGES_BLOCKING
Defines how data is exchanged between tasks in batch 'execution.runtime-mode' if the shuffling behavior has not been set explicitly for an individual exchange.
With pipelined exchanges, upstream and downstream tasks run simultaneously. In order to achieve lower latency, a result record is immediately sent to and processed by the downstream task. Thus, the receiver back-pressures the sender. The streaming mode always uses this exchange.
With blocking exchanges, upstream and downstream tasks run in stages. Records are persisted to some storage between stages. Downstream tasks then fetch these records after the upstream tasks finished. Such an exchange reduces the resources required to execute the job as it does not need to run upstream and downstream tasks simultaneously.
With hybrid exchanges (experimental), downstream tasks can run anytime as long as upstream tasks have started running. When given sufficient resources, this can reduce the overall job execution time by running tasks simultaneously. Otherwise, it also allows jobs to be executed with very few resources. It adapts to custom preferences between persisting less data and restarting fewer tasks on failures, by providing different spilling strategies.
Possible values:
- "ALL_EXCHANGES_PIPELINED": Upstream and downstream tasks run simultaneously. This leads to lower latency and more evenly distributed (but higher) resource usage across tasks.
- "ALL_EXCHANGES_BLOCKING": Upstream and downstream tasks run one after another. This reduces the resource usage as downstream tasks are started after upstream tasks have finished.
- "ALL_EXCHANGES_HYBRID_FULL": Downstream can start running anytime, as long as the upstream has started. This adapts the resource usage to whatever is available. This type will spill all data to disk to support re-consumption.
- "ALL_EXCHANGES_HYBRID_SELECTIVE": Downstream can start running anytime, as long as the upstream has started. This adapts the resource usage to whatever is available. This type spills data selectively to reduce disk writes as much as possible.
execution.buffer-timeout.enabled
Boolean
If disabled, the config execution.buffer-timeout.interval will not take effect and the flushing will be triggered only when the output buffer is full thus maximizing throughput
execution.buffer-timeout.interval
100 ms
Duration
The maximum interval (in milliseconds) between flushes of the output buffers. By default the output buffers flush frequently to provide low latency and to aid smooth developer experience. Setting the parameter can result in three logical modes:
- A positive value triggers flushing periodically by that interval
- 0 triggers flushing after every record thus minimizing latency
- If the config execution.buffer-timeout.enabled is false, trigger flushing only when the output buffer is full thus maximizing throughput
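The three modes listed above can be sketched as alternative config.yaml settings. Only the first variant is active in this fragment; the other two are shown as comments.

```yaml
# Sketch only: pick one of the three variants.
# 1) Periodic flushing (here: every 100 ms, the listed default).
execution.buffer-timeout.enabled: true
execution.buffer-timeout.interval: 100 ms
# 2) Flush after every record to minimize latency:
# execution.buffer-timeout.interval: 0 ms
# 3) Flush only when the output buffer is full to maximize throughput:
# execution.buffer-timeout.enabled: false
```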
execution.checkpointing.snapshot-compression
false
Boolean
Whether to use compression for the state snapshot data.
execution.runtime-mode
STREAMING
Runtime execution mode of DataStream programs. Among other things, this controls task scheduling, network shuffle behavior, and time semantics.
Possible values:
- "STREAMING"
- "BATCH"
- "AUTOMATIC"
execution.sort-keyed-partition.memory
128 mb
MemorySize
Sets the managed memory size for sort partition operator on KeyedPartitionWindowedStream. The memory size is only a weight hint. Thus, it will affect the operator's memory weight within a task, but the actual memory used depends on the running environment.
execution.sort-partition.memory
128 mb
MemorySize
Sets the managed memory size for sort partition operator in NonKeyedPartitionWindowedStream. The memory size is only a weight hint. Thus, it will affect the operator's memory weight within a task, but the actual memory used depends on the running environment.
Boolean
When auto-generated UIDs are disabled, users are forced to manually specify UIDs on DataStream applications.
It is highly recommended that users specify UIDs before deploying to production since they are used to match state in savepoints to operators in a job. Because auto-generated IDs are likely to change when modifying a job, specifying custom IDs allows an application to evolve over time without discarding state.
pipeline.auto-watermark-interval
200 ms
Duration
The interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time based windowing.
pipeline.cached-files
(none)
List<String>
Files to be registered at the distributed cache under the given name. The files will be accessible from any user-defined function in the (distributed) runtime under a local path. Files may be local files (which will be distributed via BlobServer), or files in a distributed file system. The runtime will copy the files temporarily to a local cache, if needed.
Example:
name:file1,path:'file:///tmp/file1';name:file2,path:'hdfs:///tmp/file2'
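To make the shape of the example value concrete, the sketch below splits it into name/path pairs. It is a simplified reading of the format shown above, not Flink's parser; the function name is hypothetical, and paths containing ';' or unbalanced quotes are not handled.

```python
from typing import Dict


def parse_cached_files(value: str) -> Dict[str, str]:
    """Split 'name:<n>,path:<p>;...' into a {name: path} mapping (illustrative only)."""
    entries: Dict[str, str] = {}
    for item in value.split(";"):
        fields: Dict[str, str] = {}
        # Each entry has two fields; split on the first comma only.
        for part in item.split(",", 1):
            key, _, raw = part.partition(":")
            fields[key.strip()] = raw.strip().strip("'")
        entries[fields["name"]] = fields["path"]
    return entries


example = "name:file1,path:'file:///tmp/file1';name:file2,path:'hdfs:///tmp/file2'"
print(parse_cached_files(example))
```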
pipeline.classpaths
(none)
List<String>
A semicolon-separated list of the classpaths to package with the job jars to be sent to the cluster. These have to be valid URLs.
pipeline.closure-cleaner-level
RECURSIVE
Configures the mode in which the closure cleaner works.
Possible values:
- "NONE": Disables the closure cleaner completely.
- "TOP_LEVEL": Cleans only the top-level class without recursing into fields.
- "RECURSIVE": Cleans all fields recursively.
pipeline.force-avro
false
Boolean
Forces Flink to use the Apache Avro serializer for POJOs.
Important: Make sure to include the flink-avro
module.
pipeline.force-kryo
false
Boolean
If enabled, forces TypeExtractor to use the Kryo serializer for POJOs even though they could be analyzed as POJOs. In some cases this might be preferable, for example when using interfaces with subclasses that cannot be analyzed as POJOs.
pipeline.force-kryo-avro
(none)
Boolean
Forces Flink to register avro classes in kryo serializer.
Important: Make sure to include the flink-avro module. Otherwise, nothing will be registered. For backward compatibility, the default value is empty to conform to the behavior of the older version. That is, always register avro with kryo, and if flink-avro is not in the class path, register a dummy serializer. In Flink-2.0, we will set the default value to true.
pipeline.generic-types
Boolean
If the use of generic types is disabled, Flink will throw an UnsupportedOperationException
whenever it encounters a data type that would go through Kryo for serialization.
Disabling generic types can be helpful to eagerly find and eliminate the use of types that would go through Kryo serialization during runtime. Rather than checking types individually, using this option will throw exceptions eagerly in the places where generic types are used.
We recommend to use this option only during development and pre-production phases, not during actual production use. The application program and/or the input data may be such that new, previously unseen, types occur at some point. In that case, setting this option would cause the program to fail.
pipeline.global-job-parameters
(none)
Register a custom, serializable user configuration object. The configuration can be accessed in operators
pipeline.jars
(none)
List<String>
A semicolon-separated list of the jars to package with the job jars to be sent to the cluster. These have to be valid paths.
pipeline.jobvertex-parallelism-overrides
A parallelism override map (jobVertexId -> parallelism) which will be used to update the parallelism of the corresponding job vertices of submitted JobGraphs.
pipeline.max-parallelism
Integer
The program-wide maximum parallelism used for operators which haven't specified a maximum parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state. Explicitly changing the value when recovering from the original job will lead to state incompatibility. Must be less than or equal to 32768.
pipeline.name
(none)
String
The job name used for printing and logging.
pipeline.object-reuse
false
Boolean
When enabled, objects that Flink internally uses for deserialization and passing data to user-code functions will be reused. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behaviour.
pipeline.operator-chaining.chain-operators-with-different-max-parallelism
Boolean
Operators with different max parallelism can be chained together. Default behavior may prevent rescaling when the AdaptiveScheduler is used.
pipeline.operator-chaining.enabled
Boolean
Operator chaining allows non-shuffle operations to be co-located in the same thread fully avoiding serialization and de-serialization.
pipeline.serialization-config
(none)
List<String>
List of pairs of class names and serializer configs to be used. There is a type field in the serializer config and each type has its own configuration. Note: only the standard YAML config parser is supported; please use "config.yaml" as the config file. The fields involved are:
- type: the serializer type, which can be "pojo", "kryo" or "typeinfo". If the serializer type is "pojo" or "kryo" without the field kryo-type, the data type will use the POJO or Kryo serializer directly.
- kryo-type: the Kryo serializer type, which can be "default" or "registered". When the kryo-type is "default", the Kryo serializer uses the given serializer as the default serializer for the data type; when the kryo-type is "registered", the data type and its serializer are registered with the Kryo serializer. When this field exists, there must be a field class to specify the serializer class name.
- class: the serializer class name for type "kryo" or "typeinfo". For "kryo", it should be a subclass of com.esotericsoftware.kryo.Serializer. For "typeinfo", it should be a subclass of org.apache.flink.api.common.typeinfo.TypeInfoFactory.
Example:
[org.example.ExampleClass1: {type: pojo}, org.example.ExampleClass2: {type: kryo}, org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer}, org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer}, org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}]
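The field rules above can be checked mechanically. The following sketch validates one serializer config entry, represented here as a plain Python dict; it encodes only the rules stated in the description (allowed values for type and kryo-type, and kryo-type requiring class). The function name is hypothetical and this is not how Flink validates the option.

```python
from typing import Dict, List

ALLOWED_TYPES = {"pojo", "kryo", "typeinfo"}
ALLOWED_KRYO_TYPES = {"default", "registered"}


def check_serializer_config(entry: Dict[str, str]) -> List[str]:
    """Return a list of problems found in one serializer config (empty if none)."""
    problems: List[str] = []
    if entry.get("type") not in ALLOWED_TYPES:
        problems.append("type must be one of pojo, kryo, typeinfo")
    kryo_type = entry.get("kryo-type")
    if kryo_type is not None:
        if kryo_type not in ALLOWED_KRYO_TYPES:
            problems.append("kryo-type must be default or registered")
        # When kryo-type is present, a serializer class must be named as well.
        if "class" not in entry:
            problems.append("kryo-type requires a class field")
    return problems


print(check_serializer_config({"type": "kryo", "kryo-type": "registered"}))
```

Each entry of the example list above passes this check; the entry printed here is rejected because it names a kryo-type without a class.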
pipeline.vertex-description-mode
The mode in which the description of a job vertex is organized.
Possible values:
- "TREE"
- "CASCADING"
pipeline.vertex-name-include-index-prefix
false
Boolean
Whether the name of a vertex includes its topological index. When true, the name is prefixed with the index of the vertex, like '[vertex-0]Source: source'. It is false by default.
pipeline.watermark-alignment.allow-unaligned-source-splits
false
Boolean
If watermark alignment is used, sources with multiple splits will attempt to pause/resume split readers to avoid watermark drift of source splits. However, if split readers don't support pause/resume, an UnsupportedOperationException will be thrown when there is an attempt to pause/resume. To allow use of split readers that don't support pause/resume and, hence, to allow unaligned splits while still using watermark alignment, set this parameter to true. The default value is false. Note: This parameter may be removed in future releases.
Duration
Only relevant if execution.checkpointing.unaligned.enabled
is enabled.
If timeout is 0, checkpoints will always start unaligned.
If timeout has a positive value, checkpoints will start aligned. If, during checkpointing, the checkpoint start delay exceeds this timeout, alignment will time out and the checkpoint barrier will start working as an unaligned checkpoint.
execution.checkpointing.checkpoints-after-tasks-finish
Boolean
Feature toggle for enabling checkpointing even if some of tasks have finished. Before you enable it, please take a look at the important considerations
execution.checkpointing.cleaner.parallel-mode
Boolean
Option whether to discard a checkpoint's states in parallel using the ExecutorService passed into the cleaner
execution.checkpointing.create-subdir
Boolean
Whether to create sub-directories named by job id under the 'execution.checkpointing.dir' to store the data files and meta data of checkpoints. The default value is true, so that users can run several jobs with the same checkpoint directory at the same time. If this value is set to false, take care not to run several jobs with the same directory simultaneously.
WARNING: This is an advanced configuration. If set to false, users must ensure that multiple jobs are never run with the same checkpoint directory, and that no files exist other than those necessary for the restoration of the current job when starting a new job.
execution.checkpointing.data-inline-threshold
20 kb
MemorySize
The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.
execution.checkpointing.dir
(none)
String
The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e. all TaskManagers and JobManagers). If 'execution.checkpointing.storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.
execution.checkpointing.externalized-checkpoint-retention
NO_EXTERNALIZED_CHECKPOINTS
Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status JobStatus#FAILED or JobStatus#SUSPENDED). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.
The mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED).
The target directory for externalized checkpoints is configured via execution.checkpointing.dir.
Possible values:
- "DELETE_ON_CANCELLATION": Checkpoint state is only kept when the owning job fails. It is deleted if the job is cancelled.
- "RETAIN_ON_CANCELLATION": Checkpoint state is kept when the owning job is cancelled or fails.
- "NO_EXTERNALIZED_CHECKPOINTS": Externalized checkpoints are disabled.
execution.checkpointing.file-merging.across-checkpoint-boundary
false
Boolean
Only relevant if execution.checkpointing.file-merging.enabled
is enabled.
Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.
execution.checkpointing.file-merging.enabled
false
Boolean
Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files. This is an experimental feature under evaluation, make sure you're aware of the possible effects of enabling it.
execution.checkpointing.file-merging.max-file-size
32 mb
MemorySize
Max size of a physical file for merged checkpoints.
execution.checkpointing.file-merging.max-space-amplification
Float
Space amplification is the ratio of the occupied space to the amount of valid data. The higher the space amplification, the more space is wasted. This option sets the space amplification above which physical files are re-uploaded to reclaim space. Any value below 1f disables the space control.
execution.checkpointing.file-merging.pool-blocking
false
Boolean
Whether to use a Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool always provides a usable physical file without blocking, but it may create many physical files if files are polled frequently. Polling a small file from a Blocking pool may block until the file is returned.
execution.checkpointing.incremental
false
Boolean
Option whether to create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option.
execution.checkpointing.interval
(none)
Duration
Gets the interval in which checkpoints are periodically scheduled.
This setting defines the base interval. Checkpoint triggering may be delayed by the settings execution.checkpointing.max-concurrent-checkpoints
, execution.checkpointing.min-pause
and execution.checkpointing.interval-during-backlog
execution.checkpointing.interval-during-backlog
(none)
Duration
If it is not null and any source reports isProcessingBacklog=true, it is the interval in which checkpoints are periodically scheduled.
Checkpoint triggering may be delayed by the settings execution.checkpointing.max-concurrent-checkpoints
and execution.checkpointing.min-pause
.
Note: if it is not null, the value must either be 0, which means the checkpoint is disabled during backlog, or be larger than or equal to execution.checkpointing.interval.
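The interaction between execution.checkpointing.interval and execution.checkpointing.interval-during-backlog can be sketched as follows. This is a minimal model of the rules stated above, not Flink's implementation; the function name is hypothetical, and None stands in both for "option not set" and for "checkpointing disabled during backlog".

```python
from typing import Optional


def effective_checkpoint_interval_ms(
    interval_ms: int,
    interval_during_backlog_ms: Optional[int],
    processing_backlog: bool,
) -> Optional[int]:
    """Return the interval to use, or None if checkpointing is disabled during backlog."""
    if interval_during_backlog_ms is not None:
        # The backlog interval must be 0 or at least the base interval.
        if interval_during_backlog_ms != 0 and interval_during_backlog_ms < interval_ms:
            raise ValueError("interval-during-backlog must be 0 or >= interval")
    if interval_during_backlog_ms is None or not processing_backlog:
        return interval_ms
    if interval_during_backlog_ms == 0:
        return None  # checkpointing is disabled while a source reports backlog
    return interval_during_backlog_ms


print(effective_checkpoint_interval_ms(60_000, 300_000, processing_backlog=True))
```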
execution.checkpointing.local-backup.dirs
(none)
String
The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. If not configured it will default to <WORKING_DIR>/localState. The <WORKING_DIR> can be configured via process.taskmanager.working-dir
execution.checkpointing.local-backup.enabled
false
Boolean
This option configures local backup for the state backend, which indicates whether to make a backup checkpoint on local disk. If not configured, it falls back to execution.state-recovery.from-local. By default, local backup is deactivated. Local backup currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).
execution.checkpointing.max-concurrent-checkpoints
Integer
The maximum number of checkpoint attempts that may be in progress at the same time. If this value is n, then no checkpoints will be triggered while n checkpoint attempts are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need to finish or expire.
execution.checkpointing.min-pause
Duration
The minimal pause between checkpointing attempts. This setting defines how soon the checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger another checkpoint with respect to the maximum number of concurrent checkpoints (see execution.checkpointing.max-concurrent-checkpoints).
If the maximum number of concurrent checkpoints is set to one, this setting effectively ensures that a minimum amount of time passes where no checkpoint is in progress at all.
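A simplified timing model shows how the minimal pause can delay the next checkpoint. It assumes the maximum number of concurrent checkpoints is one, so a new checkpoint becomes possible when the previous one completes; the function name and the example timestamps are hypothetical.

```python
def earliest_next_trigger_ms(
    last_trigger_ms: int,
    last_completion_ms: int,
    interval_ms: int,
    min_pause_ms: int,
) -> int:
    """Earliest time the next checkpoint may start (max-concurrent-checkpoints = 1)."""
    next_by_interval = last_trigger_ms + interval_ms
    next_by_pause = last_completion_ms + min_pause_ms
    # The later of the two constraints wins.
    return max(next_by_interval, next_by_pause)


# Slow checkpoint: triggered at 0, completed at 50 s, interval 60 s, min pause 30 s.
print(earliest_next_trigger_ms(0, 50_000, 60_000, 30_000))
# Fast checkpoint: completed at 5 s, so the base interval is the limiting factor.
print(earliest_next_trigger_ms(0, 5_000, 60_000, 30_000))
```

In the slow case the pause pushes the next trigger beyond the base interval, which is the delay that the execution.checkpointing.interval description refers to.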
execution.checkpointing.mode
EXACTLY_ONCE
The checkpointing mode (exactly-once vs. at-least-once).
Possible values:
- "EXACTLY_ONCE"
- "AT_LEAST_ONCE"
execution.checkpointing.num-retained
Integer
The maximum number of completed checkpoints to retain.
execution.checkpointing.savepoint-dir
(none)
String
The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).
execution.checkpointing.storage
(none)
String
The checkpoint storage implementation to be used to checkpoint state.
The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory
. If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)
method is called.
Recognized shortcut names are 'jobmanager' and 'filesystem'.
'execution.checkpointing.storage' and 'execution.checkpointing.dir' are usually combined to configure the checkpoint location. By default, the checkpoint meta data and actual program state will be stored in the JobManager's memory directly. When 'execution.checkpointing.storage' is set to 'jobmanager', if 'execution.checkpointing.dir' is configured, the meta data of checkpoints will be persisted to the path specified by 'execution.checkpointing.dir'. Otherwise, the meta data will be stored in the JobManager's memory. When 'execution.checkpointing.storage' is set to 'filesystem', a valid path must be configured to 'execution.checkpointing.dir', and the checkpoint meta data and actual program state will both be persisted to the path.
execution.checkpointing.timeout
10 min
Duration
The maximum time that a checkpoint may take before being discarded.
execution.checkpointing.tolerable-failed-checkpoints
Integer
The number of consecutive checkpoint failures that is tolerated. If set to 0, no checkpoint failure is tolerated. This only applies to the following failure reasons: IOException on the Job Manager, failures in the async phase on the Task Managers, and checkpoint expiration due to a timeout. Failures originating from the sync phase on the Task Managers always force failover of the affected task. Other types of checkpoint failures (such as a checkpoint being subsumed) are ignored.
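The "consecutive" part of this option can be illustrated with a small counter. This is a sketch under the assumption that a successful checkpoint resets the count and that the job fails once the count exceeds the tolerated number; the class and method names are hypothetical and only the counted failure reasons listed above would reach it.

```python
class ConsecutiveFailureCounter:
    """Tracks consecutive counted checkpoint failures (illustrative only)."""

    def __init__(self, tolerable_failed_checkpoints: int) -> None:
        self.tolerable = tolerable_failed_checkpoints
        self.consecutive_failures = 0

    def on_success(self) -> None:
        # Assumption: a completed checkpoint resets the streak.
        self.consecutive_failures = 0

    def on_failure(self) -> bool:
        """Record a failure; return True if the job should now fail."""
        self.consecutive_failures += 1
        return self.consecutive_failures > self.tolerable


counter = ConsecutiveFailureCounter(tolerable_failed_checkpoints=2)
print([counter.on_failure() for _ in range(3)])
```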
execution.checkpointing.unaligned.enabled
false
Boolean
Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
Unaligned checkpoints can only be enabled if execution.checkpointing.mode
is EXACTLY_ONCE
and if execution.checkpointing.max-concurrent-checkpoints
is 1
execution.checkpointing.unaligned.forced
false
Boolean
Forces unaligned checkpoints, particularly allowing them for iterative jobs.
execution.checkpointing.unaligned.interruptible-timers.enabled
false
Boolean
Allows unaligned checkpoints to skip timers that are currently being fired. For this feature to be enabled, it must be also supported by the operator. Currently this is supported by all TableStreamOperators and CepOperator.
execution.checkpointing.unaligned.max-subtasks-per-channel-state-file
Integer
Defines the maximum number of subtasks that share the same channel state file. It can reduce the number of small files when unaligned checkpoints are enabled. Each subtask will create a new channel state file when this is configured to 1.
execution.checkpointing.write-buffer-size
Integer
The default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option 'execution.checkpointing.data-inline-threshold'.
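As a worked example of the rule above, the actual buffer size is simply the larger of the two options. The 20 kb inline threshold is the default listed for execution.checkpointing.data-inline-threshold; the 4096-byte write buffer is a hypothetical value chosen for illustration.

```python
def actual_write_buffer_size(write_buffer_size_bytes: int, data_inline_threshold_bytes: int) -> int:
    """The effective write buffer is the maximum of the two configured values."""
    return max(write_buffer_size_bytes, data_inline_threshold_bytes)


# Hypothetical 4096-byte write buffer with a 20 kb inline threshold.
print(actual_write_buffer_size(4096, 20 * 1024))
```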
NO_CLAIM
Describes how Flink should restore from the given savepoint or retained checkpoint.
Possible values:
- "CLAIM": Flink will take ownership of the given snapshot. It will clean the snapshot once it is subsumed by newer ones.
- "NO_CLAIM": Flink will not claim ownership of the snapshot files. However, it will make sure it does not depend on any artefacts from the restored snapshot. In order to do that, Flink will take the first checkpoint as a full one, which means it might re-upload/duplicate files that are part of the restored checkpoint.
execution.state-recovery.from-local
false
Boolean
This option configures local recovery for the state backend, which indicates whether to recover from a local snapshot. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).
execution.state-recovery.ignore-unclaimed-state
false
Boolean
Allows skipping savepoint state that cannot be restored. Enable this if you removed an operator from your pipeline after the savepoint was triggered.
execution.state-recovery.path
(none)
String
Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).
execution.state-recovery.without-channel-state.checkpoint-id
Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.
It is better to keep this value empty unless there is an explicit need to restore from the specific checkpoint without in-flight data.
Flink dynamically loads the code for jobs submitted to a session cluster. In addition, Flink tries to hide many dependencies in the classpath from the application. This helps to reduce dependency conflicts between the application code and the dependencies in the classpath.
Please refer to the Debugging Classloading Docs for details.
Default
Description
Boolean
Fails attempts at loading classes if the user classloader of a job is used after it has terminated.
This is usually caused by the classloader being leaked by lingering threads or misbehaving libraries, which may also result in the classloader being used by other jobs.
This check should only be disabled if such a leak prevents further jobs from running.
classloader.fail-on-metaspace-oom-error
Boolean
Fail Flink JVM processes if 'OutOfMemoryError: Metaspace' is thrown while trying to load a user code class.
classloader.parent-first-patterns.additional
List<String>
A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to "classloader.parent-first-patterns.default".
classloader.parent-first-patterns.default
"java."; "scala."; "org.apache.flink."; "com.esotericsoftware.kryo"; "org.apache.hadoop."; "javax.annotation."; "org.xml"; "javax.xml"; "org.apache.xerces"; "org.w3c"; "org.rocksdb."; "org.slf4j"; "org.apache.log4j"; "org.apache.logging"; "org.apache.commons.logging"; "ch.qos.logback"
List<String>
A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. To add another pattern we recommend to use "classloader.parent-first-patterns.additional" instead.
classloader.resolve-order
"child-first"
String
Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).
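The prefix matching and the resolution order described above can be sketched together. The default patterns are copied from classloader.parent-first-patterns.default; the function name is hypothetical and the code is a simplified model, not Flink's class loader.

```python
from typing import Iterable

DEFAULT_PARENT_FIRST_PATTERNS = (
    "java.", "scala.", "org.apache.flink.", "com.esotericsoftware.kryo",
    "org.apache.hadoop.", "javax.annotation.", "org.xml", "javax.xml",
    "org.apache.xerces", "org.w3c", "org.rocksdb.", "org.slf4j",
    "org.apache.log4j", "org.apache.logging", "org.apache.commons.logging",
    "ch.qos.logback",
)


def resolution_order(
    class_name: str,
    resolve_order: str = "child-first",
    additional_patterns: Iterable[str] = (),
) -> str:
    """Return which class loader is consulted first for the given class name."""
    # Additional patterns are appended to the defaults.
    patterns = (*DEFAULT_PARENT_FIRST_PATTERNS, *additional_patterns)
    # A pattern is a plain prefix of the fully qualified class name.
    if any(class_name.startswith(prefix) for prefix in patterns):
        return "parent-first"
    return resolve_order


print(resolution_order("org.apache.flink.api.common.JobID"))
print(resolution_order("com.example.MyFunction"))
print(resolution_order("com.example.MyFunction", additional_patterns=["com.example."]))
```

Because patterns are plain prefixes, an entry without a trailing dot such as "org.xml" also matches any class whose name merely starts with those characters.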
Integer
Defines the number of measured latencies to maintain at each state access operation.
state.latency-track.keyed-state-enabled
false
Boolean
Whether to track latency of keyed state operations, e.g. value state put/get/clear.
state.latency-track.sample-interval
Integer
The sample interval of latency track once 'state.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests.
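Sampling every N-th access can be pictured with a simple counter. This sketch assumes the last access of each window of N is the one that gets measured; which access within the window Flink actually samples is not stated above, and the class name is hypothetical.

```python
class SampledLatencyTracker:
    """Decides which state accesses get their latency measured (illustrative only)."""

    def __init__(self, sample_interval: int = 100) -> None:
        self.sample_interval = sample_interval
        self.accesses_since_sample = 0

    def should_sample(self) -> bool:
        self.accesses_since_sample += 1
        if self.accesses_since_sample >= self.sample_interval:
            self.accesses_since_sample = 0
            return True
        return False


tracker = SampledLatencyTracker(sample_interval=100)
sampled = sum(tracker.should_sample() for _ in range(250))
print(sampled)
```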
state.latency-track.state-name-as-variable
Boolean
Whether to expose state name as a variable if tracking latency.
Integer
The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend. If negative, the common (TM) IO thread pool is used (see cluster.io-pool.size).
state.backend.rocksdb.localdir
(none)
String
The local directory (on the TaskManager) where RocksDB puts its files. By default, it will be <WORKING_DIR>/tmp. See process.taskmanager.working-dir for more details.
state.backend.rocksdb.options-factory
(none)
String
The options factory class for users to add customized options in DBOptions and ColumnFamilyOptions for RocksDB. If set, the RocksDB state backend will load the class and apply configs to DBOptions and ColumnFamilyOptions after loading ones from 'RocksDBConfigurableOptions' and pre-defined options.
state.backend.rocksdb.predefined-options
"DEFAULT"
String
The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the RocksDBOptionsFactory are applied on top of these predefined ones.
false
Boolean
Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.
state.changelog.max-failures-allowed
Integer
Max number of consecutive materialization failures allowed.
state.changelog.periodic-materialize.enabled
Boolean
Defines whether to enable periodic materialization. If disabled, changelogs will not be truncated, which may increase the space used by checkpoints.
state.changelog.periodic-materialize.interval
10 min
Duration
Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.changelog.periodic-materialize.enabled is true
state.changelog.storage
"memory"
String
The storage to be used to store state changelog.
The implementation can be specified via their shortcut name.
The list of recognized shortcut names currently includes 'memory' and 'filesystem'.
10 ms
Duration
Delay before persisting changelog after receiving persist request (on checkpoint). Minimizes the number of files and requests if multiple operators (backends) or sub-tasks are using the same store. Correspondingly increases checkpoint time (async phase).
state.changelog.dstl.dfs.batch.persist-size-threshold
10 mb
MemorySize
Size threshold for state changes that were requested to be persisted but are waiting for state.changelog.dstl.dfs.batch.persist-delay (from all operators). Once reached, accumulated changes are persisted immediately. This is different from state.changelog.dstl.dfs.preemptive-persist-threshold as it happens AFTER the checkpoint and potentially for state changes of multiple operators. Must not exceed the in-flight data limit (see state.changelog.dstl.dfs.upload.max-in-flight).
state.changelog.dstl.dfs.compression.enabled
false
Boolean
Whether to enable compression when serializing changelog.
state.changelog.dstl.dfs.discard.num-threads
Integer
Number of threads to use to discard changelog (e.g. pre-emptively uploaded unused state).
state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms
10 min
Duration
Maximum idle time for cache files of distributed changelog file, after which the cache files will be deleted.
state.changelog.dstl.dfs.preemptive-persist-threshold
MemorySize
Size threshold for state changes of a single operator beyond which they are persisted pre-emptively without waiting for a checkpoint. Improves checkpointing time by allowing quasi-continuous uploading of state changes (as opposed to uploading all accumulated changes on checkpoint).
state.changelog.dstl.dfs.upload.buffer-size
MemorySize
Buffer size used when uploading change sets
state.changelog.dstl.dfs.upload.max-attempts
Integer
Maximum number of attempts (including the initial one) to perform a particular upload. Only takes effect if state.changelog.dstl.dfs.upload.retry-policy is fixed.
state.changelog.dstl.dfs.upload.max-in-flight
100 mb
MemorySize
Max amount of data allowed to be in-flight. Upon reaching this limit the task will be back-pressured. I.e., snapshotting will block; normal processing will block if state.changelog.dstl.dfs.preemptive-persist-threshold is set and reached. The limit is applied to the total size of in-flight changes if multiple operators/backends are using the same changelog storage. Must be greater than or equal to state.changelog.dstl.dfs.batch.persist-size-threshold
state.changelog.dstl.dfs.upload.next-attempt-delay
500 ms
Duration
Delay before the next attempt (if the failure was not caused by a timeout).
state.changelog.dstl.dfs.upload.num-threads
Integer
Number of threads to use for upload.
state.changelog.dstl.dfs.upload.retry-policy
"fixed"
String
Retry policy for the failed uploads (in particular, timed out). Valid values: none, fixed.
state.changelog.dstl.dfs.upload.timeout
Duration
Time threshold beyond which an upload is considered timed out. If a new attempt is made but this upload succeeds earlier, this upload's result will be used. May improve upload times if tail latencies of upload requests are significantly high. Only takes effect if state.changelog.dstl.dfs.upload.retry-policy is fixed. Please note that timeout * max_attempts should be less than execution.checkpointing.timeout.
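Two of the constraints stated in the changelog upload options can be checked with simple arithmetic: timeout * max_attempts should stay below execution.checkpointing.timeout, and the in-flight limit must be at least the batch persist size threshold. The sketch below is a hypothetical pre-flight check, not part of Flink. The 10 min checkpoint timeout, 100 mb in-flight limit and 10 mb persist threshold are the defaults listed above; the 1 s upload timeout and 3 attempts are assumed values for illustration.

```python
from typing import List


def check_changelog_upload_settings(
    upload_timeout_ms: int,
    max_attempts: int,
    checkpoint_timeout_ms: int,
    max_in_flight_bytes: int,
    persist_size_threshold_bytes: int,
) -> List[str]:
    """Return the violated constraints (an empty list means the settings are consistent)."""
    problems: List[str] = []
    if upload_timeout_ms * max_attempts >= checkpoint_timeout_ms:
        problems.append("upload.timeout * upload.max-attempts should be less than checkpointing.timeout")
    if max_in_flight_bytes < persist_size_threshold_bytes:
        problems.append("upload.max-in-flight must be >= batch.persist-size-threshold")
    return problems


MB = 1024 * 1024
print(check_changelog_upload_settings(1_000, 3, 10 * 60 * 1_000, 100 * MB, 10 * MB))
```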
RocksDB Configurable Options
These options give fine-grained control over the behavior and resources of ColumnFamilies.
With the introduction of state.backend.rocksdb.memory.managed
and state.backend.rocksdb.memory.fixed-per-slot
(Apache Flink 1.10), it should be only necessary to use the options here for advanced performance tuning. These options here can also be specified in the application program via RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)
.
Default
Description
MemorySize
The approximate size (in bytes) of user data packed per block. The default blocksize is '4KB'.
state.backend.rocksdb.block.cache-size
MemorySize
The amount of the cache for data blocks in RocksDB. The default block-cache size is '8MB'.
state.backend.rocksdb.block.metadata-blocksize
MemorySize
Approximate size of partitioned metadata packed per block. Currently applied to indexes block when partitioned index/filters option is enabled. The default blocksize is '4KB'.
state.backend.rocksdb.bloom-filter.bits-per-key
Double
Bits per key that the bloom filter will use. This only takes effect when the bloom filter is used. The default value is 10.0.
state.backend.rocksdb.bloom-filter.block-based-mode
false
Boolean
If true, RocksDB will use a block-based filter instead of a full filter. This only takes effect when the bloom filter is used. The default value is 'false'.
state.backend.rocksdb.compaction.filter.periodic-compaction-time
Duration
Periodic compaction could speed up expired state entries cleanup, especially for state entries rarely accessed. Files older than this value will be picked up for compaction, and re-written to the same level as they were before. It makes sure a file goes through compaction filters periodically. 0 means turning off periodic compaction. The default value is '30days'.
state.backend.rocksdb.compaction.filter.query-time-after-num-entries
Number of state entries to process by compaction filter before updating current timestamp. Updating the timestamp more often can improve cleanup speed, but it decreases compaction performance because it uses JNI calls from native code. The default value is '1000L'.
state.backend.rocksdb.compaction.level.max-size-level-base
256 mb
MemorySize
The upper-bound of the total size of level base files in bytes. The default value is '256MB'.
state.backend.rocksdb.compaction.level.target-file-size-base
64 mb
MemorySize
The target file size for compaction, which determines a level-1 file size. The default value is '64MB'.
state.backend.rocksdb.compaction.level.use-dynamic-size
false
Boolean
If true, RocksDB will pick target size of each level dynamically. From an empty DB, RocksDB would make last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. The default value is 'false'. For more information, please refer to RocksDB's doc.
state.backend.rocksdb.compaction.style
LEVEL
The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO, UNIVERSAL or NONE, and Flink chooses 'LEVEL' as default style.
Possible values:
- "LEVEL"
- "UNIVERSAL"
- "FIFO"
- "NONE"
state.backend.rocksdb.compression.per.level
SNAPPY_COMPRESSION
List<Enum>
A semicolon-separated list of Compression Type. Different levels can have different compression policies. In many cases, lower levels use fast compression algorithms, while higher levels with more data use slower but more effective compression algorithms. The N-th element in the list corresponds to the compression type of level N-1. When state.backend.rocksdb.compaction.level.use-dynamic-size is true, compression_per_level[0] still determines L0, but other elements are based on the base level and may not match the level seen in the info log.
Note: If the list size is smaller than the number of levels, the undefined lower levels use the last Compression Type in the list.
Some commonly used compression algorithms include NO_COMPRESSION, SNAPPY_COMPRESSION and LZ4_COMPRESSION.
The default value is SNAPPY_COMPRESSION, which means that all data uses the Snappy compression algorithm. Likewise, NO_COMPRESSION means that no data is compressed, which is faster but causes some space amplification. To balance space amplification and performance, the option can also be set to 'NO_COMPRESSION;NO_COMPRESSION;LZ4_COMPRESSION', which means that L0 and L1 data will not be compressed, and other data will be compressed using LZ4.
Possible values:
- "NO_COMPRESSION"
- "SNAPPY_COMPRESSION"
- "ZLIB_COMPRESSION"
- "BZLIB2_COMPRESSION"
- "LZ4_COMPRESSION"
- "LZ4HC_COMPRESSION"
- "XPRESS_COMPRESSION"
- "ZSTD_COMPRESSION"
- "DISABLE_COMPRESSION_OPTION"
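The mapping from list position to level described for state.backend.rocksdb.compression.per.level can be sketched as follows. This is only an illustration of the documented rule (element N covers level N-1, and levels past the end of the list reuse the last entry); the function name `compression_for_level` is hypothetical, and the sketch ignores the dynamic-level-size case.

```python
def compression_for_level(setting: str, level: int) -> str:
    """Return the compression type that would apply to `level`.

    Hypothetical helper, not part of Flink or RocksDB. It assumes
    use-dynamic-size is false, so list positions map directly to levels.
    """
    types = [t.strip() for t in setting.split(";") if t.strip()]
    if not types:
        raise ValueError("the compression list must not be empty")
    # The 1st element covers L0, the 2nd covers L1, and so on.
    # Levels beyond the end of the list reuse the last entry.
    return types[min(level, len(types) - 1)]


setting = "NO_COMPRESSION;NO_COMPRESSION;LZ4_COMPRESSION"
for lvl in range(5):
    print(f"L{lvl}: {compression_for_level(setting, lvl)}")
```

With the example value from the description, L0 and L1 resolve to NO_COMPRESSION and every deeper level resolves to LZ4_COMPRESSION.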
state.backend.rocksdb.files.open
Integer
The maximum number of open files (per stateful operator) that can be used by the DB, '-1' means no limit. The default value is '-1'.
state.backend.rocksdb.incremental-restore-async-compact-after-rescale
false
Boolean
If true, an async compaction of RocksDB is started after every restore after which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend.
state.backend.rocksdb.log.dir
(none)
String
The directory for RocksDB's information logging files. If empty (Flink default setting), log files will be in the same directory as the Flink log. If non-empty, this directory will be used and the data directory's absolute path will be used as the prefix of the log file name. If this option is set to a non-existing location, e.g. '/dev/null', RocksDB will create the log under its own database folder as before.
state.backend.rocksdb.log.file-num
Integer
The maximum number of files RocksDB should keep for information logging (Default setting: 4).
state.backend.rocksdb.log.level
INFO_LEVEL
The specified information logging level for RocksDB. If unset, Flink will use INFO_LEVEL.
Note: RocksDB info logs will not be written to the TaskManager logs and there is no rolling strategy, unless you configure state.backend.rocksdb.log.dir, state.backend.rocksdb.log.max-file-size, and state.backend.rocksdb.log.file-num accordingly. Without a rolling strategy, long-running tasks may lead to uncontrolled disk space usage if configured with increased log levels!
There is no need to modify the RocksDB log level, unless for troubleshooting RocksDB.
Possible values:
- "DEBUG_LEVEL"
- "INFO_LEVEL"
- "WARN_LEVEL"
- "ERROR_LEVEL"
- "FATAL_LEVEL"
- "HEADER_LEVEL"
- "NUM_INFO_LOG_LEVELS"
state.backend.rocksdb.log.max-file-size
25 mb
MemorySize
The maximum size of RocksDB's file used for information logging. If the log file becomes larger than this, a new file will be created. If 0, all logs will be written to one log file. The default maximum file size is '25MB'.
state.backend.rocksdb.rescaling.use-delete-files-in-range
false
Boolean
If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.
state.backend.rocksdb.restore-overlap-fraction-threshold
Double
The threshold for the overlap fraction between a handle's key-group range and the target key-group range. When restoring the base DB, only a handle whose overlap fraction is greater than or equal to the threshold can be chosen as the initial handle. The default value is 0.0, so a handle will always be selected for initialization.
state.backend.rocksdb.thread.num
Integer
The maximum number of concurrent background flush and compaction jobs (per stateful operator). The default value is '2'.
state.backend.rocksdb.use-bloom-filter
false
Boolean
If true, every newly created SST file will contain a Bloom filter. It is disabled by default.
state.backend.rocksdb.use-ingest-db-restore-mode
false
Boolean
A recovery mode that directly clips and ingests multiple DBs during state recovery if the keys in the SST files do not exceed the declared key-group range.
state.backend.rocksdb.write-batch-size
MemorySize
The maximum memory consumed by a RocksDB batch write. If this option is set to 0, flushing is based on item count only.
state.backend.rocksdb.writebuffer.count
Integer
The maximum number of write buffers that are built up in memory. The default value is '2'.
state.backend.rocksdb.writebuffer.number-to-merge
Integer
The minimum number of write buffers that will be merged together before writing to storage. The default value is '1'.
state.backend.rocksdb.writebuffer.size
64 mb
MemorySize
The amount of data built up in memory (backed by an unsorted log on disk) before converting to sorted on-disk files. The default writebuffer size is '64MB'.
(none)
Integer
The size of the IO executor pool used by the cluster to execute blocking IO operations (Master as well as TaskManager processes). By default it will use 4 * the number of CPU cores (hardware contexts) that the cluster process has access to. Increasing the pool size allows more IO operations to run concurrently.
cluster.registration.error-delay
Duration
The pause made after a registration attempt caused an exception (other than timeout).
cluster.registration.initial-timeout
100 ms
Duration
Initial registration timeout between cluster components.
cluster.registration.max-timeout
Duration
Maximum registration timeout between cluster components.
cluster.registration.refused-registration-delay
Duration
The pause made after the registration attempt was refused.
cluster.services.shutdown-timeout
Duration
The shutdown timeout for cluster services like executors.
heartbeat.interval
Duration
Time interval between heartbeat RPC requests from the sender to the receiver side.
heartbeat.rpc-failure-threshold
Integer
The number of consecutive failed heartbeat RPCs until a heartbeat target is marked as unreachable. Failed heartbeat RPCs can be used to detect dead targets faster because they no longer receive the RPCs. The detection time is heartbeat.interval * heartbeat.rpc-failure-threshold. In environments with a flaky network, setting this value too low can produce false positives. In this case, we recommend increasing this value, but not higher than heartbeat.timeout / heartbeat.interval. The mechanism can be disabled by setting this option to -1.
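As a worked example of the arithmetic above, the following sketch computes the detection time and the recommended upper bound for heartbeat.rpc-failure-threshold. The function names and the sample values (10 s interval, 50 s timeout) are illustrative assumptions, not values taken from this table.

```python
from typing import Optional


def detection_time_ms(interval_ms: int, rpc_failure_threshold: int) -> Optional[int]:
    """heartbeat.interval * heartbeat.rpc-failure-threshold, or None if disabled."""
    if rpc_failure_threshold == -1:
        # -1 disables detection via failed heartbeat RPCs.
        return None
    return interval_ms * rpc_failure_threshold


def max_recommended_threshold(timeout_ms: int, interval_ms: int) -> int:
    """Upper bound suggested in the description: heartbeat.timeout / heartbeat.interval."""
    return timeout_ms // interval_ms


# Assumed sample values: interval = 10 s, timeout = 50 s, threshold = 2.
print(detection_time_ms(10_000, 2))
print(max_recommended_threshold(50_000, 10_000))
```

With these assumed values, a dead target would be detected after 20 seconds, and raising the threshold beyond 5 would bring no benefit because the regular heartbeat timeout would fire first.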
heartbeat.timeout
Duration
Timeout for requesting and receiving heartbeats for both sender and receiver sides.
jobmanager.execution.failover-strategy
"region"
String
This option specifies how the job computation recovers from task failures. Accepted values are:
- 'full': Restarts all tasks to recover the job.
- 'region': Restarts all tasks that could be affected by the task failure. More details can be found here.
DISABLED
Flag to check whether user code exits the system by terminating the JVM (e.g., System.exit()). Note that this configuration option can interfere with cluster.processes.halt-on-fatal-error: in intercepted user code, a call to System.exit() will not cause the JVM to halt when THROW is configured.
Possible values:
- "DISABLED": Flink is not monitoring or intercepting calls to System.exit()
- "LOG": Log the exit attempt with a stack trace but still allow the exit to be performed
- "THROW": Throw an exception when an exit is attempted, disallowing JVM termination
cluster.processes.halt-on-fatal-error
false
Boolean
Whether processes should halt on fatal errors instead of performing a graceful shutdown. In some environments (e.g. Java 8 with the G1 garbage collector), a regular graceful shutdown can lead to a JVM deadlock. See FLINK-16510 for details.
cluster.thread-dump.stacktrace-max-depth
Integer
The maximum stack trace depth of the TaskManager and JobManager thread dumps displayed in the web frontend.
cluster.uncaught-exception-handling
Defines whether the cluster will handle any uncaught exceptions by just logging them (LOG mode), or by failing the job (FAIL mode).
Possible values:
- "LOG"
- "FAIL"
process.jobmanager.working-dir
(none)
String
Working directory for Flink JobManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to process.working-dir.
process.taskmanager.working-dir
(none)
String
Working directory for Flink TaskManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to process.working-dir.
process.working-dir
io.tmp.dirs
String
Local working directory for Flink processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to a randomly picked temporary directory defined via io.tmp.dirs.
(none)
Integer
The size of the future thread pool to execute future callbacks for all spawned JobMasters. If no value is specified, then Flink defaults to the number of available CPU cores.
jobmanager.io-pool.size
(none)
Integer
The size of the IO thread pool to run blocking operations for all spawned JobMasters. This includes recovery and completion of checkpoints. Increase this value if you experience slow checkpoint operations when running many jobs. If no value is specified, then Flink defaults to the number of available CPU cores.
16 mb
MemorySize
The average size of data volume to expect each task instance to process if jobmanager.scheduler has been set to AdaptiveBatch. Note that when data skew occurs or the decided parallelism reaches the execution.batch.adaptive.auto-parallelism.max-parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
execution.batch.adaptive.auto-parallelism.default-source-parallelism
(none)
Integer
The default parallelism of source vertices or the upper bound of source parallelism to set adaptively if jobmanager.scheduler has been set to AdaptiveBatch. Note that execution.batch.adaptive.auto-parallelism.max-parallelism will be used if this option is not configured. If execution.batch.adaptive.auto-parallelism.max-parallelism is not set either, then the default parallelism set via parallelism.default will be used instead.
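The fallback order described for the source parallelism can be sketched as a simple resolution chain. The function and parameter names below are hypothetical and only mirror the three options named in the description; this is not how Flink implements the lookup internally.

```python
from typing import Optional


def resolve_source_parallelism(
    default_source_parallelism: Optional[int],
    max_parallelism: Optional[int],
    parallelism_default: int,
) -> int:
    """Pick the first configured value, in the order given in the description."""
    # 1. execution.batch.adaptive.auto-parallelism.default-source-parallelism
    if default_source_parallelism is not None:
        return default_source_parallelism
    # 2. execution.batch.adaptive.auto-parallelism.max-parallelism
    if max_parallelism is not None:
        return max_parallelism
    # 3. parallelism.default
    return parallelism_default


print(resolve_source_parallelism(None, 64, 4))
```

Here `None` stands for "not configured"; in the call above the source option is unset, so the max-parallelism value is used.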
execution.batch.adaptive.auto-parallelism.enabled
Boolean
If true, Flink will automatically decide the parallelism of operators in batch jobs.
execution.batch.adaptive.auto-parallelism.max-parallelism
Integer
The upper bound of allowed parallelism to set adaptively if jobmanager.scheduler has been set to AdaptiveBatch.
execution.batch.adaptive.auto-parallelism.min-parallelism
Integer
The lower bound of allowed parallelism to set adaptively if jobmanager.scheduler has been set to AdaptiveBatch.
execution.batch.job-recovery.enabled
false
Boolean
A flag to enable or disable the job recovery. If enabled, batch jobs can resume with previously generated intermediate results after job master restarts due to failures, thereby preserving the progress.
execution.batch.job-recovery.previous-worker.recovery.timeout
Duration
The timeout for a new job master to wait for the previous worker to reconnect. A reconnected worker will transmit the details of its produced intermediate results to the new job master, enabling the job master to reuse these results.
execution.batch.job-recovery.snapshot.min-pause
3 min
Duration
The minimal pause between snapshots taken by operator coordinator or other components. It is used to avoid performance degradation due to excessive snapshot frequency.
execution.batch.speculative.block-slow-node-duration
1 min
Duration
Controls how long a detected slow node should be blocked for.
execution.batch.speculative.enabled
false
Boolean
Controls whether to enable speculative execution.
execution.batch.speculative.max-concurrent-executions
Integer
Controls the maximum number of execution attempts of each operator that can execute concurrently, including the original one and speculative ones.
job-event.store.write-buffer.flush-interval
Duration
The flush interval of JobEventStore write buffers. Buffer contents will be flushed to external file system regularly with regard to this value.
job-event.store.write-buffer.size
MemorySize
The size of the write buffer of JobEventStore. The content will be flushed to the external file system once the buffer is full.
jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling
Duration
Determines the minimum time between scaling operations.
jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout
1 min
Duration
Defines the duration the JobManager delays the scaling operation after a resource change if only sufficient resources are available. The scaling operation is performed immediately if the resources have changed and the desired resources are available. The timeout begins as soon as either the available resources or the job's resource requirements are changed.
The resource requirements of a running job can be changed using the REST API endpoint.
jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures
Integer
The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.
jobmanager.adaptive-scheduler.rescale-trigger.max-delay
(none)
Duration
The maximum time the JobManager will wait before evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled; if checkpointing is enabled, the checkpointing interval multiplied by the value of jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures incremented by 1).
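The default described for jobmanager.adaptive-scheduler.rescale-trigger.max-delay can be written out as a small calculation. This is a sketch of the documented rule only; the function name and the sample values are assumptions, and `None` is used here to represent disabled checkpointing.

```python
from datetime import timedelta
from typing import Optional


def default_rescale_max_delay(
    checkpoint_interval: Optional[timedelta],
    max_checkpoint_failures: int,
) -> timedelta:
    """Default max-delay as described: 0 without checkpointing,
    otherwise interval * (max-checkpoint-failures + 1)."""
    if checkpoint_interval is None:
        # Checkpointing disabled: events are evaluated without delay.
        return timedelta(0)
    return checkpoint_interval * (max_checkpoint_failures + 1)


# Assumed sample values: 30 s checkpoint interval, 2 tolerated failures.
print(default_rescale_max_delay(timedelta(seconds=30), 2))
```

With the assumed values, the derived default is 90 seconds, i.e. three checkpoint intervals.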
jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout
Duration
The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available during job submission. The timeout starts once sufficient resources for running the job are available. Once this timeout has passed, the job will start executing with the available resources.
If scheduler-mode is configured to REACTIVE, this configuration value will default to 0, so that jobs start immediately with the available resources.
jobmanager.adaptive-scheduler.submission.resource-wait-timeout
5 min
Duration
The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.
Increasing this value will make the cluster more resilient against temporary resource shortages (e.g., there is more time for a failed TaskManager to be restarted).
Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.
If scheduler-mode is configured to REACTIVE, this configuration value will default to a negative value to disable the resource timeout.
jobmanager.partition.hybrid.partition-data-consume-constraint
(none)
Controls the constraint that hybrid partition data can be consumed. Note that this option is allowed only when jobmanager.scheduler has been set to AdaptiveBatch. Accepted values are:
- 'ALL_PRODUCERS_FINISHED': hybrid partition data can be consumed only when all producers are finished.
- 'ONLY_FINISHED_PRODUCERS': hybrid partition data can be consumed when its producer is finished.
- 'UNFINISHED_PRODUCERS': hybrid partition data can be consumed even if its producer is un-finished.
Possible values:
- "ALL_PRODUCERS_FINISHED"
- "ONLY_FINISHED_PRODUCERS"
- "UNFINISHED_PRODUCERS"
jobmanager.scheduler
Default
Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler.
Possible values:
- "Default": Default scheduler
- "Adaptive": Adaptive scheduler. More details can be found here.
- "AdaptiveBatch": Adaptive batch scheduler. More details can be found here.
scheduler-mode
(none)
Determines the mode of the scheduler. Note that scheduler-mode=REACTIVE is only supported by standalone application deployments, not by active resource managers (YARN, Kubernetes) or session clusters.
Possible values:
- "REACTIVE"
slot.idle.timeout
Duration
The timeout for an idle slot in Slot Pool.
slot.request.max-interval
20 ms
Duration
The maximum interval duration for slot requests.
slot.request.timeout
5 min
Duration
The timeout for requesting a slot from Slot Pool.
slotmanager.max-total-resource.cpu
(none)
Double
Maximum cpu cores the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.max'.
slotmanager.max-total-resource.memory
(none)
MemorySize
Maximum memory size the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.max'.
slotmanager.min-total-resource.cpu
(none)
Double
Minimum cpu cores the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.min'.
slotmanager.min-total-resource.memory
(none)
MemorySize
Minimum memory size the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.min'.
slotmanager.number-of-slots.max
infinite
Integer
Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.
slotmanager.number-of-slots.min
Integer
Defines the minimum number of slots that the Flink cluster allocates. This configuration option is meant for the cluster to initialize a certain number of workers on a best-effort basis when starting. This can be used to speed up the job startup process. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.
slow-task-detector.check-interval
Duration
The interval to check slow tasks.
slow-task-detector.execution-time.baseline-lower-bound
1 min
Duration
The lower bound of slow task detection baseline.
slow-task-detector.execution-time.baseline-multiplier
Double
The multiplier to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline. Note that the execution time will be weighted with the task's input bytes to ensure the accuracy of the detection if data skew occurs.
slow-task-detector.execution-time.baseline-ratio
Double
The finished execution ratio threshold to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline. Note that the execution time will be weighted with the task's input bytes to ensure the accuracy of the detection if data skew occurs.
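The baseline calculation shared by the multiplier and ratio options can be illustrated with a short sketch. It follows the formula in the descriptions (T is the median of the first N*R finished execution times, baseline is T*M) but leaves out the input-bytes weighting mentioned there. The function name, the rounding of N*R, and the sample numbers are assumptions.

```python
import math
from statistics import median
from typing import Optional, Sequence


def slow_task_baseline(
    finished_times_s: Sequence[float],  # execution times in completion order
    parallelism: int,                   # N
    ratio: float,                       # R (baseline-ratio)
    multiplier: float,                  # M (baseline-multiplier)
) -> Optional[float]:
    """Return T*M once N*R tasks have finished, else None.

    Simplified: ignores the weighting by input bytes, and rounds N*R up,
    which is an assumption of this sketch.
    """
    needed = math.ceil(parallelism * ratio)
    if needed == 0 or len(finished_times_s) < needed:
        return None  # not enough finished tasks to form a baseline yet
    t = median(finished_times_s[:needed])
    return t * multiplier


# Assumed sample: N = 4, R = 0.75, M = 1.5, three tasks finished so far.
print(slow_task_baseline([10.0, 12.0, 20.0], 4, 0.75, 1.5))
```

In this sample the median of the first three finished tasks is 12 seconds, so a task running longer than 18 seconds would exceed the baseline.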
taskmanager.load-balance.mode
Mode for the load-balance allocation strategy across all available TaskManagers.
- The SLOTS mode tries to spread out the slots evenly across all available TaskManagers.
- The NONE mode is the default mode without any specified strategy.
Possible values:
- "NONE"
- "SLOTS"
String
The port (range) used by the Flink Master for its RPC connections in highly-available setups. In highly-available setups, this value is used instead of 'jobmanager.rpc.port'. A value of '0' means that a random free port is chosen. TaskManagers discover this port through the high-availability services (leader election), so a random port or a port range works without requiring any additional means of service discovery.
"open"
String
Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).
high-availability.zookeeper.client.connection-timeout
Duration
Defines the connection timeout for ZooKeeper.
high-availability.zookeeper.client.ensemble-tracker
Boolean
Defines whether Curator should enable the ensemble tracker. This can be useful in certain scenarios in which CuratorFramework is accessing ZK clusters via a load balancer or virtual IPs. The default Curator EnsembleTracking logic watches CuratorEventType.GET_CONFIG events and changes the ZooKeeper connection string. This is not the desired behaviour when ZooKeeper is running behind virtual IPs. Under certain configurations, EnsembleTracking can lead to the ZooKeeper connection string being set to unresolvable hostnames.
high-availability.zookeeper.client.max-retry-attempts
Integer
Defines the number of connection retries before the client gives up.
high-availability.zookeeper.client.retry-wait
Duration
Defines the pause between consecutive retries.
high-availability.zookeeper.client.session-timeout
1 min
Duration
Defines the session timeout for the ZooKeeper session.
high-availability.zookeeper.client.tolerate-suspended-connections
false
Boolean
Defines whether a suspended ZooKeeper connection will be treated as an error that causes the leader information to be invalidated or not. In case you set this option to true, Flink will wait until a ZooKeeper connection is marked as lost before it revokes the leadership of components. This has the effect that Flink is more resilient against temporary connection instabilities at the cost of running more likely into timing issues with ZooKeeper.
high-availability.zookeeper.path.execution-plans
"/execution-plans"
String
ZooKeeper root path (ZNode) for execution plans
Duration
Defines the lease duration for the Kubernetes leader election. The leader will continuously renew its lease time to indicate its existence. The followers check the lease against the current time: "renewTime + leaseDuration > now" means the leader is alive.
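The liveness condition quoted in the description can be expressed directly. This is only a sketch of that inequality; the function name and the use of plain seconds are assumptions, and it does not reflect the Kubernetes client implementation.

```python
def leader_is_alive(renew_time_s: float, lease_duration_s: float, now_s: float) -> bool:
    """True while renewTime + leaseDuration > now, as stated in the description."""
    return renew_time_s + lease_duration_s > now_s


# Assumed sample: lease renewed at t=100 s with a 15 s lease duration.
print(leader_is_alive(100.0, 15.0, 110.0))
print(leader_is_alive(100.0, 15.0, 120.0))
```

The comparison is strict, so a follower checking exactly at renewTime + leaseDuration already treats the lease as expired.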
high-availability.kubernetes.leader-election.renew-deadline
Duration
Defines the deadline duration when the leader tries to renew the lease. The leader will give up its leadership if it cannot successfully renew the lease in the given time.
high-availability.kubernetes.leader-election.retry-period
Duration
Defines the pause duration between consecutive retries. All the contenders, including the current leader and all other followers, periodically try to acquire/renew the leadership if possible at this interval.
Integer
The timeout (in ms) for flushing the `close_notify` that was triggered by closing a channel. If the `close_notify` was not flushed in the given timeout the channel will be closed forcibly. (-1 = use system default)
security.ssl.internal.handshake-timeout
Integer
The timeout (in ms) during SSL handshake. (-1 = use system default)
security.ssl.internal.session-cache-size
Integer
The size of the cache used for storing SSL session objects. According to here, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default).
security.ssl.internal.session-timeout
Integer
The timeout (in ms) for the cached SSL session objects. (-1 = use system default)
security.ssl.provider
"JDK"
String
The SSL engine provider to use for the ssl transport:
- JDK: default Java-based SSL engine
- OPENSSL: openSSL-based SSL engine using system libraries
OPENSSL is based on netty-tcnative and comes in two flavours:
- dynamically linked: This will use your system's openSSL libraries (if compatible) and requires opt/flink-shaded-netty-tcnative-dynamic-*.jar to be copied to lib/
- statically linked: Due to potential licensing issues with openSSL (see LEGAL-393), we cannot ship pre-built libraries. However, you can build the required library yourself and put it into lib/:
git clone https://github.com/apache/flink-shaded.git && cd flink-shaded && mvn clean package -Pinclude-netty-tcnative-static -pl flink-shaded-netty-tcnative-static
5 min
Duration
Maximum duration that the result of an async operation is stored. Once elapsed the result of the operation can no longer be retrieved.
rest.await-leader-timeout
Duration
The time that the client waits for the leader address, e.g., Dispatcher or WebMonitorEndpoint.
rest.cache.checkpoint-statistics.size
Integer
Maximum number of entries in the checkpoint statistics cache.
rest.cache.checkpoint-statistics.timeout
Duration
Duration from write after which cached checkpoints statistics are cleaned up. For backwards compatibility, if no value is configured, web.refresh-interval will be used instead.
rest.client.max-content-length
104857600
Integer
The maximum content length in bytes that the client will handle.
rest.connection-timeout
Duration
The maximum time for the client to establish a TCP connection.
rest.flamegraph.cleanup-interval
10 min
Duration
Time after which cached stats are cleaned up if not accessed. It can be specified using notation: "100 s", "10 m".
rest.flamegraph.delay-between-samples
50 ms
Duration
Delay between individual stack trace samples taken for building a FlameGraph. It can be specified using notation: "100 ms", "1 s".
rest.flamegraph.enabled
false
Boolean
Enables the experimental flame graph feature.
rest.flamegraph.num-samples
Integer
Number of samples to take to build a FlameGraph.
rest.flamegraph.refresh-interval
1 min
Duration
Time after which available stats are deprecated and need to be refreshed (by resampling). It can be specified using notation: "30 s", "1 m".
rest.flamegraph.stack-depth
Integer
Maximum depth of stack traces used to create FlameGraphs.
rest.idleness-timeout
5 min
Duration
The maximum time for a connection to stay idle before failing.
rest.profiling.dir
System.getProperty("java.io.tmpdir")
String
Profiling result storing directory.
rest.profiling.duration-max
5 min
Duration
Maximum profiling duration for each profiling request. Any profiling request's duration exceeding this value will not be accepted.
rest.profiling.enabled
false
Boolean
Enables the experimental profiler feature.
rest.profiling.history-size
Integer
Maximum number of profiling history instances to be maintained for the JobManager or each TaskManager. The oldest instance will be removed on a rolling basis when the history size exceeds this value.
rest.retry.delay
Duration
The time that the client waits between retries (See also `rest.retry.max-attempts`).
rest.retry.max-attempts
Integer
The number of retries the client will attempt if a retryable operation fails.
rest.server.max-content-length
104857600
Integer
The maximum content length in bytes that the server will handle.
rest.server.numThreads
Integer
The number of threads for the asynchronous processing of requests.
rest.server.thread-priority
Integer
Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.
(none)
String
Path to the log file (may be in /log for standalone but under log directory when using YARN).
web.refresh-interval
Duration
Refresh interval for the web-frontend.
web.rescale.enable
Boolean
Flag indicating whether jobs can be rescaled from the web-frontend.
web.submit.enable
Boolean
Flag indicating whether jobs can be uploaded and run from the web-frontend.
web.timeout
10 min
Duration
Timeout for asynchronous operations by the web monitor.
web.tmpdir
System.getProperty("java.io.tmpdir")
String
Local directory that is used by the REST API for temporary files.
web.upload.dir
(none)
String
Local directory that is used by the REST API for storing uploaded jars. If not specified a dynamic directory will be created under web.tmpdir.
1 min
Duration
Defines the duration the JobManager delays the scaling operation after a resource change if only sufficient resources are available. The scaling operation is performed immediately if the resources have changed and the desired resources are available. The timeout begins as soon as either the available resources or the job's resource requirements are changed.
The resource requirements of a running job can be changed using the REST API endpoint.
jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures
Integer
The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.
jobmanager.adaptive-scheduler.rescale-trigger.max-delay
(none)
Duration
The maximum time the JobManager will wait before evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled; if checkpointing is enabled, the checkpointing interval multiplied by the value of jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures incremented by 1).
jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout
Duration
The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available during job submission. The timeout starts once sufficient resources for running the job are available. Once this timeout has passed, the job will start executing with the available resources.
If scheduler-mode is configured to REACTIVE, this configuration value will default to 0, so that jobs start immediately with the available resources.
jobmanager.adaptive-scheduler.submission.resource-wait-timeout
5 min
Duration
The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.
Increasing this value will make the cluster more resilient against temporary resource shortages (e.g., there is more time for a failed TaskManager to be restarted).
Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.
If scheduler-mode is configured to REACTIVE, this configuration value will default to a negative value to disable the resource timeout.
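A minimal sketch of how the two submission timeouts might be combined with the adaptive scheduler. The durations are illustrative assumptions, not recommendations.

```yaml
# Illustrative values only.
jobmanager.scheduler: Adaptive

# Wait up to 10 minutes for all required resources after submission or restart;
# after that, run with lower parallelism or fail if the minimum is not met.
jobmanager.adaptive-scheduler.submission.resource-wait-timeout: 10 min

# Once sufficient (but fewer than desired) resources are available, wait another
# 30 seconds for more to arrive before starting with what is there.
jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout: 30 s
```

In REACTIVE scheduler mode neither key needs to be set, since the text above states that they default to 0 and to a negative value respectively.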
jobmanager.archive.fs.dir
(none)
String
Directory for JobManager to store the archives of completed jobs.
jobmanager.bind-host
(none)
String
The local address of the network interface that the job manager binds to. If not configured, '0.0.0.0' will be used.
jobmanager.execution.attempts-history-size
Integer
The maximum number of historical execution attempts kept in history.
jobmanager.execution.failover-strategy
"region"
String
This option specifies how the job computation recovers from task failures. Accepted values are:
- 'full': Restarts all tasks to recover the job.
- 'region': Restarts all tasks that could be affected by the task failure. More details can be found here.
jobmanager.failure-enrichers
(none)
String
An optional list of failure enricher names. If empty, NO failure enrichers will be started. If configured, only enrichers whose name matches any of the names in the list will be started.
jobmanager.future-pool.size
(none)
Integer
The size of the future thread pool to execute future callbacks for all spawned JobMasters. If no value is specified, then Flink defaults to the number of available CPU cores.
jobmanager.io-pool.size
(none)
Integer
The size of the IO thread pool to run blocking operations for all spawned JobMasters. This includes recovery and completion of checkpoints. Increase this value if you experience slow checkpoint operations when running many jobs. If no value is specified, then Flink defaults to the number of available CPU cores.
jobmanager.partition.hybrid.partition-data-consume-constraint
(none)
Controls the constraint under which hybrid partition data can be consumed. Note that this option is allowed only when jobmanager.scheduler has been set to AdaptiveBatch. Accepted values are:
- 'ALL_PRODUCERS_FINISHED': hybrid partition data can be consumed only when all producers are finished.
- 'ONLY_FINISHED_PRODUCERS': hybrid partition data can be consumed when its producer is finished.
- 'UNFINISHED_PRODUCERS': hybrid partition data can be consumed even if its producer is unfinished.
Possible values:
- "ALL_PRODUCERS_FINISHED"
- "ONLY_FINISHED_PRODUCERS"
- "UNFINISHED_PRODUCERS"
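Because the constraint is only allowed together with the AdaptiveBatch scheduler, the two keys would normally be set as a pair. A sketch, with the chosen constraint picked purely for illustration:

```yaml
# The consume constraint is only permitted with the AdaptiveBatch scheduler.
jobmanager.scheduler: AdaptiveBatch
# Illustrative choice: consume hybrid partition data once its producer is finished.
jobmanager.partition.hybrid.partition-data-consume-constraint: ONLY_FINISHED_PRODUCERS
```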
jobmanager.resource-id
(none)
String
The JobManager's ResourceID. If not configured, the ResourceID will be generated randomly.
jobmanager.retrieve-taskmanager-hostname
Boolean
Flag indicating whether JobManager would retrieve canonical host name of TaskManager during registration. If the option is set to "false", TaskManager registration with JobManager could be faster, since no reverse DNS lookup is performed. However, local input split assignment (such as for HDFS files) may be impacted.
jobmanager.rpc.address
(none)
String
The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobmanager.rpc.bind-port
(none)
Integer
The local RPC port that the JobManager binds to. If not configured, the external port (configured by 'jobmanager.rpc.port') will be used.
jobmanager.rpc.port
Integer
The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
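For a simple standalone setup with a single JobManager, the address and port options might look like the sketch below. The host name is hypothetical and the port numbers are illustrative.

```yaml
# Hypothetical host name; replace with the static name or address of the JobManager.
jobmanager.rpc.address: jobmanager.example.internal
# External port that other components connect to (illustrative value).
jobmanager.rpc.port: 6123
# Local interface and port to bind to. If omitted, '0.0.0.0' and the value of
# jobmanager.rpc.port are used, as described above.
jobmanager.bind-host: 0.0.0.0
jobmanager.rpc.bind-port: 6123
```

In high-availability setups with a leader-election service, the address and port are discovered through that service, so these two connection keys are typically not what clients rely on.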
jobmanager.scheduler
Default
Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler.
Possible values:
- "Default": Default scheduler
- "Adaptive": Adaptive scheduler. More details can be found here.
- "AdaptiveBatch": Adaptive batch scheduler. More details can be found here.
jobstore.cache-size
52428800
The job store cache size in bytes which is used to keep completed jobs in memory.
jobstore.expiration-time
The time in seconds after which a completed job expires and is purged from the job store.
jobstore.max-capacity
infinite
Integer
The maximum number of completed jobs that can be kept in the job store. NOTICE: if the memory job store keeps too many jobs in a session cluster, it may cause full GCs or OutOfMemoryErrors in the JobManager.
jobstore.type
Determines which job store implementation is used in session cluster. Accepted values are:
- 'File': the file job store keeps the archived execution graphs in files.
- 'Memory': the memory job store keeps the archived execution graphs in memory. You may need to limit jobstore.max-capacity to mitigate full GCs or OutOfMemoryErrors when there are too many graphs.
Possible values:
- "File"
- "Memory"
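When the memory job store is selected, the note above suggests bounding the number of retained jobs. A sketch with an illustrative capacity:

```yaml
# Keep archived execution graphs in memory on a session cluster.
jobstore.type: Memory
# Illustrative cap; the default is unbounded, which can lead to memory pressure
# in the JobManager when many completed jobs accumulate.
jobstore.max-capacity: 100
```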
web.exception-history-size
Integer
The maximum number of failures collected by the exception history per job.
Integer
The config parameter defining the desired backlog of BLOB fetches on the JobManager. Note that the operating system usually enforces an upper limit on the backlog size based on the SOMAXCONN setting.
blob.fetch.num-concurrent
Integer
The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
blob.fetch.retries
Integer
The config parameter defining the number of retries for failed BLOB fetches.
blob.offload.minsize
1048576
Integer
The minimum size for messages to be offloaded to the BlobServer.
blob.server.port
String
The config parameter defining the server port of the blob service.
blob.service.cleanup.interval
Cleanup interval of the blob caches at the task managers (in seconds).
blob.service.ssl.enabled
Boolean
Flag to override ssl support for the blob service transport.
blob.storage.directory
(none)
String
The config parameter defining the local storage directory to be used by the blob server. If not configured, then it will default to <WORKING_DIR>/blobStorage.
Duration
Timeout for the resource manager to recover all workers from previous attempts. If exceeded, the resource manager will handle new resource requests by requesting new workers. If you would like to reuse the previous workers as much as possible, configure a longer timeout to wait for previous workers to register.
resourcemanager.rpc.port
Integer
Defines the network port to connect to for communication with the resource manager. By default, this is the port of the JobManager, because the same ActorSystem is used. It is not possible to use this configuration key to define port ranges.
resourcemanager.standalone.start-up-time
(none)
Duration
Time of the start-up period of a standalone cluster. During this time, the resource manager of the standalone cluster expects new task executors to be registered, and will not fail slot requests that cannot be satisfied by any currently registered slots. After this time, it will immediately fail pending and newly arriving requests that cannot be satisfied by registered slots. If not set, slot.request.timeout will be used by default.
resourcemanager.start-worker.max-failure-rate
Double
The maximum number of start worker failures (Native Kubernetes / Yarn) per minute before pausing requesting new workers. Once the threshold is reached, subsequent worker requests will be postponed to after a configured retry interval ('resourcemanager.start-worker.retry-interval').
resourcemanager.start-worker.retry-interval
Duration
The time to wait before requesting new workers (Native Kubernetes / Yarn) once the max failure rate of starting workers ('resourcemanager.start-worker.max-failure-rate') is reached.
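The two start-worker options reference each other, so a combined sketch may help. The numbers are illustrative assumptions and apply only to Native Kubernetes and Yarn deployments.

```yaml
# Illustrative values only.
# Pause requesting new workers once more than 5 worker starts fail per minute...
resourcemanager.start-worker.max-failure-rate: 5.0
# ...and wait 10 seconds before requesting new workers again.
resourcemanager.start-worker.retry-interval: 10 s
```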
resourcemanager.taskmanager-registration.timeout
5 min
Duration
Timeout for TaskManagers to register at the active resource managers. If exceeded, active resource manager will release and try to re-request the resource for the worker. If not configured, fallback to 'taskmanager.registration.timeout'.
resourcemanager.taskmanager-timeout
Duration
The timeout for an idle task manager to be released.
slotmanager.max-total-resource.cpu
(none)
Double
Maximum cpu cores the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.max'.
slotmanager.max-total-resource.memory
(none)
MemorySize
Maximum memory size the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.max'.
slotmanager.min-total-resource.cpu
(none)
Double
Minimum cpu cores the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.min'.
slotmanager.min-total-resource.memory
(none)
MemorySize
Minimum memory size the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.min'.
slotmanager.number-of-slots.max
infinite
Integer
Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.
slotmanager.number-of-slots.min
Integer
Defines the minimum number of slots that the Flink cluster allocates. This configuration option is meant for cluster to initialize certain workers in best efforts when starting. This can be used to speed up a job startup process. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.
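A sketch of bounding the slot count for a batch-oriented cluster on an active deployment. The numbers are illustrative, and per the descriptions above the settings have no effect on standalone clusters.

```yaml
# Illustrative values only.
# Start some workers up front (best effort) to speed up job startup.
slotmanager.number-of-slots.min: 4
# Cap resource consumption for batch workloads; not recommended for streaming
# workloads, which may fail if there are not enough slots.
slotmanager.number-of-slots.max: 32
```

If slotmanager.min-total-resource.* and slotmanager.max-total-resource.* are left unset, they are derived from these two slot counts.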
slotmanager.redundant-taskmanager-num
Integer
The number of redundant task managers. Redundant task managers are extra task managers started by Flink in order to speed up job recovery after failures caused by lost task managers. Note that this feature is available only to active deployments (native K8s, Yarn). For fine-grained resource requirements, redundant resources will be reserved, but it is possible that there are many small pieces of free resources from multiple TaskManagers which add up to more than the desired redundant resources, while each piece is too small to match the resource requirements of tasks from the failed worker.
3 min
Duration
Timeout after which a task cancellation times out and leads to a fatal TaskManager error. A value of 0 deactivates the watch dog. Notice that a task cancellation is different from both a task failure and a clean shutdown. Task cancellation timeout only applies to task cancellation and does not apply to task closing/clean-up caused by a task failure or a clean shutdown.
task.cancellation.timers.timeout
7500 ms
Duration
Time we wait for the timers to finish all pending timer threads when the stream task is cancelled.
taskmanager.bind-host
(none)
String
The local address of the network interface that the task manager binds to. If not configured, '0.0.0.0' will be used.
taskmanager.collect-sink.port
Integer
The port used for the client to retrieve query results from the TaskManager. The default value is 0, which corresponds to a random port assignment.
taskmanager.data.bind-port
(none)
String
The task manager's bind port used for data exchange operations. Also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. If not configured, 'taskmanager.data.port' will be used.
taskmanager.data.port
Integer
The task manager’s external port used for data exchange operations.
taskmanager.data.ssl.enabled
Boolean
Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true.
taskmanager.debug.memory.log
false
Boolean
Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
taskmanager.debug.memory.log-interval
Duration
The interval for the log thread to log the current memory usage.
taskmanager.host
(none)
String
The external address of the network interface where the TaskManager is exposed. Because different TaskManagers need different values for this option, usually it is specified in an additional non-shared TaskManager-specific config file.
taskmanager.jvm-exit-on-oom
false
Boolean
Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
taskmanager.load-balance.mode
Mode for the load-balance allocation strategy across all available TaskManagers.
- The SLOTS mode tries to spread out the slots evenly across all available TaskManagers.
- The NONE mode is the default mode without any specified strategy.
Possible values:
- "NONE"
- "SLOTS"
taskmanager.log.path
System.getProperty("log.file")
String
The path to the log file of the task manager.
taskmanager.memory.min-segment-size
256 bytes
MemorySize
Minimum possible size of memory buffers used by the network stack and the memory manager. For example, it can be used for automatic buffer size adjustment.
taskmanager.memory.segment-size
32 kb
MemorySize
Size of memory buffers used by the network stack and the memory manager.
taskmanager.network.bind-policy
String
The automatic address binding policy used by the TaskManager if "taskmanager.host" is not set. The value should be one of the following:
- "name" - uses hostname as binding address
- "ip" - uses host's ip address as binding address
taskmanager.numberOfTaskSlots
Integer
The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
taskmanager.registration.timeout
5 min
Duration
Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.
taskmanager.resource-id
(none)
String
The TaskManager's ResourceID. If not configured, the ResourceID will be generated with the "RpcAddress:RpcPort" and a 6-character random string. Notice that this option is not valid in Yarn and Native Kubernetes mode.
taskmanager.rpc.bind-port
(none)
Integer
The local RPC port that the TaskManager binds to. If not configured, the external port (configured by 'taskmanager.rpc.port') will be used.
taskmanager.rpc.port
String
The external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.
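Following the recommendation above, a port range avoids collisions when several TaskManagers share a machine. The ranges below reuse the example range from the description and are otherwise illustrative.

```yaml
# Each TaskManager on the host picks a free port from the range.
taskmanager.rpc.port: 50100-50200
# The data bind port also accepts lists and ranges (illustrative range).
taskmanager.data.bind-port: 50300-50400
```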
taskmanager.runtime.fs-timeout
Duration
The timeout for filesystem stream opening. A value of 0 indicates infinite waiting.
taskmanager.slot.timeout
Duration
Timeout used for identifying inactive slots. The TaskManager will free the slot if it does not become active within the given amount of time. Inactive slots can be caused by an out-dated slot request. If no value is configured, then it will fall back to pekko.ask.timeout.
taskmanager.system-out.log.cache-upper-size
100 kb
MemorySize
The cache upper size when Flink caches current line context of System.out or System.err when taskmanager.system-out.mode is LOG.
taskmanager.system-out.log.thread-name.enabled
false
Boolean
Whether to log the thread name when taskmanager.system-out.mode is LOG.
taskmanager.system-out.mode
DEFAULT
Redirection mode of System.out and System.err for all TaskManagers.
- DEFAULT: TaskManagers don't redirect System.out and System.err. This is the default value.
- LOG: TaskManagers redirect System.out and System.err to LOG.info and LOG.error.
- IGNORE: TaskManagers ignore System.out and System.err directly.
Possible values:
- "DEFAULT"
- "LOG"
- "IGNORE"
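The three taskmanager.system-out.* options work together when the mode is LOG. A sketch, where enabling the thread name is an illustrative choice:

```yaml
# Route System.out to LOG.info and System.err to LOG.error on all TaskManagers.
taskmanager.system-out.mode: LOG
# Only evaluated in LOG mode: include the emitting thread's name (illustrative choice).
taskmanager.system-out.log.thread-name.enabled: true
# Only evaluated in LOG mode: upper bound for the cached current line (default shown above).
taskmanager.system-out.log.cache-upper-size: 100 kb
```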
The codec to be used when compressing shuffle data. If it is "NONE", compression is disabled. If it is not "NONE", only "LZ4", "LZO" and "ZSTD" are supported now. TPC-DS tests of these three algorithms show that "LZ4" has the highest compression and decompression speed but the lowest compression ratio, "ZSTD" has the highest compression ratio but the slowest compression and decompression speed, and "LZO" is between the two. Also note that this option is experimental and might be changed in the future.
Possible values:
- "NONE"
- "LZ4"
- "LZO"
- "ZSTD"
taskmanager.network.detailed-metrics
false
Boolean
Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class
(none)
String
The option configures the class that is responsible for creating an external remote tier factory for hybrid shuffle. If configured, the hybrid shuffle will only initialize the specified remote tier according to the given class name. Currently, since the tier interfaces are not yet public and are still actively evolving, it is recommended that users do not independently implement the external remote tier until the tier interfaces are stabilized.
taskmanager.network.hybrid-shuffle.remote.path
(none)
String
The option is used to configure the base path of remote storage for hybrid shuffle. The shuffle data will be stored in remote storage when the disk space is not enough. Note: If this option is not configured the remote storage will be disabled.
taskmanager.network.memory.buffer-debloat.enabled
false
Boolean
The switch for the automatic buffer debloating feature. If enabled, the amount of in-flight data will be adjusted automatically according to the measured throughput.
taskmanager.network.memory.buffer-debloat.period
200 ms
Duration
The minimum period of time after which the buffer size will be debloated if required. The low value provides a fast reaction to the load fluctuation but can influence the performance.
taskmanager.network.memory.buffer-debloat.samples
Integer
The number of the last buffer size values that will be taken for the correct calculation of the new one.
taskmanager.network.memory.buffer-debloat.target
Duration
The target total time after which buffered in-flight data should be fully consumed. This configuration option will be used, in combination with the measured throughput, to adjust the amount of in-flight data.
taskmanager.network.memory.buffer-debloat.threshold-percentages
Integer
The minimum difference in percentage between the newly calculated buffer size and the old one to announce the new value. Can be used to avoid constant back and forth small adjustments.
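The buffer debloating options above are usually tuned as a group. The fragment below is a sketch; every value other than the 200 ms period listed above is an illustrative assumption.

```yaml
# Adjust the amount of in-flight data automatically from measured throughput.
taskmanager.network.memory.buffer-debloat.enabled: true
# Re-evaluate the buffer size at most every 200 ms (lower reacts faster but may cost performance).
taskmanager.network.memory.buffer-debloat.period: 200 ms
# Aim for buffered in-flight data to be consumed within about 1 second (illustrative).
taskmanager.network.memory.buffer-debloat.target: 1 s
# Average over the last 20 buffer size values (illustrative).
taskmanager.network.memory.buffer-debloat.samples: 20
# Announce a new size only if it differs by at least 25 percent (illustrative).
taskmanager.network.memory.buffer-debloat.threshold-percentages: 25
```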
taskmanager.network.memory.read-buffer.required-per-gate.max
(none)
Integer
The maximum number of network read buffers that are required by an input gate. (An input gate is responsible for reading data from all subtasks of an upstream task.) The number of buffers needed by an input gate is dynamically calculated in runtime, depending on various factors (e.g., the parallelism of the upstream task). Among the calculated number of needed buffers, the part below this configured value is required, while the excess part, if any, is optional. A task will fail if the required buffers cannot be obtained in runtime. A task will not fail due to not obtaining optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads. If explicitly configured, the configured value should be at least 1.
taskmanager.network.netty.client.connectTimeoutSec
Integer
The Netty client connection timeout.
taskmanager.network.netty.client.tcp.keepCount
(none)
Integer
The maximum number of keepalive probes TCP should send before Netty client dropping the connection. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.
taskmanager.network.netty.client.tcp.keepIdleSec
(none)
Integer
The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.
taskmanager.network.netty.client.tcp.keepIntervalSec
(none)
Integer
The time (in seconds) between individual keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.
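The three keepalive options together determine roughly how long a dead peer goes unnoticed. A sketch with illustrative values, assuming standard TCP keepalive behaviour:

```yaml
# Illustrative values only.
taskmanager.network.netty.client.tcp.keepIdleSec: 60      # idle time before the first probe
taskmanager.network.netty.client.tcp.keepIntervalSec: 10  # gap between probes
taskmanager.network.netty.client.tcp.keepCount: 3         # unanswered probes before dropping

# Approximate time until an unresponsive connection is dropped:
#   60 s + 3 * 10 s = 90 s
```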
taskmanager.network.partition-request-timeout
Duration
Timeout for an individual partition request of remote input channels. The partition request will finally fail if the total wait time exceeds twice the value of taskmanager.network.request-backoff.max.
taskmanager.network.request-backoff.initial
Integer
Minimum backoff in milliseconds for partition requests of local input channels.
taskmanager.network.request-backoff.max
10000
Integer
Maximum backoff in milliseconds for partition requests of local input channels.
taskmanager.network.retries
Integer
The number of retry attempts for network communication. Currently it is only used for establishing input/output channel connections.
taskmanager.network.sort-shuffle.min-buffers
Integer
Minimum number of network buffers required per blocking result partition for sort-shuffle. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid the 'insufficient number of network buffers' error if you are increasing this config value.
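The 64M figure in the description follows directly from the buffer count and the segment size. A sketch using the suggested production value:

```yaml
# Suggested production setting from the description above.
taskmanager.network.sort-shuffle.min-buffers: 2048
# Default segment size, stated explicitly here for the arithmetic.
taskmanager.memory.segment-size: 32 kb

# Memory per blocking result partition:
#   2048 buffers * 32 KB = 65536 KB = 64 MB
```

Since this memory is needed per blocking result partition, the total network memory may need to grow accordingly, as the note above warns.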
taskmanager.network.tcp-connection.enable-reuse-across-jobs
Boolean
Whether to reuse TCP connections across multiple jobs. If set to true, TCP connections will not be released after a job finishes. Subsequent jobs will then avoid the overhead of re-establishing connections. However, this may lead to an increase in the total number of connections on your machine. When it reaches the upper limit, you can set it to false to release idle connections.
Boolean
If true, call stacks for asynchronous asks are captured. That way, when an ask fails (for example, times out), you get a proper exception describing the original method call and call site. Note that with millions of concurrent RPC calls, this may add to the memory footprint.
pekko.ask.timeout
Duration
Timeout used for all futures and blocking Pekko calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).
pekko.client-socket-worker-pool.pool-size-factor
Double
The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.
pekko.client-socket-worker-pool.pool-size-max
Integer
Max number of threads to cap factor-based number to.
pekko.client-socket-worker-pool.pool-size-min
Integer
Min number of threads to cap factor-based number to.
pekko.fork-join-executor.parallelism-factor
Double
The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the parallelism-min and parallelism-max values.
pekko.fork-join-executor.parallelism-max
Integer
Max number of threads to cap factor-based parallelism number to.
pekko.fork-join-executor.parallelism-min
Integer
Min number of threads to cap factor-based parallelism number to.
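The formula ceil(available processors * factor), bounded by the min and max values, can be traced with concrete numbers. The values below are illustrative assumptions.

```yaml
# Illustrative values only.
pekko.fork-join-executor.parallelism-factor: 2.0
pekko.fork-join-executor.parallelism-min: 8
pekko.fork-join-executor.parallelism-max: 64

# Resulting thread pool size:
#    4 cores: ceil(4 * 2.0)  = 8   -> within [8, 64]     -> 8 threads
#    2 cores: ceil(2 * 2.0)  = 4   -> raised to the min  -> 8 threads
#   48 cores: ceil(48 * 2.0) = 96  -> capped at the max  -> 64 threads
```

The same calculation applies to the remote fork-join executor and, with pool-size-factor, pool-size-min and pool-size-max, to the client and server socket worker pools.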
pekko.framesize
"10485760b"
String
Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.
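Both pekko.framesize and pekko.ask.timeout require a unit specifier. A sketch that doubles the frame size shown above and sets an illustrative timeout:

```yaml
# 20971520 bytes = 20 * 1024 * 1024, twice the "10485760b" shown above.
pekko.framesize: 20971520b
# Illustrative timeout; the unit specifier (ms/s/min/h/d) is required.
pekko.ask.timeout: 30 s
```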
pekko.jvm-exit-on-fatal-error
Boolean
Exit JVM on fatal Pekko errors.
pekko.log.lifecycle.events
false
Boolean
Turns on the Pekko’s remote logging of events. Set this value to 'true' in case of debugging.
pekko.lookup.timeout
Duration
Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d).
pekko.remote-fork-join-executor.parallelism-factor
Double
The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the parallelism-min and parallelism-max values.
pekko.remote-fork-join-executor.parallelism-max
Integer
Max number of threads to cap factor-based parallelism number to.
pekko.remote-fork-join-executor.parallelism-min
Integer
Min number of threads to cap factor-based parallelism number to.
pekko.retry-gate-closed-for
Milliseconds a gate should be closed for after a remote connection was disconnected.
pekko.server-socket-worker-pool.pool-size-factor
Double
The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.
pekko.server-socket-worker-pool.pool-size-max
Integer
Max number of threads to cap factor-based number to.
pekko.server-socket-worker-pool.pool-size-min
Integer
Min number of threads to cap factor-based number to.
pekko.ssl.enabled
Boolean
Turns on SSL for Pekko’s remote communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true.
pekko.startup-timeout
(none)
Duration
Timeout after which the startup of a remote component is considered to have failed.
pekko.tcp.timeout
Duration
Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value.
pekko.throughput
Integer
Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness.
(none)
String
Path to hadoop configuration directory. It is required to read HDFS and/or YARN configuration. You can also set it via environment variable.
env.hbase.conf.dir
(none)
String
Path to hbase configuration directory. It is required to read HBASE configuration. You can also set it via environment variable.
env.java.default-opts.all
(none)
String
A string of default JVM options to prepend to env.java.opts.all. This is intended to be set by administrators.
env.java.default-opts.jobmanager
(none)
String
A string of default JVM options to prepend to env.java.opts.jobmanager. This is intended to be set by administrators.
env.java.default-opts.taskmanager
(none)
String
A string of default JVM options to prepend to env.java.opts.taskmanager. This is intended to be set by administrators.
env.java.opts.all
(none)
String
Java options to start the JVM of all Flink processes with.
env.java.opts.client
(none)
String
Java options to start the JVM of the Flink Client with.
env.java.opts.historyserver
(none)
String
Java options to start the JVM of the HistoryServer with.
env.java.opts.jobmanager
(none)
String
Java options to start the JVM of the JobManager with.
env.java.opts.sql-gateway
(none)
String
Java options to start the JVM of the Flink SQL Gateway with.
env.java.opts.taskmanager
(none)
String
Java options to start the JVM of the TaskManager with.
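A sketch of how administrator defaults and user options might be split across these keys. The JVM flags are standard HotSpot options chosen for illustration, not recommendations.

```yaml
# Set by administrators; prepended to env.java.opts.all.
env.java.default-opts.all: "-XX:+UseG1GC"
# Applied to the JVMs of all Flink processes.
env.java.opts.all: "-XX:+HeapDumpOnOutOfMemoryError"
# Applied only to TaskManager JVMs.
env.java.opts.taskmanager: "-XX:MaxGCPauseMillis=200"
```

The values are quoted so that the leading dash and the colons inside the flags are read as part of the string.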
env.log.dir
(none)
String
Defines the directory where the Flink logs are saved. It has to be an absolute path. (Defaults to the log directory under Flink’s home)
env.log.level
"INFO"
String
Defines the level of the root logger.
env.log.max
Integer
The maximum number of old log files to keep.
env.pid.dir
"/tmp"
String
Defines the directory where the flink-<host>-<process>.pid files are saved.
env.ssh.opts
(none)
String
Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh).
env.stdout-err.redirect-to-file
false
Boolean
Whether to redirect stdout and stderr to files when running in the foreground. If enabled, logs will not be appended to the console either. Note that redirected files do not support rolling rotation.
env.yarn.conf.dir
(none)
String
Path to yarn configuration directory. It is required to run flink on YARN. You can also set it via environment variable.
You can configure environment variables to be set on the JobManager and TaskManager processes started on Yarn.
containerized.master.env.: Prefix for passing custom environment variables to Flink's JobManager process. For example, to pass LD_LIBRARY_PATH as an environment variable to the JobManager, set containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the flink-conf.yaml.
containerized.taskmanager.env.: Similar to the above, this configuration prefix allows setting custom environment variables for the workers (TaskManagers).
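Written out as a configuration fragment, the LD_LIBRARY_PATH example above looks as follows. Applying the same variable to the TaskManagers is an illustrative extension.

```yaml
# The variable name after the prefix becomes the environment variable name.
containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
# Same variable for the TaskManager processes (illustrative).
containerized.taskmanager.env.LD_LIBRARY_PATH: "/usr/lib/native"
```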
These options relate to parts of Flink that are not actively developed any more.
These options may be removed in a future release.
DataSet API Optimizer
Default
Description
Integer
The maximum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters.
compiler.delimited-informat.max-sample-len
2097152
Integer
The maximal length of a line sample that the compiler takes for delimited inputs. If the length of a single sample exceeds this value (possible because of misconfiguration of the parser), the sampling aborts. This value can be overridden for a specific input with the input format’s parameters.
compiler.delimited-informat.min-line-samples
Integer
The minimum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters
false
Boolean
Flag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some CPU cycles.
taskmanager.runtime.large-record-handler
false
Boolean
Whether to use the LargeRecordHandler when spilling. If a record does not fit into the sorting buffer, the record will be spilled to disk and the sorting will continue with only the key. The record itself will be read afterwards when merging.
taskmanager.runtime.max-fan
Integer
The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small.
taskmanager.runtime.sort-spilling-threshold
Float
A sort operation starts spilling when this fraction of its memory budget is full.
false
Boolean
File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to "true", writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to "false", the writer will create the file directly at the output path, without creating a containing directory.
fs.overwrite-files
false
Boolean
Specifies whether file output writers should overwrite existing files by default. Set to "true" to overwrite by default, "false" otherwise.