kubernetes.operator.savepoint.history.max.age: 24 h
kubernetes.operator.savepoint.history.max.count: 5
Savepoint cleanup happens lazily and only while the application is running, so it is very likely that savepoints live beyond the configured max age.
To disable savepoint cleanup by the operator, set kubernetes.operator.savepoint.cleanup.enabled: false.
When savepoint cleanup is disabled the operator will still collect and populate the savepoint history, but it will not perform any dispose operations.
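These savepoint settings can also be tuned for a single application. A minimal sketch, assuming the keys are overridden per resource through spec.flinkConfiguration (the values below are illustrative):

spec:
  flinkConfiguration:
    # Keep a smaller, shorter-lived savepoint history for this application.
    kubernetes.operator.savepoint.history.max.age: "6 h"
    kubernetes.operator.savepoint.history.max.count: "3"
    # History is still tracked, but the operator issues no dispose calls.
    kubernetes.operator.savepoint.cleanup.enabled: "false"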
Recovery of missing job deployments
When HA is enabled, the operator can recover the Flink cluster deployment in cases when it was accidentally deleted by the user or some external process. Deployment recovery can be turned off in the configuration by setting kubernetes.operator.jm-deployment-recovery.enabled to false, however it is recommended to keep this setting at the default true value.
This is not something that would usually happen during normal operation and can also indicate a deeper problem,
therefore an Error event is also triggered by the system when it detects a missing deployment.
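If you nevertheless want to opt a single application out of automatic recovery, a minimal sketch, assuming the key can be overridden per resource through spec.flinkConfiguration:

spec:
  flinkConfiguration:
    # Disable JobManager deployment recovery for this application only
    # (keeping the default of true is recommended).
    kubernetes.operator.jm-deployment-recovery.enabled: "false"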
One scenario which could lead to the loss of the JobManager deployment in Flink versions before 1.15 is the job entering a terminal state:
Fatal job error
Job Finished
Loss of operator process after triggering savepoint shutdown but before recording the status
In Flink versions before 1.15 a terminal job state leads to a deployment shutdown, therefore it is impossible for the operator to know what happened.
In these cases no recovery will be performed to avoid data loss, and an error will be thrown.
Please check the manual Recovery section to understand how to recover from these situations.
Restart of unhealthy job deployments
When HA is enabled, the operator can restart the Flink cluster deployment when it is considered unhealthy. Restarting unhealthy deployments can be turned on in the configuration by setting kubernetes.operator.cluster.health-check.enabled to true (default: false).
For this feature to work, recovery of missing job deployments must be enabled.
At the moment a deployment is considered unhealthy when:
Flink's restart count reaches kubernetes.operator.cluster.health-check.restarts.threshold (default: 64) within the time window of kubernetes.operator.cluster.health-check.restarts.window (default: 2 minutes).
kubernetes.operator.cluster.health-check.checkpoint-progress.enabled is turned on and Flink's successful checkpoint count is not changing within the time window of kubernetes.operator.cluster.health-check.checkpoint-progress.window (default: 5 minutes).
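As a sketch, the corresponding settings might look as follows, assuming they are placed in the operator's configuration (for example via the Helm chart's default configuration); the thresholds and windows below are illustrative:

# Restart deployments that crash-loop or stop making checkpoint progress.
kubernetes.operator.jm-deployment-recovery.enabled: true
kubernetes.operator.cluster.health-check.enabled: true
kubernetes.operator.cluster.health-check.restarts.threshold: 32
kubernetes.operator.cluster.health-check.restarts.window: 5 min
kubernetes.operator.cluster.health-check.checkpoint-progress.enabled: true
kubernetes.operator.cluster.health-check.checkpoint-progress.window: 10 min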
Restart failed job deployments
The operator can restart a failed Flink job. This could be useful in cases when the job's main task is able to reconfigure the job to handle these failures.
When kubernetes.operator.job.restart.failed is set to true, then as soon as the job status is set to FAILED the operator will delete the current job and redeploy it using the latest successful checkpoint.
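A minimal sketch, assuming the flag is set per application through spec.flinkConfiguration:

spec:
  flinkConfiguration:
    # Redeploy from the latest successful checkpoint whenever the job
    # transitions to FAILED.
    kubernetes.operator.job.restart.failed: "true"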
Application upgrade rollbacks (Experimental)
The operator supports upgrade rollbacks as an experimental feature.
The rollback feature works based on the concept of stable deployment specs.
When an application is upgraded, the new spec is initially considered unstable. Once the operator successfully observes the new job in a healthy running state, the spec is marked as stable.
If a new upgrade is not marked stable within a certain configurable time period (kubernetes.operator.deployment.readiness.timeout), then a rollback operation will be performed, rolling back to the last stable spec.
To enable rollbacks you need to set:
kubernetes.operator.deployment.rollback.enabled: true
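Together with the readiness timeout, a minimal operator configuration sketch could look like this (the timeout value below is illustrative):

# Enable experimental rollbacks and give newly upgraded specs 5 minutes
# to become stable before rolling back.
kubernetes.operator.deployment.rollback.enabled: true
kubernetes.operator.deployment.readiness.timeout: 5 min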
HA is currently required for the rollback functionality.
Applications are never rolled back to a previous running state if they were suspended before the upgrade.
In these cases no rollback will be performed.
Rollbacks exclusively affect the FlinkDeployment/FlinkSessionJob resources.
When releasing a new version of your Flink application that updates several Kubernetes resources (such as config maps and services), you must ensure backward compatibility. This means that your updates to non-FlinkDeployment/FlinkSessionJob resources must be compatible with both the old and the new FlinkDeployment/FlinkSessionJob resources.
Stability condition
Currently, a new job is marked stable as soon as the operator can observe it running. This allows us to detect obvious errors, but it is not always enough to detect more complex issues.
In the future we expect to introduce more sophisticated conditions.
Rollback is currently only supported for FlinkDeployments.
Manual Recovery
There are cases when manual intervention is required from the user to recover a Flink application deployment or to restore to a user specified state.
In most of these situations the main reason for this is that the deployment got into a state where the operator cannot determine the health of the application or the latest checkpoint information to be used for recovery.
While these cases are not common, we need to be prepared to handle them.
Users have two options to restore a job from a target savepoint / checkpoint:
Redeploy using the savepointRedeployNonce
It is possible to redeploy a FlinkDeployment or FlinkSessionJob resource from a target savepoint by using the combination of savepointRedeployNonce and initialSavepointPath in the job spec:
job:
  initialSavepointPath: file://redeploy-target-savepoint
  # If not set previously, set to 1, otherwise increment, e.g. 2
  savepointRedeployNonce: 1
When changing the savepointRedeployNonce the operator will redeploy the job to the savepoint defined in the initialSavepointPath. The savepoint path must not be empty.
Rollbacks are not supported after redeployments.
Delete and recreate the custom resource
Alternatively you can completely delete and recreate the custom resources to solve almost any issue. This will fully reset the status information to start from a clean slate.
However, this also means that savepoint history is lost and the operator won’t clean up past periodic savepoints taken before the deletion.
1. Locate the latest checkpoint/savepoint metafile in your configured checkpoint/savepoint directory.
2. Delete the FlinkDeployment resource for your application.
3. Check that you have the current savepoint, and that your FlinkDeployment is deleted completely.
4. Modify your FlinkDeployment JobSpec and set the initialSavepointPath to your last checkpoint location.
5. Recreate the deployment.
These steps ensure that the operator will start completely fresh from the user-defined savepoint path and can hopefully fully recover.
Keep an eye on your job to see what could have caused the problem in the first place.
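As a sketch of step 4, the recreated resource's job spec would point at the checkpoint or savepoint metadata located in step 1 (the path below is only a placeholder):

spec:
  job:
    # Placeholder path: use the latest checkpoint/savepoint directory
    # located in step 1.
    initialSavepointPath: s3://my-bucket/checkpoints/<job-id>/chk-1234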