
I don’t think this is a Prefect bug per se, but I am struggling to narrow down why it is happening and could use some guidance on how to troubleshoot it, as well as general suggestions for what I might be doing wrong.

Our Setup

  • We are using Prefect v2.11.4 (upgraded to latest to rule that out)
  • For the server, we have this installed in a custom Docker image based on Rocky Linux 9.2 (x86_64); we have also tried the vanilla prefecthq/prefect:2.11.4-python3.11 server image with the same results.
  • We have a dedicated PostgreSQL container (Postgres 15, based on Rocky Linux) for Prefect’s persistence.
  • We are using a single agent (based on the same Docker image); we have not yet switched to using “workers”.
  • We have set up concurrency limits for the tasks (see the sketch after this list).
  • We are using Docker Swarm for all services (with local volumes, as this is a single node swarm).
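
    For reference, the concurrency limits are keyed to the task tags used in the code further down. A minimal sketch of how they were created via the Python client (equivalently, `prefect concurrency-limit create <tag> <limit>` from the CLI); the limit values shown here are illustrative, not necessarily our production values:

    import asyncio

    from prefect import get_client

    async def create_limits():
        # Sketch: create tag-based concurrency limits matching the tags on the
        # process_file task below. The numeric limits are illustrative only.
        async with get_client() as client:
            await client.create_concurrency_limit(tag="process-file", concurrency_limit=5)
            await client.create_concurrency_limit(tag="minio-read", concurrency_limit=5)

    if __name__ == "__main__":
        asyncio.run(create_limits())
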
Issue Summary

    When attempting to run tasks concurrently using the task.submit() pattern, the server gets a TimeoutError while creating a task run in the database, which then yields a 500 error to the agent and results in the job crashing. This doesn’t happen 100% of the time, but it happens most of the time.

  • Originally we were using DaskTaskRunner. At first it seemed like switching to ConcurrentTaskRunner was a solution, but it only succeeded once and then started failing in much the same way.
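
    For reference, the original runner configuration looked roughly like this (a sketch; the actual Dask cluster/worker parameters are omitted), before we swapped in ConcurrentTaskRunner as shown in the snippet under "Code Snippets" below:

    from prefect import flow
    from prefect_dask import DaskTaskRunner  # from the prefect-dask collection

    # Sketch of the original configuration with a default local Dask cluster.
    @flow(log_prints=True, task_runner=DaskTaskRunner())
    def banner_pipeline(file_glob: str = "*.json*", verbose: bool = False):
        ...
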
Code Snippets

    This flow is for processing network scan results. The input files are large JSONL files (around 7 GB each when compressed).

    I don’t yet have a simple reproduction case, but the general flow & task invocation looks like this:

    @task(tags=["process-file", "minio-read"]) def process_file(object_path: str, tmp_dir_name: Path, verbose: bool = False) -> int: # Note: original design had read_and_split_file and process_batch tasks being called by the flow, # but in this simplified version, we're just calling those as functions -- hence the .fn() batch_files = read_and_split_file.fn(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose) banner_count = 0 logger.info(f"Created {len(batch_files)} for {object_path} source file.") for batch_file in batch_files: banner_count += process_batch.fn(batch_file=batch_file) except Exception as x: logger.exception(f"Error processing batch {batch_file} of {object_path}") continue return banner_count @flow(log_prints=True, task_runner=ConcurrentTaskRunner()) def banner_pipeline(file_glob: str = "*.json*", verbose: bool = False): Import a jsonl file (possibly compressed) from minio and ingest. :param file_glob: The glob pattern to match files in minio. :param verbose: Whether to emit verbose logs (could be significant). setup_environment() object_paths = all_matching_object_paths(file_glob=file_glob, verbose=verbose) banner_count = 0 # All tasks in a flow get executed by same agent, so we can rely on this tmpdir being available. with tempfile.TemporaryDirectory() as tmp_dir_name: if settings.CONCURRENT_PIPELINES: logger.info(f"Processing input files concurrently.") process_file_futures = list[PrefectFuture]() for object_path in object_paths: process_file_futures.append( process_file.submit(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose) for fut in process_file_futures: banner_count += fut.result() else: logger.info(f"Processing matched files sequentially.") for object_path in object_paths: banner_count += process_file(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose) except Exception as x: logger.exception(f"Error processing file: {object_path}") continue logger.info(f"Processed {banner_count} banners from {len(object_paths)} source files.") return banner_count

    Note that this works fine if settings.CONCURRENT_PIPELINES is False (but that is too slow to actually be a viable processing solution). For reference, each batch of 10k rows currently takes ~20 s to ingest.
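
    (To make "too slow" concrete with an illustrative figure rather than a measured one: if a source file held on the order of 1M rows, that is ~100 batches, i.e. roughly 100 × 20 s ≈ 33 minutes per file when processed sequentially, before multiplying by the number of matched files.)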

Details and Stack Traces

    On the server the task fails with this pattern:

    Exception in ASGI application
    Traceback (most recent call last):
      File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 408, in run_asgi
        result = await app(  # type: ignore[func-returns-value]
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
        return await self.app(scope, receive, send)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 289, in __call__
        await super().__call__(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
        await self.middleware_stack(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
        raise exc
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
        await self.app(scope, receive, _send)
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 83, in __call__
        await self.app(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
        raise exc
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
        await self.app(scope, receive, sender)
      File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
        raise e
      File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
        await self.app(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
        await route.handle(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 443, in handle
        await self.app(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 289, in __call__
        await super().__call__(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
        await self.middleware_stack(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
        raise exc
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
        await self.app(scope, receive, _send)
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 24, in __call__
        await responder(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 44, in __call__
        await self.app(scope, receive, self.send_with_gzip)
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
        raise exc
      File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
        await self.app(scope, receive, sender)
      File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
        raise e
      File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
        await self.app(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
        await route.handle(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 276, in handle
        await self.app(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 69, in app
        await response(scope, receive, send)
      File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 174, in __call__
        await self.background()
      File "/usr/local/lib/python3.11/site-packages/starlette/background.py", line 43, in __call__
        await task()
      File "/usr/local/lib/python3.11/site-packages/starlette/background.py", line 26, in __call__
        await self.func(*self.args, **self.kwargs)
      File "/usr/local/lib/python3.11/site-packages/prefect/server/api/work_queues.py", line 165, in _record_work_queue_polls
        await models.work_queues.update_work_queue(
      File "/usr/local/lib/python3.11/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
        return await fn(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/prefect/server/models/work_queues.py", line 217, in update_work_queue
        result = await session.execute(update_stmt)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 454, in execute
        result = await greenlet_spawn(
                 ^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 190, in greenlet_spawn
        result = context.throw(*sys.exc_info())
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2262, in execute
        return self._execute_internal(
               ^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2144, in _execute_internal
        result: Result[Any] = compile_state_cls.orm_execute_statement(
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/bulk_persistence.py", line 1620, in orm_execute_statement
        return super().orm_execute_statement(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
        result = conn.execute(
                 ^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1412, in execute
        return meth(
               ^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
        return connection._execute_clauseelement(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1635, in _execute_clauseelement
        ret = self._execute_context(
              ^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1844, in _execute_context
        return self._exec_single_context(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1984, in _exec_single_context
        self._handle_dbapi_exception(
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2342, in _handle_dbapi_exception
        raise exc_info[1].with_traceback(exc_info[2])
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1965, in _exec_single_context
        self.dialect.do_execute(
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 921, in do_execute
        cursor.execute(statement, parameters)
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 585, in execute
        self._adapt_connection.await_(
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 125, in await_only
        return current.driver.switch(awaitable)  # type: ignore[no-any-return]
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 185, in greenlet_spawn
        value = await result
                ^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 564, in _prepare_and_execute
        self._handle_exception(error)
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 515, in _handle_exception
        self._adapt_connection._handle_exception(error)
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 804, in _handle_exception
        raise error
      File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 552, in _prepare_and_execute
        self._rows = await prepared_stmt.fetch(*parameters)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
        data = await self.__bind_execute(args, 0, timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
        data, status, _ = await self.__do_execute(
                          ^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
        return await executor(protocol)
               ^^^^^^^^^^^^^^^^^^^^^^^^
      File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
    TimeoutError
    

    Sometimes the server stack trace instead points to a method related to checking/updating the concurrency limit.

    On the agent this looks like:

    Crash details:
    Traceback (most recent call last):
      File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1839, in report_flow_run_crashes
        yield
      File "/usr/lib64/python3.11/contextlib.py", line 716, in __aexit__
        cb_suppress = await cb(*exc_details)
                      ^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/app-root/lib64/python3.11/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
        raise exceptions[0]
      File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1303, in create_task_run_then_submit
        task_run = await create_task_run(
                   ^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1348, in create_task_run
        task_run = await flow_run_context.client.create_task_run(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/orchestration.py", line 1872, in create_task_run
        response = await self._client.post(
                   ^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/app-root/lib64/python3.11/site-packages/httpx/_client.py", line 1848, in post
        return await self.request(
               ^^^^^^^^^^^^^^^^^^^
      File "/opt/app-root/lib64/python3.11/site-packages/httpx/_client.py", line 1530, in request
        return await self.send(request, auth=auth, follow_redirects=follow_redirects)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/base.py", line 280, in send
        response.raise_for_status()
      File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/base.py", line 138, in raise_for_status
        raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://prefect-server:4200/api/task_runs/'
    Response: {'exception_message': 'Internal Server Error'}
    For more information check: https://httpstatuses.com/500
    

    Any pointers for how to troubleshoot or mitigate these server timeouts would be very appreciated!

    Checking the Postgres container while this is happening shows long-running UPDATE statements against work_queue, each active for over two minutes. Each row below shows the statement, lock mode, lock type, state, start time, and age:

    UPDATE work_queue SET last_polled=$1::TIMESTAMP WITH TIME ZONE, updated=CURRENT_TIMESTAMP WHERE work_queue.id = $2::UUID | RowExclusiveLock | relation | active | 2023-08-22 18:22:38.288 -0400 | 00:02:38.810187
    UPDATE work_queue SET last_polled=$1::TIMESTAMP WITH TIME ZONE, updated=CURRENT_TIMESTAMP WHERE work_queue.id = $2::UUID | RowExclusiveLock | relation | active | 2023-08-22 18:22:38.288 -0400 | 00:02:38.810187
    UPDATE work_queue SET last_polled=$1::TIMESTAMP WITH TIME ZONE, updated=CURRENT_TIMESTAMP WHERE work_queue.id = $2::UUID | ExclusiveLock | transactionid | active | 2023-08-22 18:22:38.288 -0400 | 00:02:38.810187

    (And there are a number more.)
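
    A listing like the one above can be gathered with a query against pg_stat_activity and pg_locks roughly like the following (shown as a sketch in case anyone wants to check the same thing; the database name in the filter is an assumption):

    -- Sketch: list active statements in the Prefect database together with the
    -- locks their backends hold, longest-running first.
    SELECT a.pid,
           a.query,
           l.mode,
           l.locktype,
           a.state,
           a.query_start,
           now() - a.query_start AS running_for
    FROM pg_stat_activity AS a
    JOIN pg_locks AS l ON l.pid = a.pid
    WHERE a.datname = 'prefect'   -- assumption: the Prefect database is named 'prefect'
      AND a.state = 'active'
    ORDER BY running_for DESC;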

    It may be worth mentioning that the start of each flow run on the agent is a CPU-intensive gunzip operation that lasts several minutes. Perhaps because these do not “yield”, they are effectively blocking server-agent communication(?). That would seem odd, though, as these are not async flows/tasks, and I would expect Prefect to already know how to run them in the background.