What happened

I ran a long-running SeaTunnel job, and every time it ran past the checkpoint timeout, it failed with an error.

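Looking into it, I found that the configuration below generates 3 pipelineSubtasks, but CheckpointCoordinator triggers the barrier only on the starting subtasks (getStartingSubtasks()) each time, while it waits for acknowledgements from all pipeline subtasks (getPipelineSubtasks()). The remaining tasks never acknowledge, so the checkpoint waits until the timeout is reached.
Is this because this part is not implemented yet (see the TODOs)?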

In CheckpointCoordinator:

    private Set<Long> getNotYetAcknowledgedTasks() {
        // TODO: some tasks have completed and don't need to be ack
        return plan.getPipelineSubtasks().stream()
                .map(TaskLocation::getTaskID)
                .collect(Collectors.toCollection(CopyOnWriteArraySet::new));
    }

    public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarrier) {
        // TODO: some tasks have completed and don't need to trigger
        return plan.getStartingSubtasks().stream()
                .map(
                        taskLocation ->
                                new CheckpointBarrierTriggerOperation(
                                        checkpointBarrier, taskLocation))
                .map(checkpointManager::sendOperationToMemberNode)
                .toArray(InvocationFuture[]::new);
    }
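For context, triggering the barrier only at the starting subtasks is not necessarily wrong by itself, if each task forwards the barrier downstream and acknowledges when it arrives (the usual aligned-barrier scheme; whether SeaTunnel works exactly this way is an assumption here). The minimal Java model below uses hypothetical names and an assumed topology built from the task IDs mentioned in this issue (20000, 40000, 50000). It shows why the acknowledgement set can still stall: the coordinator waits for every pipeline subtask, so if one starting task never handles the barrier, every task reachable only through it never acknowledges either.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model, not SeaTunnel code: the coordinator injects a barrier
// only at the starting tasks; each task acks when the barrier reaches it and
// then forwards the barrier to its downstream tasks.
final class BarrierModel {
    // Task ID -> downstream task IDs; every known task has an entry.
    private final Map<Long, List<Long>> downstream = new LinkedHashMap<>();

    void addTask(long task) {
        downstream.computeIfAbsent(task, k -> new ArrayList<>());
    }

    void addEdge(long from, long to) {
        addTask(from);
        addTask(to);
        downstream.get(from).add(to);
    }

    // The tasks the coordinator waits for: every task in the pipeline.
    Set<Long> allTasks() {
        return new HashSet<>(downstream.keySet());
    }

    // Injects a barrier at startingTasks and returns the tasks that ack.
    // A task in `stuck` never handles the barrier (e.g. it cannot take its
    // checkpoint lock), so tasks reachable only through it never ack either.
    Set<Long> propagate(List<Long> startingTasks, Set<Long> stuck) {
        Set<Long> acked = new HashSet<>();
        ArrayDeque<Long> frontier = new ArrayDeque<>(startingTasks);
        while (!frontier.isEmpty()) {
            Long task = frontier.poll();
            if (stuck.contains(task) || !acked.add(task)) {
                continue; // stuck tasks never ack; already-acked tasks are skipped
            }
            frontier.addAll(downstream.getOrDefault(task, Collections.emptyList()));
        }
        return acked;
    }

    public static void main(String[] args) {
        BarrierModel pipeline = new BarrierModel();
        pipeline.addTask(20000L);          // split enumerator (assumed topology)
        pipeline.addEdge(40000L, 50000L);  // source task -> downstream task (assumed)
        List<Long> starting = Arrays.asList(20000L, 40000L);
        System.out.println("expected: " + pipeline.allTasks());
        System.out.println("healthy:  " + pipeline.propagate(starting, Collections.emptySet()));
        System.out.println("stuck:    " + pipeline.propagate(starting, Collections.singleton(40000L)));
    }
}
```

With no stuck task, every task acknowledges; marking 40000 as stuck leaves only 20000 acknowledged, which matches the symptom reported later in this issue.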

SeaTunnel Version

2.3.3

SeaTunnel Config

{
  "env": {
    "job.mode": "BATCH"
  },
  "source": [
    {
      "result_table_name": "jdbc_temp_table",
      "url": "jdbc:mysql://****:3306/test_src",
      "driver": "com.mysql.cj.jdbc.Driver",
      "user": "***",
      "password": "***",
      "query": "SELECT id, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15 FROM test_data_5",
      "plugin_name": "JDBC"
    }
  ],
  "sink": [
    {
      "source_table": "jdbc_temp_table",
      "url": "jdbc:mysql://***:3306/test_dst",
      "driver": "com.mysql.cj.jdbc.Driver",
      "user": "***",
      "password": "***",
      "query": "INSERT INTO test_data_5(id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
      "plugin_name": "JDBC"
    }
  ]
}

Running Command

./bin/seatunnel.sh --config conf.json

Error Exception

2023-09-26 10:29:52,676 WARN  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job my2my.json (758877844708261889), Pipeline: [(1/1)] checkpoint have error, cancel the pipeline
2023-09-26 10:29:52,676 WARN  org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job my2my.json (758877844708261889), Pipeline: [(1/1)] count = 0
2023-09-26 10:29:52,677 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=1}] state in Imap is RUNNING, not equals expected state CREATED
2023-09-26 10:29:52,677 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=1}] state in Imap is RUNNING, not equals expected state SCHEDULED
2023-09-26 10:29:52,677 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=1}] state in Imap is RUNNING, not equals expected state DEPLOYING
2023-09-26 10:29:52,677 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=30000}] state in Imap is RUNNING, not equals expected state CREATED
2023-09-26 10:29:52,678 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=30000}] state in Imap is RUNNING, not equals expected state SCHEDULED
2023-09-26 10:29:52,678 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=30000}] state in Imap is RUNNING, not equals expected state DEPLOYING
2023-09-26 10:29:52,678 ERROR org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job my2my.json (758877844708261889), Pipeline: [(1/1)] cancel error with exception: java.lang.InterruptedException
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipelineTasks(SubPlan.java:461)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:417)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659)
	at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:345)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:342)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:268)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:539)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2023-09-26 10:29:52,681 WARN  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-718890] [5.1] Interrupted task 20000 - org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@6f8ceee0
2023-09-26 10:29:52,682 ERROR org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation - [localhost]:5801 [seatunnel-718890] [5.1] null
java.lang.NullPointerException: null
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:689) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:270) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81) ~[classes/:?]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [hazelcast-5.1.jar:5.1]
2023-09-26 10:29:52,575 ERROR org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - trigger checkpoint failed
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml or jobConfig env.
	at org.apache.seatunnel.engine.server.checkpoint.PendingCheckpoint.abortCheckpoint(PendingCheckpoint.java:176) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$cleanPendingCheckpoint$19(CheckpointCoordinator.java:652) ~[classes/:?]
	at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.cleanPendingCheckpoint(CheckpointCoordinator.java:650) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:263) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:539) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_202]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_202]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_202]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2023-09-26 10:29:52,681 WARN  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-718890] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@5d235d6b
java.lang.RuntimeException: java.lang.InterruptedException
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:41) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:46) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:28) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:206) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:123) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader.pollNext(JdbcSourceReader.java:64) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:105) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:167) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:110) ~[classes/:?]
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:611) [classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_202]
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_202]
	at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:39) ~[classes/:?]
	... 15 more
2023-09-26 10:29:52,686 ERROR org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - report error from task
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.NullPointerException
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:41)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:46)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:28)
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:206)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.triggerBarrier(SourceFlowLifeCycle.java:268)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.triggerBarrier(SourceSeaTunnelTask.java:122)
	at org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation.lambda$null$0(BarrierFlowOperation.java:90)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
	at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:553)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.ack(SeaTunnelTask.java:347)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:66)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:39)
	... 10 more
Caused by: java.lang.NullPointerException
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:689)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:270)
	at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:346) [classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:188) [classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48) [classes/:?]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [hazelcast-5.1.jar:5.1]
2023-09-26 10:29:52,685 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-718890] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=30000}
2023-09-26 10:29:53,124 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:54,115 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:54,691 WARN  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job my2my.json (758877844708261889), Pipeline: [(1/1)] cancel error will retry
java.lang.InterruptedException: null
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347) ~[?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipelineTasks(SubPlan.java:461) ~[classes/:?]
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:417) ~[classes/:?]
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659) ~[classes/:?]
	at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:345) ~[classes/:?]
	at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:342) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:268) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:539) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_202]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2023-09-26 10:29:54,691 WARN  org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job my2my.json (758877844708261889), Pipeline: [(1/1)] count = 1
2023-09-26 10:29:55,099 DEBUG org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - start send heartbeat to resource manager, this address: [localhost]:5801
2023-09-26 10:29:55,099 DEBUG org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager - received worker heartbeat from: [localhost]:5801
2023-09-26 10:29:55,115 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:56,117 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:57,113 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:58,124 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:59,116 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:59,854 DEBUG org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-718890] [5.1] 

Zeta or Flink or Spark Version

2.3.3

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

    Looking at the code below, this configuration generates three taskLocations with ids 20000, 40000, and 50000.
    However, the taskLocations with ids 40000 and 50000 never acknowledge, so the PendingCheckpoint never becomes fully acknowledged (isFullyAcknowledged() stays false).

    PendingCheckpoint.java, lines 145-147:

    if (isFullyAcknowledged()) {
        LOG.debug("checkpoint is full ack!");
        completableFuture.complete(toCompletedCheckpoint());
    }

    As a result, the checkpoint never completes and eventually times out.
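    The timeout behavior can be modeled in a few lines. This is a simplified sketch with hypothetical names, not SeaTunnel's PendingCheckpoint: the checkpoint completes only once its set of not-yet-acknowledged task IDs is empty (the isFullyAcknowledged() condition), and a scheduled timeout fails it otherwise, mirroring the "Checkpoint expired before completing" error in the log.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified, hypothetical model of a pending checkpoint: it completes only when
// every expected task has acked, and fails if the timeout fires first.
final class PendingCheckpointModel {
    private final Set<Long> notYetAcknowledged = ConcurrentHashMap.newKeySet();
    private final CompletableFuture<String> result = new CompletableFuture<>();

    PendingCheckpointModel(Set<Long> expectedTasks, long timeoutMillis,
                           ScheduledExecutorService scheduler) {
        notYetAcknowledged.addAll(expectedTasks);
        // Abort if still pending when the timeout fires; a no-op if already completed.
        scheduler.schedule(() -> {
            result.completeExceptionally(
                    new TimeoutException("Checkpoint expired before completing"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    void acknowledge(long taskId) {
        notYetAcknowledged.remove(taskId);
        if (notYetAcknowledged.isEmpty()) { // the isFullyAcknowledged() condition
            result.complete("completed");
        }
    }

    // Runs one checkpoint in which only `acks` acknowledge; returns the outcome.
    static String outcome(Set<Long> expected, Set<Long> acks, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            PendingCheckpointModel checkpoint =
                    new PendingCheckpointModel(expected, timeoutMillis, scheduler);
            for (long taskId : acks) {
                checkpoint.acknowledge(taskId);
            }
            return checkpoint.result.get();
        } catch (ExecutionException e) {
            return "expired"; // the timeout completed the future exceptionally
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Set<Long> all = new HashSet<>(Arrays.asList(20000L, 40000L, 50000L));
        System.out.println(outcome(all, all, 5000));
        System.out.println(outcome(all, Collections.singleton(20000L), 200));
    }
}
```

    outcome(...) returns "completed" when all three tasks acknowledge and "expired" when only 20000 does.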

    When using JDBC to read a very large table, I found that JdbcSourceReader holds a lock while reading the data, and triggerBarrier in SourceFlowLifeCycle cannot acquire that lock, so this task cannot acknowledge the checkpoint.
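    The lock contention can be reproduced in isolation. This sketch assumes, as described above, that the reader holds the checkpoint lock for a whole split and that the barrier needs the same lock; a fair ReentrantLock with a timed tryLock stands in for the real synchronized block so the demo can observe the barrier giving up. Class and method names are hypothetical, and the per-row variant is only for contrast, not a description of any actual fix.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model: a reader that collects rows under a checkpoint lock, and a
// barrier that needs the same lock. A fair ReentrantLock with a timed tryLock
// stands in for the real synchronized block so the failure can be observed.
final class CheckpointLockDemo {

    // Returns true if a barrier could take the lock within barrierWaitMillis while
    // the reader processes `rows` rows. lockPerRow=false holds the lock for the
    // whole split; lockPerRow=true releases it after every row (for contrast).
    static boolean barrierGetsLock(int rows, long millisPerRow, boolean lockPerRow,
                                   long barrierWaitMillis) {
        ReentrantLock checkpointLock = new ReentrantLock(true); // fair: queued barrier goes next
        CountDownLatch readerStarted = new CountDownLatch(1);
        Thread reader = new Thread(() -> {
            if (!lockPerRow) {
                checkpointLock.lock(); // held for the entire split
            }
            readerStarted.countDown();
            try {
                for (int i = 0; i < rows; i++) {
                    if (lockPerRow) {
                        checkpointLock.lock(); // held for one row only
                    }
                    try {
                        sleepQuietly(millisPerRow); // stands in for collecting one row
                    } finally {
                        if (lockPerRow) {
                            checkpointLock.unlock();
                        }
                    }
                }
            } finally {
                if (!lockPerRow) {
                    checkpointLock.unlock();
                }
            }
        });
        reader.start();
        try {
            readerStarted.await();
            boolean acquired = checkpointLock.tryLock(barrierWaitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                checkpointLock.unlock(); // a real barrier would emit and ack here
            }
            reader.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("whole split: " + barrierGetsLock(30, 10, false, 100));
        System.out.println("per row:     " + barrierGetsLock(30, 10, true, 100));
    }
}
```

    With 30 rows at 10 ms each, whole-split locking keeps the lock for roughly 300 ms, so a barrier that waits 100 ms gives up; per-row locking lets it in after at most one row.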

    Even though I don't need checkpoints, the job still fails once it exceeds the checkpoint timeout.

    Do I have to either set the timeout to the maximum integer value, or set the interval so large that a checkpoint is never triggered? Is there a configuration that disables checkpointing? This kind of data doesn't need to be checkpointed in the first place.

    Can you increase the checkpoint timeout? Refer to https://seatunnel.apache.org/docs/2.3.3/seatunnel-engine/deployment#43-checkpoint-manager

    Refer to #5694. Try to use the latest code.

    I used the latest code and recompiled, but the timeout error still occurs.

    You can try to set the 'partition_column' and 'partition_num' parameters to split your table, and then set the checkpoint interval based on the processing speed of each partition.
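    The reasoning behind this suggestion can be sketched as follows, assuming the lock is held for one whole split as described above: a barrier may wait about as long as it takes to read one split, so smaller splits mean shorter waits, and the checkpoint timeout should exceed the read time of the largest split. RangeSplitter is hypothetical; the even split of [lower, upper] and the steady rows-per-second rate are simplifying assumptions, and SeaTunnel's actual split logic may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not SeaTunnel's splitter: divide the inclusive key range
// [lower, upper] of a partition column into partitionNum contiguous ranges, and
// estimate how long a barrier might wait if the lock is held for a whole split.
final class RangeSplitter {

    // Returns inclusive {start, end} pairs covering [lower, upper] with no gaps.
    // Assumes upper - lower + 1 does not overflow a long.
    static List<long[]> split(long lower, long upper, int partitionNum) {
        if (partitionNum <= 0 || upper < lower) {
            throw new IllegalArgumentException("need partitionNum > 0 and lower <= upper");
        }
        List<long[]> ranges = new ArrayList<>();
        long total = upper - lower + 1;
        long base = total / partitionNum;       // minimum keys per range
        long remainder = total % partitionNum;  // first `remainder` ranges get one extra key
        long start = lower;
        for (int i = 0; i < partitionNum; i++) {
            long size = base + (i < remainder ? 1 : 0);
            if (size == 0) {
                break; // more partitions than keys: stop at one key per range
            }
            long end = start + size - 1;
            ranges.add(new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    // Time to read one split at a steady rate, rounded up: a rough lower bound for
    // the checkpoint timeout when a split is read while holding the lock.
    static long splitReadMillis(long rowsPerSplit, long rowsPerSecond) {
        return (rowsPerSplit * 1000 + rowsPerSecond - 1) / rowsPerSecond;
    }

    public static void main(String[] args) {
        for (long[] range : split(1, 10, 3)) {
            System.out.println(range[0] + ".." + range[1]);
        }
        System.out.println(splitReadMillis(1_000_000, 50_000) + " ms");
    }
}
```

    For example, split(1, 10, 3) yields 1..4, 5..7 and 8..10, and a 1,000,000-row split read at 50,000 rows/s takes about 20,000 ms, so a checkpoint timeout below that would likely expire.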

    If it is a batch job and no checkpoint timeout is set in the job config, in theory checkpoints should not be triggered at all. I recently ran into this problem when processing a large batch of data. The problem is not the timeout value, but whether a checkpoint should be triggered in the first place. @liugddx
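    The rule proposed here can be written as a small predicate. This is a hypothetical sketch, not SeaTunnel's scheduling code: the env key names checkpoint.interval and checkpoint.timeout, and treating either one as an explicit opt-in for batch jobs, are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical predicate, not SeaTunnel's scheduler: streaming jobs always get
// periodic checkpoints; batch jobs only when the user configured one in env.
// The env key names below are assumptions.
final class CheckpointPolicy {
    enum JobMode { BATCH, STREAMING }

    static boolean shouldScheduleCheckpoints(JobMode mode, Map<String, String> env) {
        if (mode == JobMode.STREAMING) {
            return true;
        }
        // Batch: treat an explicit checkpoint setting as the opt-in.
        return env.containsKey("checkpoint.interval") || env.containsKey("checkpoint.timeout");
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("job.mode", "BATCH");
        System.out.println(shouldScheduleCheckpoints(JobMode.BATCH, env));
        env.put("checkpoint.interval", "10000");
        System.out.println(shouldScheduleCheckpoints(JobMode.BATCH, env));
    }
}
```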

    Version 2.3.4 supports disabling checkpoints.