feat: add default timeout for Client.get_job() (#1935)
* feat: add default timeout for Client.get_job()

* change timeout type detection

* lint

* fix unit test and coverage

* add type hint

* fix type hint

* change import style and add comments

* remove sentinel value in client

* type hint

* typo

* add sentinel for query_and_wait()

* add unit tests

* fix unit test

* Update google/cloud/bigquery/job/query.py

Co-authored-by: Tim Sweña (Swast) <[email protected]>

* Update google/cloud/bigquery/job/query.py

Co-authored-by: Tim Sweña (Swast) <[email protected]>

* address comments

* typo

* type hint

* typos

---------

Co-authored-by: Tim Sweña (Swast) <[email protected]>
Linchin and tswast committed May 31, 2024
1 parent 94d61b0 commit 9fbad76
Showing 14 changed files with 421 additions and 209 deletions.
2 changes: 2 additions & 0 deletions google/cloud/bigquery/_helpers.py
@@ -33,6 +33,8 @@
from google.auth import credentials as ga_credentials # type: ignore
from google.api_core import client_options as client_options_lib

TimeoutType = Union[float, None]

_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
_TIMEONLY_W_MICROS = "%H:%M:%S.%f"
13 changes: 8 additions & 5 deletions google/cloud/bigquery/_job_helpers.py
@@ -39,14 +39,15 @@
import functools
import os
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional
from typing import Any, Dict, Optional, TYPE_CHECKING, Union

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE

# Avoid circular imports
if TYPE_CHECKING: # pragma: NO COVER
@@ -328,7 +329,7 @@ def query_and_wait(
location: Optional[str],
project: str,
api_timeout: Optional[float] = None,
wait_timeout: Optional[float] = None,
wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
retry: Optional[retries.Retry],
job_retry: Optional[retries.Retry],
page_size: Optional[int] = None,
@@ -364,10 +365,12 @@
api_timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
wait_timeout (Optional[float]):
wait_timeout (Optional[Union[float, object]]):
The number of seconds to wait for the query to finish. If the
query doesn't finish before this timeout, the client attempts
to cancel the query.
to cancel the query. If unset, the underlying Client.get_job() API
call has a timeout, but we still wait indefinitely for the job to
finish.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC. This only applies to making RPC
calls. It isn't used to retry failed jobs. This has
@@ -545,7 +548,7 @@ def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
def _wait_or_cancel(
job: job.QueryJob,
api_timeout: Optional[float],
wait_timeout: Optional[float],
wait_timeout: Optional[Union[object, float]],
retry: Optional[retries.Retry],
page_size: Optional[int],
max_results: Optional[int],
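For illustration, a hedged sketch of what the new `wait_timeout` default means at the public `Client.query_and_wait()` surface (the query string and timeout value below are placeholders, not taken from this commit):

```python
from google.cloud import bigquery

client = bigquery.Client()

# wait_timeout left unset: the sentinel default (POLLING_DEFAULT_VALUE) is in
# effect, so the client waits indefinitely for the job to finish while each
# underlying Client.get_job() call still gets its own default timeout.
rows = client.query_and_wait("SELECT 1")

# An explicit wait_timeout bounds the overall wait; if the query is not done
# in time, the client attempts to cancel it.
rows = client.query_and_wait("SELECT 1", wait_timeout=60.0)
```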
5 changes: 3 additions & 2 deletions google/cloud/bigquery/client.py
@@ -82,6 +82,7 @@
from google.cloud.bigquery._helpers import _DEFAULT_UNIVERSE
from google.cloud.bigquery._helpers import _validate_universe
from google.cloud.bigquery._helpers import _get_client_universe
from google.cloud.bigquery._helpers import TimeoutType
from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
@@ -107,6 +108,7 @@
DEFAULT_JOB_RETRY,
DEFAULT_RETRY,
DEFAULT_TIMEOUT,
DEFAULT_GET_JOB_TIMEOUT,
)
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
@@ -123,7 +125,6 @@
_versions_helpers.PANDAS_VERSIONS.try_import()
) # mypy check fails because pandas import is outside module, there are type: ignore comments related to this

TimeoutType = Union[float, None]
ResumableTimeoutType = Union[
None, float, Tuple[float, float]
] # for resumable media methods
@@ -2139,7 +2140,7 @@ def get_job(
project: Optional[str] = None,
location: Optional[str] = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: TimeoutType = DEFAULT_GET_JOB_TIMEOUT,
) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
"""Fetch a job for the project associated with this client.
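A brief sketch of what the changed default means for direct `Client.get_job()` callers (the job ID below is hypothetical):

```python
from google.cloud import bigquery
from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT

client = bigquery.Client()
assert DEFAULT_GET_JOB_TIMEOUT == 128  # new default introduced by this commit

# The jobs.get request now uses DEFAULT_GET_JOB_TIMEOUT rather than the
# generic DEFAULT_TIMEOUT when no timeout argument is passed.
job = client.get_job("example-job-id")

# An explicit value still overrides the default; None disables the timeout.
job = client.get_job("example-job-id", timeout=30.0)
```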
29 changes: 12 additions & 17 deletions google/cloud/bigquery/job/base.py
@@ -26,8 +26,11 @@
import google.api_core.future.polling

from google.cloud.bigquery import _helpers
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery.retry import (
DEFAULT_GET_JOB_TIMEOUT,
DEFAULT_RETRY,
)


_DONE_STATE = "DONE"
@@ -801,7 +804,7 @@ def reload(
self,
client=None,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
):
"""API call: refresh job properties via a GET request.
@@ -820,22 +823,14 @@
"""
client = self._require_client(client)

extra_params = {}
if self.location:
extra_params["location"] = self.location
span_attributes = {"path": self.path}

api_response = client._call_api(
retry,
span_name="BigQuery.job.reload",
span_attributes=span_attributes,
job_ref=self,
method="GET",
path=self.path,
query_params=extra_params,
got_job = client.get_job(
self,
project=self.project,
location=self.location,
retry=retry,
timeout=timeout,
)
self._set_properties(api_response)
self._set_properties(got_job._properties)

def cancel(
self,
@@ -913,7 +908,7 @@ def _set_future_result(self):
def done(
self,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
reload: bool = True,
) -> bool:
"""Checks if the job is complete.
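Because `reload()` now delegates to `Client.get_job()`, job-level polling picks up the same default; a short sketch, continuing the hypothetical `job` from the previous example:

```python
job.reload()                # refreshed via Client.get_job(); 128-second default timeout
job.reload(timeout=10.0)    # an explicit per-call timeout still takes precedence
if job.done(timeout=10.0):  # done() forwards retry/timeout to its reload
    print(job.state)
```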
34 changes: 24 additions & 10 deletions google/cloud/bigquery/job/query.py
@@ -40,7 +40,11 @@
StructQueryParameter,
UDFResource,
)
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.retry import (
DEFAULT_RETRY,
DEFAULT_JOB_RETRY,
POLLING_DEFAULT_VALUE,
)
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _EmptyRowIterator
@@ -1437,7 +1441,7 @@ def result( # type: ignore # (incompatible with supertype)
page_size: Optional[int] = None,
max_results: Optional[int] = None,
retry: Optional[retries.Retry] = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
start_index: Optional[int] = None,
job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
) -> Union["RowIterator", _EmptyRowIterator]:
@@ -1457,11 +1461,14 @@ def result( # type: ignore # (incompatible with supertype)
is ``DONE``, retrying is aborted early even if the
results are not available, as this will not change
anymore.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
before using ``retry``. If ``None``, wait indefinitely
unless an error is returned. If unset, only the
underlying API calls have their default timeouts, but we still
wait indefinitely for the job to finish.
start_index (Optional[int]):
The zero-based index of the starting row to read.
job_retry (Optional[google.api_core.retry.Retry]):
@@ -1507,6 +1514,13 @@ def result( # type: ignore # (incompatible with supertype)
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
)

# When timeout has the default sentinel value ``object()``, do not pass
# a timeout, so that subsequent calls use their own default timeouts.
kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
if type(timeout) is not object:
kwargs["timeout"] = timeout

try:
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
@@ -1548,7 +1562,7 @@ def is_job_done():
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
if self.done(retry=retry, **kwargs):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
@@ -1585,14 +1599,14 @@ def is_job_done():
# response from the REST API. This ensures we aren't
# making any extra API calls if the previous loop
# iteration fetched the finished job.
self._reload_query_results(retry=retry, timeout=timeout)
self._reload_query_results(retry=retry, **kwargs)
return True

# Call jobs.getQueryResults with max results set to 0 just to
# wait for the query to finish. Unlike most methods,
# jobs.getQueryResults hangs as long as it can to ensure we
# know when the query has finished as soon as possible.
self._reload_query_results(retry=retry, timeout=timeout)
self._reload_query_results(retry=retry, **kwargs)

# Even if the query is finished now according to
# jobs.getQueryResults, we'll want to reload the job status if
@@ -1682,10 +1696,10 @@ def is_job_done():
max_results=max_results,
start_index=start_index,
retry=retry,
timeout=timeout,
query_id=self.query_id,
first_page_response=first_page_response,
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
**kwargs,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
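The `type(timeout) is not object` check above works because the sentinel is a bare `object()` instance; a self-contained sketch of that pattern, using hypothetical names rather than the library's own, is:

```python
from typing import Dict, Optional, Union

_POLLING_DEFAULT = object()  # stands in for PollingFuture._DEFAULT_VALUE


def build_timeout_kwargs(
    timeout: Union[float, None, object] = _POLLING_DEFAULT,
) -> Dict[str, Optional[float]]:
    """Forward ``timeout`` only when the caller actually supplied one."""
    kwargs: Dict[str, Optional[float]] = {}
    # A bare object() has type ``object`` exactly, so anything else was passed
    # explicitly -- including None, which means "wait indefinitely".
    if type(timeout) is not object:
        kwargs["timeout"] = timeout  # type: ignore[assignment]
    return kwargs


print(build_timeout_kwargs())              # {} -> callee defaults apply
print(build_timeout_kwargs(timeout=5.0))   # {'timeout': 5.0}
print(build_timeout_kwargs(timeout=None))  # {'timeout': None}
```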
11 changes: 11 additions & 0 deletions google/cloud/bigquery/retry.py
@@ -14,6 +14,7 @@

from google.api_core import exceptions
from google.api_core import retry
import google.api_core.future.polling
from google.auth import exceptions as auth_exceptions # type: ignore
import requests.exceptions

@@ -140,3 +141,13 @@ def _job_should_retry(exc):
"""
The default job retry object.
"""

DEFAULT_GET_JOB_TIMEOUT = 128
"""
Default timeout for Client.get_job().
"""

POLLING_DEFAULT_VALUE = google.api_core.future.polling.PollingFuture._DEFAULT_VALUE
"""
Default value defined in google.api_core.future.polling.PollingFuture.
"""

sasha-gitg (Member) commented on Jul 10, 2024:

It looks like this attribute is only available in python-api-core >= 2.11: googleapis/python-api-core@434253d

But the library doesn't seem to bound that dependency: https://github.com/googleapis/python-bigquery/blob/main/setup.py#L35

This may be causing the issue reported here: googleapis/python-aiplatform#4079