diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index 7992f28b6..095de4faa 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -12,9 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Helpers for interacting with the job REST APIs from the client.""" +"""Helpers for interacting with the job REST APIs from the client. + +For queries, there are three cases to consider: + +1. jobs.insert: This always returns a job resource. +2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED: + This sometimes can return the results inline, but always includes a job ID. +3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL: + This sometimes doesn't create a job at all, instead returning the results. + For better debugging, an auto-generated query ID is included in the + response. + +Client.query() calls either (1) or (2), depending on what the user provides +for the api_method parameter. query() always returns a QueryJob object, which +can retry the query when the query job fails for a retriable reason. + +Client.query_and_wait() calls (3). This returns a RowIterator that may wrap +local results from the response or may wrap a query job containing multiple +pages of results. Even though query_and_wait() waits for the job to complete, +we still need a separate job_retry object because there are different +predicates where it is safe to generate a new query ID. +""" import copy +import functools +import os import uuid from typing import Any, Dict, TYPE_CHECKING, Optional @@ -23,6 +46,7 @@ from google.cloud.bigquery import job import google.cloud.bigquery.query +from google.cloud.bigquery import table # Avoid circular imports if TYPE_CHECKING: # pragma: NO COVER @@ -59,6 +83,25 @@ def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> s return str(uuid.uuid4()) +def job_config_with_defaults( + job_config: Optional[job.QueryJobConfig], + default_job_config: Optional[job.QueryJobConfig], +) -> Optional[job.QueryJobConfig]: + """Create a copy of `job_config`, replacing unset values with those from + `default_job_config`. + """ + if job_config is None: + return default_job_config + + if default_job_config is None: + return job_config + + # Both job_config and default_job_config are not None, so make a copy of + # job_config merged with default_job_config. Anything already explicitly + # set on job_config should not be replaced. + return job_config._fill_from_default(default_job_config) + + def query_jobs_insert( client: "Client", query: str, @@ -67,9 +110,9 @@ def query_jobs_insert( job_id_prefix: Optional[str], location: Optional[str], project: str, - retry: retries.Retry, + retry: Optional[retries.Retry], timeout: Optional[float], - job_retry: retries.Retry, + job_retry: Optional[retries.Retry], ) -> job.QueryJob: """Initiate a query using jobs.insert. @@ -123,7 +166,13 @@ def do_query(): return future -def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]: +def _to_query_request( + job_config: Optional[job.QueryJobConfig] = None, + *, + query: str, + location: Optional[str] = None, + timeout: Optional[float] = None, +) -> Dict[str, Any]: """Transform from Job resource to QueryRequest resource. 
Most of the keys in job.configuration.query are in common with @@ -150,6 +199,15 @@ def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any request_body.setdefault("formatOptions", {}) request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore + if timeout is not None: + # Subtract a buffer for context switching, network latency, etc. + request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS) + + if location is not None: + request_body["location"] = location + + request_body["query"] = query + return request_body @@ -207,6 +265,10 @@ def _to_query_job( return query_job +def _to_query_path(project: str) -> str: + return f"/projects/{project}/queries" + + def query_jobs_query( client: "Client", query: str, @@ -217,18 +279,14 @@ def query_jobs_query( timeout: Optional[float], job_retry: retries.Retry, ) -> job.QueryJob: - """Initiate a query using jobs.query. + """Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED. See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query """ - path = f"/projects/{project}/queries" - request_body = _to_query_request(job_config) - - if timeout is not None: - # Subtract a buffer for context switching, network latency, etc. - request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS) - request_body["location"] = location - request_body["query"] = query + path = _to_query_path(project) + request_body = _to_query_request( + query=query, job_config=job_config, location=location, timeout=timeout + ) def do_query(): request_body["requestId"] = make_job_id() @@ -253,3 +311,235 @@ def do_query(): future._job_retry = job_retry return future + + +def query_and_wait( + client: "Client", + query: str, + *, + job_config: Optional[job.QueryJobConfig], + location: Optional[str], + project: str, + api_timeout: Optional[float] = None, + wait_timeout: Optional[float] = None, + retry: Optional[retries.Retry], + job_retry: Optional[retries.Retry], + page_size: Optional[int] = None, + max_results: Optional[int] = None, +) -> table.RowIterator: + """Run the query, wait for it to finish, and return the results. + + While ``jobCreationMode=JOB_CREATION_OPTIONAL`` is in preview in the + ``jobs.query`` REST API, use the default ``jobCreationMode`` unless + the environment variable ``QUERY_PREVIEW_ENABLED=true``. After + ``jobCreationMode`` is GA, this method will always use + ``jobCreationMode=JOB_CREATION_OPTIONAL``. See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query + + Args: + client: + BigQuery client to make API calls. + query (str): + SQL query to be executed. Defaults to the standard SQL + dialect. Use the ``job_config`` parameter to change dialects. + job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]): + Extra configuration options for the job. + To override any options that were previously set in + the ``default_query_job_config`` given to the + ``Client`` constructor, manually set those options to ``None``, + or whatever value is preferred. + location (Optional[str]): + Location where to run the job. Must match the location of the + table used in the query as well as the destination table. + project (Optional[str]): + Project ID of the project of where to run the job. Defaults + to the client's project. + api_timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. + wait_timeout (Optional[float]): + The number of seconds to wait for the query to finish. 
If the + query doesn't finish before this timeout, the client attempts + to cancel the query. + retry (Optional[google.api_core.retry.Retry]): + How to retry the RPC. This only applies to making RPC + calls. It isn't used to retry failed jobs. This has + a reasonable default that should only be overridden + with care. + job_retry (Optional[google.api_core.retry.Retry]): + How to retry failed jobs. The default retries + rate-limit-exceeded errors. Passing ``None`` disables + job retry. Not all jobs can be retried. + page_size (Optional[int]): + The maximum number of rows in each page of results from this + request. Non-positive values are ignored. + max_results (Optional[int]): + The maximum total number of rows from this request. + + Returns: + google.cloud.bigquery.table.RowIterator: + Iterator of row data + :class:`~google.cloud.bigquery.table.Row`-s. During each + page, the iterator will have the ``total_rows`` attribute + set, which counts the total number of rows **in the result + set** (this is distinct from the total number of rows in the + current page: ``iterator.page.num_items``). + + If the query is a special query that produces no results, e.g. + a DDL query, an ``_EmptyRowIterator`` instance is returned. + + Raises: + TypeError: + If ``job_config`` is not an instance of + :class:`~google.cloud.bigquery.job.QueryJobConfig` + class. + """ + # Some API parameters aren't supported by the jobs.query API. In these + # cases, fallback to a jobs.insert call. + if not _supported_by_jobs_query(job_config): + return _wait_or_cancel( + query_jobs_insert( + client=client, + query=query, + job_id=None, + job_id_prefix=None, + job_config=job_config, + location=location, + project=project, + retry=retry, + timeout=api_timeout, + job_retry=job_retry, + ), + api_timeout=api_timeout, + wait_timeout=wait_timeout, + retry=retry, + page_size=page_size, + max_results=max_results, + ) + + path = _to_query_path(project) + request_body = _to_query_request( + query=query, job_config=job_config, location=location, timeout=api_timeout + ) + + if page_size is not None and max_results is not None: + request_body["maxResults"] = min(page_size, max_results) + elif page_size is not None or max_results is not None: + request_body["maxResults"] = page_size or max_results + + if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true": + request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL" + + def do_query(): + request_body["requestId"] = make_job_id() + span_attributes = {"path": path} + + # For easier testing, handle the retries ourselves. + if retry is not None: + response = retry(client._call_api)( + retry=None, # We're calling the retry decorator ourselves. + span_name="BigQuery.query", + span_attributes=span_attributes, + method="POST", + path=path, + data=request_body, + timeout=api_timeout, + ) + else: + response = client._call_api( + retry=None, + span_name="BigQuery.query", + span_attributes=span_attributes, + method="POST", + path=path, + data=request_body, + timeout=api_timeout, + ) + + # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages + # to fetch, there will be a job ID for jobs.getQueryResults. 
+ query_results = google.cloud.bigquery.query._QueryResults.from_api_repr( + response + ) + page_token = query_results.page_token + more_pages = page_token is not None + + if more_pages or not query_results.complete: + # TODO(swast): Avoid a call to jobs.get in some cases (few + # remaining pages) by waiting for the query to finish and calling + # client._list_rows_from_query_results directly. Need to update + # RowIterator to fetch destination table via the job ID if needed. + return _wait_or_cancel( + _to_query_job(client, query, job_config, response), + api_timeout=api_timeout, + wait_timeout=wait_timeout, + retry=retry, + page_size=page_size, + max_results=max_results, + ) + + return table.RowIterator( + client=client, + api_request=functools.partial(client._call_api, retry, timeout=api_timeout), + path=None, + schema=query_results.schema, + max_results=max_results, + page_size=page_size, + total_rows=query_results.total_rows, + first_page_response=response, + location=query_results.location, + job_id=query_results.job_id, + query_id=query_results.query_id, + project=query_results.project, + ) + + if job_retry is not None: + return job_retry(do_query)() + else: + return do_query() + + +def _supported_by_jobs_query(job_config: Optional[job.QueryJobConfig]) -> bool: + """True if jobs.query can be used. False if jobs.insert is needed.""" + if job_config is None: + return True + + return ( + # These features aren't supported by jobs.query. + job_config.clustering_fields is None + and job_config.destination is None + and job_config.destination_encryption_configuration is None + and job_config.range_partitioning is None + and job_config.table_definitions is None + and job_config.time_partitioning is None + ) + + +def _wait_or_cancel( + job: job.QueryJob, + api_timeout: Optional[float], + wait_timeout: Optional[float], + retry: Optional[retries.Retry], + page_size: Optional[int], + max_results: Optional[int], +) -> table.RowIterator: + """Wait for a job to complete and return the results. + + If we can't return the results within the ``wait_timeout``, try to cancel + the job. + """ + try: + return job.result( + page_size=page_size, + max_results=max_results, + retry=retry, + timeout=wait_timeout, + ) + except Exception: + # Attempt to cancel the job since we can't return the results. + try: + job.cancel(retry=retry, timeout=api_timeout) + except Exception: + # Don't eat the original exception if cancel fails. + pass + raise diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 488a9ad29..284ccddb5 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -255,23 +255,31 @@ def __init__( self._connection = Connection(self, **kw_args) self._location = location - self._default_query_job_config = copy.deepcopy(default_query_job_config) self._default_load_job_config = copy.deepcopy(default_load_job_config) + # Use property setter so validation can run. + self.default_query_job_config = default_query_job_config + @property def location(self): """Default location for jobs / datasets / tables.""" return self._location @property - def default_query_job_config(self): - """Default ``QueryJobConfig``. - Will be merged into job configs passed into the ``query`` method. + def default_query_job_config(self) -> Optional[QueryJobConfig]: + """Default ``QueryJobConfig`` or ``None``. + + Will be merged into job configs passed into the ``query`` or + ``query_and_wait`` methods. 
""" return self._default_query_job_config @default_query_job_config.setter - def default_query_job_config(self, value: QueryJobConfig): + def default_query_job_config(self, value: Optional[QueryJobConfig]): + if value is not None: + _verify_job_config_type( + value, QueryJobConfig, param_name="default_query_job_config" + ) self._default_query_job_config = copy.deepcopy(value) @property @@ -3355,26 +3363,12 @@ def query( if location is None: location = self.location - if self._default_query_job_config: - if job_config: - _verify_job_config_type( - job_config, google.cloud.bigquery.job.QueryJobConfig - ) - # anything that's not defined on the incoming - # that is in the default, - # should be filled in with the default - # the incoming therefore has precedence - # - # Note that _fill_from_default doesn't mutate the receiver - job_config = job_config._fill_from_default( - self._default_query_job_config - ) - else: - _verify_job_config_type( - self._default_query_job_config, - google.cloud.bigquery.job.QueryJobConfig, - ) - job_config = self._default_query_job_config + if job_config is not None: + _verify_job_config_type(job_config, QueryJobConfig) + + job_config = _job_helpers.job_config_with_defaults( + job_config, self._default_query_job_config + ) # Note that we haven't modified the original job_config (or # _default_query_job_config) up to this point. @@ -3405,6 +3399,112 @@ def query( else: raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}") + def query_and_wait( + self, + query, + *, + job_config: Optional[QueryJobConfig] = None, + location: Optional[str] = None, + project: Optional[str] = None, + api_timeout: TimeoutType = DEFAULT_TIMEOUT, + wait_timeout: TimeoutType = None, + retry: retries.Retry = DEFAULT_RETRY, + job_retry: retries.Retry = DEFAULT_JOB_RETRY, + page_size: Optional[int] = None, + max_results: Optional[int] = None, + ) -> RowIterator: + """Run the query, wait for it to finish, and return the results. + + While ``jobCreationMode=JOB_CREATION_OPTIONAL`` is in preview in the + ``jobs.query`` REST API, use the default ``jobCreationMode`` unless + the environment variable ``QUERY_PREVIEW_ENABLED=true``. After + ``jobCreationMode`` is GA, this method will always use + ``jobCreationMode=JOB_CREATION_OPTIONAL``. See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query + + Args: + query (str): + SQL query to be executed. Defaults to the standard SQL + dialect. Use the ``job_config`` parameter to change dialects. + job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]): + Extra configuration options for the job. + To override any options that were previously set in + the ``default_query_job_config`` given to the + ``Client`` constructor, manually set those options to ``None``, + or whatever value is preferred. + location (Optional[str]): + Location where to run the job. Must match the location of the + table used in the query as well as the destination table. + project (Optional[str]): + Project ID of the project of where to run the job. Defaults + to the client's project. + api_timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. + wait_timeout (Optional[float]): + The number of seconds to wait for the query to finish. If the + query doesn't finish before this timeout, the client attempts + to cancel the query. + retry (Optional[google.api_core.retry.Retry]): + How to retry the RPC. This only applies to making RPC + calls. It isn't used to retry failed jobs. 
This has + a reasonable default that should only be overridden + with care. + job_retry (Optional[google.api_core.retry.Retry]): + How to retry failed jobs. The default retries + rate-limit-exceeded errors. Passing ``None`` disables + job retry. Not all jobs can be retried. + page_size (Optional[int]): + The maximum number of rows in each page of results from this + request. Non-positive values are ignored. + max_results (Optional[int]): + The maximum total number of rows from this request. + + Returns: + google.cloud.bigquery.table.RowIterator: + Iterator of row data + :class:`~google.cloud.bigquery.table.Row`-s. During each + page, the iterator will have the ``total_rows`` attribute + set, which counts the total number of rows **in the result + set** (this is distinct from the total number of rows in the + current page: ``iterator.page.num_items``). + + If the query is a special query that produces no results, e.g. + a DDL query, an ``_EmptyRowIterator`` instance is returned. + + Raises: + TypeError: + If ``job_config`` is not an instance of + :class:`~google.cloud.bigquery.job.QueryJobConfig` + class. + """ + if project is None: + project = self.project + + if location is None: + location = self.location + + if job_config is not None: + _verify_job_config_type(job_config, QueryJobConfig) + + job_config = _job_helpers.job_config_with_defaults( + job_config, self._default_query_job_config + ) + + return _job_helpers.query_and_wait( + self, + query, + job_config=job_config, + location=location, + project=project, + api_timeout=api_timeout, + wait_timeout=wait_timeout, + retry=retry, + job_retry=job_retry, + page_size=page_size, + max_results=max_results, + ) + def insert_rows( self, table: Union[Table, TableReference, str], @@ -3853,7 +3953,7 @@ def _list_rows_from_query_results( job_id: str, location: str, project: str, - schema: SchemaField, + schema: Sequence[SchemaField], total_rows: Optional[int] = None, destination: Optional[Union[Table, TableReference, TableListItem, str]] = None, max_results: Optional[int] = None, diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index 78df9142f..97e0ea3bd 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -21,6 +21,7 @@ import typing from typing import ClassVar, Dict, Optional, Sequence +from google.api_core import retry as retries from google.api_core import exceptions import google.api_core.future.polling @@ -28,9 +29,6 @@ from google.cloud.bigquery.retry import DEFAULT_RETRY from google.cloud.bigquery._helpers import _int_or_none -if typing.TYPE_CHECKING: # pragma: NO COVER - from google.api_core import retry as retries - _DONE_STATE = "DONE" _STOPPED_REASON = "stopped" @@ -825,7 +823,7 @@ def reload( def cancel( self, client=None, - retry: "retries.Retry" = DEFAULT_RETRY, + retry: Optional[retries.Retry] = DEFAULT_RETRY, timeout: Optional[float] = None, ) -> bool: """API call: cancel job via a POST request @@ -921,9 +919,9 @@ def done( self.reload(retry=retry, timeout=timeout) return self.state == _DONE_STATE - def result( # type: ignore # (signature complaint) + def result( # type: ignore # (incompatible with supertype) self, - retry: "retries.Retry" = DEFAULT_RETRY, + retry: Optional[retries.Retry] = DEFAULT_RETRY, timeout: Optional[float] = None, ) -> "_AsyncJob": """Start the job and wait for it to complete and get the result. 
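
As context for the client.py changes above, here is a minimal sketch of the merge semantics that the new `job_config_with_defaults` helper and the validating `default_query_job_config` setter are meant to provide. The option values are made up for illustration; only the precedence behavior is the point.

    from google.cloud import bigquery

    # The client-level default now flows into both query() and query_and_wait().
    # Passing anything other than a QueryJobConfig here raises TypeError at
    # construction time, because __init__ now routes through the property setter.
    client = bigquery.Client(
        default_query_job_config=bigquery.QueryJobConfig(
            maximum_bytes_billed=10_000_000,  # hypothetical default cap
        )
    )

    # Options set on the per-call config take precedence; anything left unset
    # falls back to the client default, so this query should run with both the
    # bytes-billed cap and the label applied.
    rows = client.query_and_wait(
        "SELECT 1",
        job_config=bigquery.QueryJobConfig(labels={"example": "merge"}),
    )
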
diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py
index 79cd207a1..4a529f949 100644
--- a/google/cloud/bigquery/job/query.py
+++ b/google/cloud/bigquery/job/query.py
@@ -22,6 +22,7 @@
 from google.api_core import exceptions
 from google.api_core.future import polling as polling_future
+from google.api_core import retry as retries
 import requests
 
 from google.cloud.bigquery.dataset import Dataset
@@ -69,7 +70,6 @@
     import pandas  # type: ignore
     import geopandas  # type: ignore
     import pyarrow  # type: ignore
-    from google.api_core import retry as retries
     from google.cloud import bigquery_storage
     from google.cloud.bigquery.client import Client
     from google.cloud.bigquery.table import RowIterator
@@ -779,7 +779,7 @@ def to_api_repr(self) -> dict:
         resource = copy.deepcopy(self._properties)
         # Query parameters have an addition property associated with them
         # to indicate if the query is using named or positional parameters.
-        query_parameters = resource["query"].get("queryParameters")
+        query_parameters = resource.get("query", {}).get("queryParameters")
         if query_parameters:
             if query_parameters[0].get("name") is None:
                 resource["query"]["parameterMode"] = "POSITIONAL"
@@ -1469,14 +1469,14 @@ def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
         except exceptions.GoogleAPIError as exc:
             self.set_exception(exc)
 
-    def result(  # type: ignore  # (complaints about the overloaded signature)
+    def result(  # type: ignore  # (incompatible with supertype)
         self,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
-        retry: "retries.Retry" = DEFAULT_RETRY,
+        retry: Optional[retries.Retry] = DEFAULT_RETRY,
         timeout: Optional[float] = None,
         start_index: Optional[int] = None,
-        job_retry: "retries.Retry" = DEFAULT_JOB_RETRY,
+        job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
     ) -> Union["RowIterator", _EmptyRowIterator]:
         """Start the job and wait for it to complete and get the result.
diff --git a/google/cloud/bigquery/query.py b/google/cloud/bigquery/query.py
index 54abe95a7..43591c648 100644
--- a/google/cloud/bigquery/query.py
+++ b/google/cloud/bigquery/query.py
@@ -911,6 +911,18 @@ def job_id(self):
         """
         return self._properties.get("jobReference", {}).get("jobId")
 
+    @property
+    def location(self):
+        """Location of the query job these results are from.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.job_reference
+
+        Returns:
+            str: Location of the query job.
+        """
+        return self._properties.get("jobReference", {}).get("location")
+
     @property
     def query_id(self) -> Optional[str]:
         """[Preview] ID of a completed query.
diff --git a/noxfile.py b/noxfile.py
index ab7803040..41492c7f0 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -205,13 +205,15 @@ def system(session):
 @nox.session(python=DEFAULT_PYTHON_VERSION)
 def mypy_samples(session):
     """Run type checks with mypy."""
-    session.install("-e", ".[all]")
-    session.install("pytest")
     for requirements_path in CURRENT_DIRECTORY.glob("samples/*/requirements.txt"):
-        session.install("-r", requirements_path)
+        session.install("-r", str(requirements_path))
     session.install(MYPY_VERSION)
 
+    # requirements.txt might include this package. Install from source so that
+    # we can author samples with unreleased features.
+    session.install("-e", ".[all]")
+
     # Just install the dependencies' type info directly, since "mypy --install-types"
     # might require an additional pass.
session.install( diff --git a/samples/client_query.py b/samples/client_query.py index 4df051ee2..80eac854e 100644 --- a/samples/client_query.py +++ b/samples/client_query.py @@ -14,6 +14,9 @@ def client_query() -> None: + # TODO(swast): remove once docs in cloud.google.com have been updated to + # use samples/snippets/client_query.py + # [START bigquery_query] from google.cloud import bigquery diff --git a/samples/snippets/client_query.py b/samples/snippets/client_query.py new file mode 100644 index 000000000..ccae2e8bd --- /dev/null +++ b/samples/snippets/client_query.py @@ -0,0 +1,37 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def client_query() -> None: + # [START bigquery_query] + from google.cloud import bigquery + + # Construct a BigQuery client object. + client = bigquery.Client() + + query = """ + SELECT name, SUM(number) as total_people + FROM `bigquery-public-data.usa_names.usa_1910_2013` + WHERE state = 'TX' + GROUP BY name, state + ORDER BY total_people DESC + LIMIT 20 + """ + rows = client.query_and_wait(query) # Make an API request. + + print("The query data:") + for row in rows: + # Row values can be accessed by field name or index. + print("name={}, count={}".format(row[0], row["total_people"])) + # [END bigquery_query] diff --git a/samples/snippets/client_query_test.py b/samples/snippets/client_query_test.py new file mode 100644 index 000000000..1bc83a230 --- /dev/null +++ b/samples/snippets/client_query_test.py @@ -0,0 +1,38 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +import client_query # type: ignore + +if typing.TYPE_CHECKING: + import pytest + + +def test_client_query(capsys: "pytest.CaptureFixture[str]") -> None: + client_query.client_query() + out, _ = capsys.readouterr() + assert "The query data:" in out + assert "name=James, count=272793" in out + + +def test_client_query_job_optional( + capsys: "pytest.CaptureFixture[str]", monkeypatch: "pytest.MonkeyPatch" +) -> None: + monkeypatch.setenv("QUERY_PREVIEW_ENABLED", "true") + + client_query.client_query() + out, _ = capsys.readouterr() + assert "The query data:" in out + assert "name=James, count=272793" in out diff --git a/tests/unit/test__job_helpers.py b/tests/unit/test__job_helpers.py index 012352f4e..f2fe32d94 100644 --- a/tests/unit/test__job_helpers.py +++ b/tests/unit/test__job_helpers.py @@ -12,15 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import functools from typing import Any, Dict, Optional from unittest import mock +import freezegun +import google.api_core.exceptions from google.api_core import retry as retries import pytest from google.cloud.bigquery.client import Client from google.cloud.bigquery import _job_helpers -from google.cloud.bigquery.job.query import QueryJob, QueryJobConfig +from google.cloud.bigquery.job import query as job_query from google.cloud.bigquery.query import ConnectionProperty, ScalarQueryParameter @@ -55,9 +58,9 @@ def make_query_response( ("job_config", "expected"), ( (None, make_query_request()), - (QueryJobConfig(), make_query_request()), + (job_query.QueryJobConfig(), make_query_request()), ( - QueryJobConfig(default_dataset="my-project.my_dataset"), + job_query.QueryJobConfig(default_dataset="my-project.my_dataset"), make_query_request( { "defaultDataset": { @@ -67,17 +70,17 @@ def make_query_response( } ), ), - (QueryJobConfig(dry_run=True), make_query_request({"dryRun": True})), + (job_query.QueryJobConfig(dry_run=True), make_query_request({"dryRun": True})), ( - QueryJobConfig(use_query_cache=False), + job_query.QueryJobConfig(use_query_cache=False), make_query_request({"useQueryCache": False}), ), ( - QueryJobConfig(use_legacy_sql=True), + job_query.QueryJobConfig(use_legacy_sql=True), make_query_request({"useLegacySql": True}), ), ( - QueryJobConfig( + job_query.QueryJobConfig( query_parameters=[ ScalarQueryParameter("named_param1", "STRING", "param-value"), ScalarQueryParameter("named_param2", "INT64", 123), @@ -102,7 +105,7 @@ def make_query_response( ), ), ( - QueryJobConfig( + job_query.QueryJobConfig( query_parameters=[ ScalarQueryParameter(None, "STRING", "param-value"), ScalarQueryParameter(None, "INT64", 123), @@ -125,7 +128,7 @@ def make_query_response( ), ), ( - QueryJobConfig( + job_query.QueryJobConfig( connection_properties=[ ConnectionProperty(key="time_zone", value="America/Chicago"), ConnectionProperty(key="session_id", value="abcd-efgh-ijkl-mnop"), @@ -141,17 +144,18 @@ def make_query_response( ), ), ( - QueryJobConfig(labels={"abc": "def"}), + job_query.QueryJobConfig(labels={"abc": "def"}), make_query_request({"labels": {"abc": "def"}}), ), ( - QueryJobConfig(maximum_bytes_billed=987654), + job_query.QueryJobConfig(maximum_bytes_billed=987654), make_query_request({"maximumBytesBilled": "987654"}), ), ), ) def test__to_query_request(job_config, expected): - result = _job_helpers._to_query_request(job_config) + result = _job_helpers._to_query_request(job_config, query="SELECT 1") + expected["query"] = "SELECT 1" assert result == expected @@ -160,7 +164,9 @@ def test__to_query_job_defaults(): response = make_query_response( job_id="test-job", project_id="some-project", location="asia-northeast1" ) - job: QueryJob = _job_helpers._to_query_job(mock_client, "query-str", None, response) + job: job_query.QueryJob = _job_helpers._to_query_job( + mock_client, "query-str", None, response + ) assert job.query == "query-str" assert job._client is mock_client assert job.job_id == "test-job" @@ -175,9 +181,9 @@ def test__to_query_job_dry_run(): response = make_query_response( job_id="test-job", project_id="some-project", location="asia-northeast1" ) - job_config: QueryJobConfig = QueryJobConfig() + job_config: job_query.QueryJobConfig = job_query.QueryJobConfig() job_config.dry_run = True - job: QueryJob = _job_helpers._to_query_job( + job: job_query.QueryJob = _job_helpers._to_query_job( mock_client, "query-str", job_config, response ) assert job.dry_run is True @@ -193,7 +199,9 
@@ def test__to_query_job_dry_run(): def test__to_query_job_sets_state(completed, expected_state): mock_client = mock.create_autospec(Client) response = make_query_response(completed=completed) - job: QueryJob = _job_helpers._to_query_job(mock_client, "query-str", None, response) + job: job_query.QueryJob = _job_helpers._to_query_job( + mock_client, "query-str", None, response + ) assert job.state == expected_state @@ -206,7 +214,9 @@ def test__to_query_job_sets_errors(): {"message": "something else went wrong"}, ] ) - job: QueryJob = _job_helpers._to_query_job(mock_client, "query-str", None, response) + job: job_query.QueryJob = _job_helpers._to_query_job( + mock_client, "query-str", None, response + ) assert len(job.errors) == 2 # If we got back a response instead of an HTTP error status code, most # likely the job didn't completely fail. @@ -313,6 +323,717 @@ def test_query_jobs_query_sets_timeout(timeout, expected_timeout): assert request["timeoutMs"] == expected_timeout +def test_query_and_wait_uses_jobs_insert(): + """With unsupported features, call jobs.insert instead of jobs.query.""" + client = mock.create_autospec(Client) + client._call_api.return_value = { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "query": { + "query": "SELECT 1", + }, + # Make sure the job has "started" + "status": {"state": "DONE"}, + "jobComplete": True, + } + job_config = job_query.QueryJobConfig( + destination="dest-project.dest_dset.dest_table", + ) + _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=job_config, + retry=None, + job_retry=None, + page_size=None, + max_results=None, + ) + + # We should call jobs.insert since jobs.query doesn't support destination. 
+ request_path = "/projects/request-project/jobs" + client._call_api.assert_any_call( + None, # retry, + span_name="BigQuery.job.begin", + span_attributes={"path": request_path}, + job_ref=mock.ANY, + method="POST", + path=request_path, + data={ + "jobReference": { + "jobId": mock.ANY, + "projectId": "request-project", + "location": "request-location", + }, + "configuration": { + "query": { + "destinationTable": { + "projectId": "dest-project", + "datasetId": "dest_dset", + "tableId": "dest_table", + }, + "useLegacySql": False, + "query": "SELECT 1", + } + }, + }, + timeout=None, + ) + + +def test_query_and_wait_retries_job(): + freezegun.freeze_time(auto_tick_seconds=100) + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () + client._call_api.side_effect = ( + google.api_core.exceptions.BadGateway("retry me"), + google.api_core.exceptions.InternalServerError("job_retry me"), + google.api_core.exceptions.BadGateway("retry me"), + { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": True, + "schema": { + "fields": [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ], + }, + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ], + }, + ) + rows = _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + page_size=None, + max_results=None, + retry=retries.Retry( + lambda exc: isinstance(exc, google.api_core.exceptions.BadGateway), + multiplier=1.0, + ).with_deadline( + 200.0 + ), # Since auto_tick_seconds is 100, we should get at least 1 retry. + job_retry=retries.Retry( + lambda exc: isinstance(exc, google.api_core.exceptions.InternalServerError), + multiplier=1.0, + ).with_deadline(600.0), + ) + assert len(list(rows)) == 4 + + # For this code path, where the query has finished immediately, we should + # only be calling the jobs.query API and no other request path. 
+ request_path = "/projects/request-project/queries" + for call in client._call_api.call_args_list: + _, kwargs = call + assert kwargs["method"] == "POST" + assert kwargs["path"] == request_path + + +@freezegun.freeze_time(auto_tick_seconds=100) +def test_query_and_wait_retries_job_times_out(): + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () + client._call_api.side_effect = ( + google.api_core.exceptions.BadGateway("retry me"), + google.api_core.exceptions.InternalServerError("job_retry me"), + google.api_core.exceptions.BadGateway("retry me"), + google.api_core.exceptions.InternalServerError("job_retry me"), + ) + + with pytest.raises(google.api_core.exceptions.RetryError) as exc_info: + _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + page_size=None, + max_results=None, + retry=retries.Retry( + lambda exc: isinstance(exc, google.api_core.exceptions.BadGateway), + multiplier=1.0, + ).with_deadline( + 200.0 + ), # Since auto_tick_seconds is 100, we should get at least 1 retry. + job_retry=retries.Retry( + lambda exc: isinstance( + exc, google.api_core.exceptions.InternalServerError + ), + multiplier=1.0, + ).with_deadline(400.0), + ) + + assert isinstance( + exc_info.value.cause, google.api_core.exceptions.InternalServerError + ) + + +def test_query_and_wait_sets_job_creation_mode(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setenv( + "QUERY_PREVIEW_ENABLED", + # The comparison should be case insensitive. + "TrUe", + ) + client = mock.create_autospec(Client) + client._call_api.return_value = { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": True, + } + _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + retry=None, + job_retry=None, + page_size=None, + max_results=None, + ) + + # We should only call jobs.query once, no additional row requests needed. + request_path = "/projects/request-project/queries" + client._call_api.assert_called_once_with( + None, # retry + span_name="BigQuery.query", + span_attributes={"path": request_path}, + method="POST", + path=request_path, + data={ + "query": "SELECT 1", + "location": "request-location", + "useLegacySql": False, + "formatOptions": { + "useInt64Timestamp": True, + }, + "requestId": mock.ANY, + "jobCreationMode": "JOB_CREATION_OPTIONAL", + }, + timeout=None, + ) + + +def test_query_and_wait_sets_location(): + client = mock.create_autospec(Client) + client._call_api.return_value = { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": True, + } + rows = _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + retry=None, + job_retry=None, + page_size=None, + max_results=None, + ) + assert rows.location == "response-location" + + # We should only call jobs.query once, no additional row requests needed. 
+ request_path = "/projects/request-project/queries" + client._call_api.assert_called_once_with( + None, # retry + span_name="BigQuery.query", + span_attributes={"path": request_path}, + method="POST", + path=request_path, + data={ + "query": "SELECT 1", + "location": "request-location", + "useLegacySql": False, + "formatOptions": { + "useInt64Timestamp": True, + }, + "requestId": mock.ANY, + }, + timeout=None, + ) + + +@pytest.mark.parametrize( + ("max_results", "page_size", "expected"), + [ + (10, None, 10), + (None, 11, 11), + (12, 100, 12), + (100, 13, 13), + ], +) +def test_query_and_wait_sets_max_results(max_results, page_size, expected): + client = mock.create_autospec(Client) + client._call_api.return_value = { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": True, + } + rows = _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + retry=None, + job_retry=None, + page_size=page_size, + max_results=max_results, + ) + assert rows.location == "response-location" + + # We should only call jobs.query once, no additional row requests needed. + request_path = "/projects/request-project/queries" + client._call_api.assert_called_once_with( + None, # retry + span_name="BigQuery.query", + span_attributes={"path": request_path}, + method="POST", + path=request_path, + data={ + "query": "SELECT 1", + "location": "request-location", + "useLegacySql": False, + "formatOptions": { + "useInt64Timestamp": True, + }, + "requestId": mock.ANY, + "maxResults": expected, + }, + timeout=None, + ) + + +def test_query_and_wait_caches_completed_query_results_one_page(): + client = mock.create_autospec(Client) + client._call_api.return_value = { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "US", + }, + "jobComplete": True, + "queryId": "xyz", + "schema": { + "fields": [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ], + }, + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ], + # Even though totalRows > len(rows), we should use the presense of a + # next page token to decide if there are any more pages. + "totalRows": 8, + } + rows = _job_helpers.query_and_wait( + client, + query="SELECT full_name, age FROM people;", + job_config=None, + location=None, + project="request-project", + retry=None, + job_retry=None, + page_size=None, + max_results=None, + ) + rows_list = list(rows) + assert rows.project == "response-project" + assert rows.job_id == "abc" + assert rows.location == "US" + assert rows.query_id == "xyz" + assert rows.total_rows == 8 + assert len(rows_list) == 4 + + # We should only call jobs.query once, no additional row requests needed. 
+ request_path = "/projects/request-project/queries" + client._call_api.assert_called_once_with( + None, # retry + span_name="BigQuery.query", + span_attributes={"path": request_path}, + method="POST", + path=request_path, + data={ + "query": "SELECT full_name, age FROM people;", + "useLegacySql": False, + "formatOptions": { + "useInt64Timestamp": True, + }, + "requestId": mock.ANY, + }, + timeout=None, + ) + + +def test_query_and_wait_caches_completed_query_results_one_page_no_rows(): + client = mock.create_autospec(Client) + client._call_api.return_value = { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "US", + }, + "jobComplete": True, + "queryId": "xyz", + } + rows = _job_helpers.query_and_wait( + client, + query="CREATE TABLE abc;", + project="request-project", + job_config=None, + location=None, + retry=None, + job_retry=None, + page_size=None, + max_results=None, + ) + assert rows.project == "response-project" + assert rows.job_id == "abc" + assert rows.location == "US" + assert rows.query_id == "xyz" + assert list(rows) == [] + + # We should only call jobs.query once, no additional row requests needed. + request_path = "/projects/request-project/queries" + client._call_api.assert_called_once_with( + None, # retry + span_name="BigQuery.query", + span_attributes={"path": request_path}, + method="POST", + path=request_path, + data={ + "query": "CREATE TABLE abc;", + "useLegacySql": False, + "formatOptions": { + "useInt64Timestamp": True, + }, + "requestId": mock.ANY, + }, + timeout=None, + ) + + +def test_query_and_wait_caches_completed_query_results_more_pages(): + client = mock.create_autospec(Client) + client._list_rows_from_query_results = functools.partial( + Client._list_rows_from_query_results, client + ) + client._call_api.side_effect = ( + { + "jobReference": { + "projectId": "response-project", + "jobId": "response-job-id", + "location": "response-location", + }, + "jobComplete": True, + "queryId": "xyz", + "schema": { + "fields": [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ], + }, + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ], + # Even though totalRows <= len(rows), we should use the presense of a + # next page token to decide if there are any more pages. + "totalRows": 2, + "pageToken": "page-2", + }, + # TODO(swast): This is a case where we can avoid a call to jobs.get, + # but currently do so because the RowIterator might need the + # destination table, since results aren't fully cached. + { + "jobReference": { + "projectId": "response-project", + "jobId": "response-job-id", + "location": "response-location", + }, + }, + { + "rows": [ + {"f": [{"v": "Pebbles Phlyntstone"}, {"v": "4"}]}, + {"f": [{"v": "Bamm-Bamm Rhubble"}, {"v": "5"}]}, + {"f": [{"v": "Joseph Rockhead"}, {"v": "32"}]}, + {"f": [{"v": "Perry Masonry"}, {"v": "33"}]}, + ], + "totalRows": 3, + "pageToken": "page-3", + }, + { + "rows": [ + {"f": [{"v": "Pearl Slaghoople"}, {"v": "53"}]}, + ], + "totalRows": 4, + }, + ) + rows = _job_helpers.query_and_wait( + client, + query="SELECT full_name, age FROM people;", + project="request-project", + job_config=None, + location=None, + retry=None, + job_retry=None, + page_size=None, + max_results=None, + ) + assert rows.total_rows == 2 # Match the API response. 
+    rows_list = list(rows)
+    assert rows.total_rows == 4  # Match the final API response.
+    assert len(rows_list) == 9
+
+    # Start the query.
+    jobs_query_path = "/projects/request-project/queries"
+    client._call_api.assert_any_call(
+        None,  # retry
+        span_name="BigQuery.query",
+        span_attributes={"path": jobs_query_path},
+        method="POST",
+        path=jobs_query_path,
+        data={
+            "query": "SELECT full_name, age FROM people;",
+            "useLegacySql": False,
+            "formatOptions": {
+                "useInt64Timestamp": True,
+            },
+            "requestId": mock.ANY,
+        },
+        timeout=None,
+    )
+
+    # TODO(swast): Fetching job metadata isn't necessary in this case.
+    jobs_get_path = "/projects/response-project/jobs/response-job-id"
+    client._call_api.assert_any_call(
+        None,  # retry
+        span_name="BigQuery.job.reload",
+        span_attributes={"path": jobs_get_path},
+        job_ref=mock.ANY,
+        method="GET",
+        path=jobs_get_path,
+        query_params={"location": "response-location"},
+        timeout=None,
+    )
+
+    # Fetch the remaining two pages.
+    jobs_get_query_results_path = "/projects/response-project/queries/response-job-id"
+    client._call_api.assert_any_call(
+        None,  # retry
+        timeout=None,
+        method="GET",
+        path=jobs_get_query_results_path,
+        query_params={
+            "pageToken": "page-2",
+            "fields": "jobReference,totalRows,pageToken,rows",
+            "location": "response-location",
+            "formatOptions.useInt64Timestamp": True,
+        },
+    )
+    client._call_api.assert_any_call(
+        None,  # retry
+        timeout=None,
+        method="GET",
+        path=jobs_get_query_results_path,
+        query_params={
+            "pageToken": "page-3",
+            "fields": "jobReference,totalRows,pageToken,rows",
+            "location": "response-location",
+            "formatOptions.useInt64Timestamp": True,
+        },
+    )
+
+
+def test_query_and_wait_incomplete_query():
+    client = mock.create_autospec(Client)
+    client._get_query_results = functools.partial(Client._get_query_results, client)
+    client._list_rows_from_query_results = functools.partial(
+        Client._list_rows_from_query_results, client
+    )
+    client._call_api.side_effect = (
+        {
+            "jobReference": {
+                "projectId": "response-project",
+                "jobId": "response-job-id",
+                "location": "response-location",
+            },
+            "jobComplete": False,
+        },
+        {
+            "jobReference": {
+                "projectId": "response-project",
+                "jobId": "response-job-id",
+                "location": "response-location",
+            },
+            "jobComplete": True,
+            "totalRows": 2,
+            "queryId": "xyz",
+            "schema": {
+                "fields": [
+                    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
+                    {"name": "age", "type": "INT64", "mode": "NULLABLE"},
+                ],
+            },
+        },
+        {
+            "jobReference": {
+                "projectId": "response-project",
+                "jobId": "response-job-id",
+                "location": "response-location",
+            },
+        },
+        {
+            "rows": [
+                {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]},
+                {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]},
+                {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+                {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+            ],
+            # Even though totalRows <= len(rows), we should use the presence of a
+            # next page token to decide if there are any more pages.
+            "totalRows": 2,
+            "pageToken": "page-2",
+        },
+        {
+            "rows": [
+                {"f": [{"v": "Pearl Slaghoople"}, {"v": "53"}]},
+            ],
+        },
+    )
+    rows = _job_helpers.query_and_wait(
+        client,
+        query="SELECT full_name, age FROM people;",
+        project="request-project",
+        job_config=None,
+        location=None,
+        retry=None,
+        job_retry=None,
+        page_size=None,
+        max_results=None,
+    )
+    rows_list = list(rows)
+    assert rows.total_rows == 2  # Match the API response.
+    assert len(rows_list) == 5
+
+    # Start the query.
+ jobs_query_path = "/projects/request-project/queries" + client._call_api.assert_any_call( + None, # retry + span_name="BigQuery.query", + span_attributes={"path": jobs_query_path}, + method="POST", + path=jobs_query_path, + data={ + "query": "SELECT full_name, age FROM people;", + "useLegacySql": False, + "formatOptions": { + "useInt64Timestamp": True, + }, + "requestId": mock.ANY, + }, + timeout=None, + ) + + # Wait for the query to finish. + jobs_get_query_results_path = "/projects/response-project/queries/response-job-id" + client._call_api.assert_any_call( + None, # retry + span_name="BigQuery.getQueryResults", + span_attributes={"path": jobs_get_query_results_path}, + method="GET", + path=jobs_get_query_results_path, + query_params={ + # job_query.QueryJob uses getQueryResults to wait for the query to finish. + # It avoids fetching the results because: + # (1) For large rows this can take a long time, much longer than + # our progress bar update frequency. + # See: /~https://github.com/googleapis/python-bigquery/issues/403 + # (2) Caching the first page of results uses an unexpected increase in memory. + # See: /~https://github.com/googleapis/python-bigquery/issues/394 + "maxResults": 0, + "location": "response-location", + }, + timeout=None, + ) + + # Fetch the job metadata in case the RowIterator needs the destination table. + jobs_get_path = "/projects/response-project/jobs/response-job-id" + client._call_api.assert_any_call( + None, # retry + span_name="BigQuery.job.reload", + span_attributes={"path": jobs_get_path}, + job_ref=mock.ANY, + method="GET", + path=jobs_get_path, + query_params={"location": "response-location"}, + timeout=None, + ) + + # Fetch the remaining two pages. + client._call_api.assert_any_call( + None, # retry + timeout=None, + method="GET", + path=jobs_get_query_results_path, + query_params={ + "fields": "jobReference,totalRows,pageToken,rows", + "location": "response-location", + "formatOptions.useInt64Timestamp": True, + }, + ) + client._call_api.assert_any_call( + None, # retry + timeout=None, + method="GET", + path=jobs_get_query_results_path, + query_params={ + "pageToken": "page-2", + "fields": "jobReference,totalRows,pageToken,rows", + "location": "response-location", + "formatOptions.useInt64Timestamp": True, + }, + ) + + def test_make_job_id_wo_suffix(): job_id = _job_helpers.make_job_id("job_id") assert job_id == "job_id" @@ -335,3 +1056,120 @@ def test_make_job_id_random(): def test_make_job_id_w_job_id_overrides_prefix(): job_id = _job_helpers.make_job_id("job_id", prefix="unused_prefix") assert job_id == "job_id" + + +@pytest.mark.parametrize( + ("job_config", "expected"), + ( + pytest.param(None, True), + pytest.param(job_query.QueryJobConfig(), True, id="default"), + pytest.param( + job_query.QueryJobConfig(use_query_cache=False), True, id="use_query_cache" + ), + pytest.param( + job_query.QueryJobConfig(maximum_bytes_billed=10_000_000), + True, + id="maximum_bytes_billed", + ), + pytest.param( + job_query.QueryJobConfig(clustering_fields=["a", "b", "c"]), + False, + id="clustering_fields", + ), + pytest.param( + job_query.QueryJobConfig(destination="p.d.t"), False, id="destination" + ), + pytest.param( + job_query.QueryJobConfig( + destination_encryption_configuration=job_query.EncryptionConfiguration( + "key" + ) + ), + False, + id="destination_encryption_configuration", + ), + ), +) +def test_supported_by_jobs_query( + job_config: Optional[job_query.QueryJobConfig], expected: bool +): + assert 
_job_helpers._supported_by_jobs_query(job_config) == expected + + +def test_wait_or_cancel_no_exception(): + job = mock.create_autospec(job_query.QueryJob, instance=True) + expected_rows = object() + job.result.return_value = expected_rows + retry = retries.Retry() + + rows = _job_helpers._wait_or_cancel( + job, + api_timeout=123, + wait_timeout=456, + retry=retry, + page_size=789, + max_results=101112, + ) + + job.result.assert_called_once_with( + timeout=456, + retry=retry, + page_size=789, + max_results=101112, + ) + assert rows is expected_rows + + +def test_wait_or_cancel_exception_cancels_job(): + job = mock.create_autospec(job_query.QueryJob, instance=True) + job.result.side_effect = google.api_core.exceptions.BadGateway("test error") + retry = retries.Retry() + + with pytest.raises(google.api_core.exceptions.BadGateway): + _job_helpers._wait_or_cancel( + job, + api_timeout=123, + wait_timeout=456, + retry=retry, + page_size=789, + max_results=101112, + ) + + job.result.assert_called_once_with( + timeout=456, + retry=retry, + page_size=789, + max_results=101112, + ) + job.cancel.assert_called_once_with( + timeout=123, + retry=retry, + ) + + +def test_wait_or_cancel_exception_raises_original_exception(): + job = mock.create_autospec(job_query.QueryJob, instance=True) + job.result.side_effect = google.api_core.exceptions.BadGateway("test error") + job.cancel.side_effect = google.api_core.exceptions.NotFound("don't raise me") + retry = retries.Retry() + + with pytest.raises(google.api_core.exceptions.BadGateway): + _job_helpers._wait_or_cancel( + job, + api_timeout=123, + wait_timeout=456, + retry=retry, + page_size=789, + max_results=101112, + ) + + job.result.assert_called_once_with( + timeout=456, + retry=retry, + page_size=789, + max_results=101112, + ) + job.cancel.assert_called_once_with( + timeout=123, + retry=retry, + ) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index ff4c40f48..c8968adbb 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -70,8 +70,9 @@ from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery import exceptions -from google.cloud.bigquery.retry import DEFAULT_TIMEOUT from google.cloud.bigquery import ParquetOptions +from google.cloud.bigquery.retry import DEFAULT_TIMEOUT +import google.cloud.bigquery.table try: from google.cloud import bigquery_storage @@ -4953,20 +4954,17 @@ def test_query_w_client_default_config_no_incoming(self): ) def test_query_w_invalid_default_job_config(self): - job_id = "some-job-id" - query = "select count(*) from persons" creds = _make_credentials() http = object() default_job_config = object() - client = self._make_one( - project=self.PROJECT, - credentials=creds, - _http=http, - default_query_job_config=default_job_config, - ) with self.assertRaises(TypeError) as exc: - client.query(query, job_id=job_id, location=self.LOCATION) + self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + default_query_job_config=default_job_config, + ) self.assertIn("Expected an instance of QueryJobConfig", exc.exception.args[0]) def test_query_w_client_location(self): @@ -5213,6 +5211,150 @@ def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_succeeds(self): assert result is mock.sentinel.query_job + def test_query_and_wait_defaults(self): + query = "select count(*) from `bigquery-public-data.usa_names.usa_1910_2013`" + jobs_query_response = { + "jobComplete": True, + "schema": { + "fields": [ + { + "name": "f0_", + "type": "INTEGER", + "mode": 
"NULLABLE", + }, + ], + }, + "totalRows": "1", + "rows": [{"f": [{"v": "5552452"}]}], + "queryId": "job_abcDEF_", + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(jobs_query_response) + + rows = client.query_and_wait(query) + + self.assertIsInstance(rows, google.cloud.bigquery.table.RowIterator) + self.assertEqual(rows.query_id, "job_abcDEF_") + self.assertEqual(rows.total_rows, 1) + # No job reference in the response should be OK for completed query. + self.assertIsNone(rows.job_id) + self.assertIsNone(rows.project) + self.assertIsNone(rows.location) + + # Verify the request we send is to jobs.query. + conn.api_request.assert_called_once() + _, req = conn.api_request.call_args + self.assertEqual(req["method"], "POST") + self.assertEqual(req["path"], "/projects/PROJECT/queries") + self.assertEqual(req["timeout"], DEFAULT_TIMEOUT) + sent = req["data"] + self.assertEqual(sent["query"], query) + self.assertFalse(sent["useLegacySql"]) + + def test_query_and_wait_w_default_query_job_config(self): + from google.cloud.bigquery import job + + query = "select count(*) from `bigquery-public-data.usa_names.usa_1910_2013`" + jobs_query_response = { + "jobComplete": True, + } + creds = _make_credentials() + http = object() + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + default_query_job_config=job.QueryJobConfig( + labels={ + "default-label": "default-value", + }, + ), + ) + conn = client._connection = make_connection(jobs_query_response) + + _ = client.query_and_wait(query) + + # Verify the request we send is to jobs.query. + conn.api_request.assert_called_once() + _, req = conn.api_request.call_args + self.assertEqual(req["method"], "POST") + self.assertEqual(req["path"], f"/projects/{self.PROJECT}/queries") + sent = req["data"] + self.assertEqual(sent["labels"], {"default-label": "default-value"}) + + def test_query_and_wait_w_job_config(self): + from google.cloud.bigquery import job + + query = "select count(*) from `bigquery-public-data.usa_names.usa_1910_2013`" + jobs_query_response = { + "jobComplete": True, + } + creds = _make_credentials() + http = object() + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + ) + conn = client._connection = make_connection(jobs_query_response) + + _ = client.query_and_wait( + query, + job_config=job.QueryJobConfig( + labels={ + "job_config-label": "job_config-value", + }, + ), + ) + + # Verify the request we send is to jobs.query. + conn.api_request.assert_called_once() + _, req = conn.api_request.call_args + self.assertEqual(req["method"], "POST") + self.assertEqual(req["path"], f"/projects/{self.PROJECT}/queries") + sent = req["data"] + self.assertEqual(sent["labels"], {"job_config-label": "job_config-value"}) + + def test_query_and_wait_w_location(self): + query = "select count(*) from `bigquery-public-data.usa_names.usa_1910_2013`" + jobs_query_response = { + "jobComplete": True, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(jobs_query_response) + + _ = client.query_and_wait(query, location="not-the-client-location") + + # Verify the request we send is to jobs.query. 
+ conn.api_request.assert_called_once() + _, req = conn.api_request.call_args + self.assertEqual(req["method"], "POST") + self.assertEqual(req["path"], f"/projects/{self.PROJECT}/queries") + sent = req["data"] + self.assertEqual(sent["location"], "not-the-client-location") + + def test_query_and_wait_w_project(self): + query = "select count(*) from `bigquery-public-data.usa_names.usa_1910_2013`" + jobs_query_response = { + "jobComplete": True, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(jobs_query_response) + + _ = client.query_and_wait(query, project="not-the-client-project") + + # Verify the request we send is to jobs.query. + conn.api_request.assert_called_once() + _, req = conn.api_request.call_args + self.assertEqual(req["method"], "POST") + self.assertEqual(req["path"], "/projects/not-the-client-project/queries") + def test_insert_rows_w_timeout(self): from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py index 7c3438567..1704abac7 100644 --- a/tests/unit/test_query.py +++ b/tests/unit/test_query.py @@ -1375,6 +1375,16 @@ def test_job_id_present(self): query = self._make_one(resource) self.assertEqual(query.job_id, "custom-job") + def test_location_missing(self): + query = self._make_one({}) + self.assertIsNone(query.location) + + def test_location_present(self): + resource = self._make_resource() + resource["jobReference"]["location"] = "test-location" + query = self._make_one(resource) + self.assertEqual(query.location, "test-location") + def test_page_token_missing(self): query = self._make_one(self._make_resource()) self.assertIsNone(query.page_token)
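
For anyone trying the preview path exercised by the new tests, a usage sketch under stated assumptions: the query is a placeholder against a public dataset, and once jobCreationMode reaches GA the environment variable should no longer be needed.

    import os

    from google.cloud import bigquery

    # While jobCreationMode=JOB_CREATION_OPTIONAL is in preview, query_and_wait()
    # only requests it when this variable is set (the comparison is
    # case-insensitive).
    os.environ["QUERY_PREVIEW_ENABLED"] = "true"

    client = bigquery.Client()
    rows = client.query_and_wait(
        "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10",
        page_size=5,  # becomes maxResults on the jobs.query request
    )

    # If the server skipped job creation, job_id is None and the auto-generated
    # query_id identifies the query for debugging instead.
    print(rows.job_id, rows.query_id)
    for row in rows:
        print(row["name"])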