Skip to content

Commit

Permalink
fix: create query job in job.result() if doesn't exist (#1944)
Browse files Browse the repository at this point in the history
* fix: create query job in job.result() if doesn't exist

* Apply suggestions from code review

---------

Co-authored-by: Tim Sweña (Swast) <swast@google.com>
  • Loading branch information
Linchin and tswast authored Jun 4, 2024
1 parent 3e7a48d commit 8f5b4b7
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
5 changes: 5 additions & 0 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,11 @@ def is_job_done():
self._retry_do_query = retry_do_query
self._job_retry = job_retry

# If the job hasn't been created, create it now. Related:
# /~https://github.com/googleapis/python-bigquery/issues/1940
if self.state is None:
self._begin(retry=retry, **done_kwargs)

# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
Expand Down
83 changes: 83 additions & 0 deletions tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,86 @@ def test_result_dry_run(self):
self.assertIsNone(result.job_id)
self.assertIsNone(result.query_id)

# If the job doesn't exist, create the job first. Issue:
# /~https://github.com/googleapis/python-bigquery/issues/1940
def test_result_begin_job_if_not_exist(self):
begun_resource = self._make_resource()
query_running_resource = {
"jobComplete": True,
"jobReference": {
"projectId": self.PROJECT,
"jobId": self.JOB_ID,
"location": "US",
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"status": {"state": "RUNNING"},
}
query_done_resource = {
"jobComplete": True,
"jobReference": {
"projectId": self.PROJECT,
"jobId": self.JOB_ID,
"location": "US",
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"status": {"state": "DONE"},
}
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = make_connection(
begun_resource,
query_running_resource,
query_done_resource,
done_resource,
)
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)
job._properties["jobReference"]["location"] = "US"

job.result()

create_job_call = mock.call(
method="POST",
path=f"/projects/{self.PROJECT}/jobs",
data={
"jobReference": {
"jobId": self.JOB_ID,
"projectId": self.PROJECT,
"location": "US",
},
"configuration": {
"query": {"useLegacySql": False, "query": self.QUERY},
},
},
timeout=None,
)
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={"projection": "full", "location": "US"},
timeout=DEFAULT_GET_JOB_TIMEOUT,
)
get_query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={
"maxResults": 0,
"location": "US",
},
timeout=None,
)

connection.api_request.assert_has_calls(
[
# Make sure we start a job that hasn't started yet. See:
# /~https://github.com/googleapis/python-bigquery/issues/1940
create_job_call,
reload_call,
get_query_results_call,
reload_call,
]
)

def test_result_with_done_job_calls_get_query_results(self):
query_resource_done = {
"jobComplete": True,
Expand Down Expand Up @@ -1379,6 +1459,7 @@ def test_result_w_timeout_doesnt_raise(self):
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)
job._properties["jobReference"]["location"] = "US"
job._properties["status"] = {"state": "RUNNING"}

with freezegun.freeze_time("1970-01-01 00:00:00", tick=False):
job.result(
Expand Down Expand Up @@ -1429,6 +1510,7 @@ def test_result_w_timeout_raises_concurrent_futures_timeout(self):
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)
job._properties["jobReference"]["location"] = "US"
job._properties["status"] = {"state": "RUNNING"}

with freezegun.freeze_time(
"1970-01-01 00:00:00", auto_tick_seconds=1.0
Expand Down Expand Up @@ -2319,5 +2401,6 @@ def test_iter(self):
connection = make_connection(begun_resource, query_resource, done_resource)
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)
job._properties["status"] = {"state": "RUNNING"}

self.assertIsInstance(iter(job), types.GeneratorType)

0 comments on commit 8f5b4b7

Please sign in to comment.