Merge pull request #17330 from chamikaramj/cherry_pick_pr_17329
[cherry-pick][release-2.38.0][BEAM-14282] Re-raise exceptions swallowed in several Python I/O connectors
youngoli authored Apr 12, 2022
2 parents 12bc0e0 + e67cc36 commit 22d91bd
Showing 4 changed files with 57 additions and 10 deletions.
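
The fix is the same in each connector: the exception handler that records a service-call metric now re-raises the exception instead of returning, so the failure surfaces to the caller. Below is a minimal sketch of the before/after pattern; record_metric and fetch_page are hypothetical stand-ins, not Beam APIs.

# Sketch only: record_metric and fetch_page are illustrative stand-ins
# for a service-call metric and a backend call; they are not Beam APIs.

def record_metric(status):
    print('service_call_metric:', status)

def fetch_page(request):
    raise RuntimeError('backend unavailable')

def read_swallowing_errors(request):
    try:
        return fetch_page(request)
    except RuntimeError as e:
        record_metric(e)
        # Exception swallowed: the caller gets None and the failure is hidden.

def read_reraising_errors(request):
    try:
        return fetch_page(request)
    except RuntimeError as e:
        record_metric(e)
        raise  # Re-raise so the failure propagates to the caller/runner.

if __name__ == '__main__':
    read_swallowing_errors('req')        # failure is hidden
    try:
        read_reraising_errors('req')     # failure is surfaced
    except RuntimeError as e:
        print('caller sees:', e)
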
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
@@ -301,8 +301,10 @@ def process(self, query, *unused_args, **unused_kwargs):
except (ClientError, GoogleAPICallError) as e:
# e.code.value contains the numeric http status code.
service_call_metric.call(e.code.value)
raise
except HttpError as e:
service_call_metric.call(e)
raise


class _Mutate(PTransform):
18 changes: 11 additions & 7 deletions sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py
@@ -328,13 +328,17 @@ def test_QueryFn_metric_on_failure(self):
client_query.fetch.side_effect = [
exceptions.DeadlineExceeded("Deadline exceed")
]
list(_query_fn.process(self._mock_query))
self.verify_read_call_metric(
self._PROJECT, self._NAMESPACE, "deadline_exceeded", 1)
# Test success
client_query.fetch.side_effect = [[]]
list(_query_fn.process(self._mock_query))
self.verify_read_call_metric(self._PROJECT, self._NAMESPACE, "ok", 1)
try:
list(_query_fn.process(self._mock_query))
except Exception:
self.verify_read_call_metric(
self._PROJECT, self._NAMESPACE, "deadline_exceeded", 1)
# Test success
client_query.fetch.side_effect = [[]]
list(_query_fn.process(self._mock_query))
self.verify_read_call_metric(self._PROJECT, self._NAMESPACE, "ok", 1)
else:
raise Exception('Expected _query_fn.process call to raise an error')

def verify_read_call_metric(self, project_id, namespace, status, count):
"""Check if a metric was recorded for the Datastore IO read API call."""
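
The rewritten test above relies on the else clause of a try statement, which runs only when no exception was raised, so the test now fails if process() stops raising. A small, self-contained sketch of that idiom, using a hypothetical might_raise helper (not part of Beam), together with the assertRaises form used elsewhere in this PR:

import unittest

def might_raise(should_raise):
    # Hypothetical stand-in for a call that is expected to raise.
    if should_raise:
        raise ValueError('boom')

class RaiseAssertionSketch(unittest.TestCase):
    def test_try_except_else_idiom(self):
        try:
            might_raise(True)
        except ValueError:
            pass  # Expected path; side effects (e.g. metrics) can be checked here.
        else:
            self.fail('Expected might_raise to raise ValueError')

    def test_assert_raises_equivalent(self):
        with self.assertRaises(ValueError):
            might_raise(True)

if __name__ == '__main__':
    unittest.main()
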
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/gcsio.py
@@ -642,6 +642,7 @@ def __init__(self, client, path, buffer_size, get_project_number):
service_call_metric.call('ok')
except HttpError as e:
service_call_metric.call(e)
raise

@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
46 changes: 43 additions & 3 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -103,10 +103,17 @@ def __init__(self):
# has to persist even past the deletion of the object.
self.last_generation = {}
self.list_page_tokens = {}
self._fail_when_getting_metadata = []
self._fail_when_reading = []

def add_file(self, f):
def add_file(
self, f, fail_when_getting_metadata=False, fail_when_reading=False):
self.files[(f.bucket, f.object)] = f
self.last_generation[(f.bucket, f.object)] = f.generation
if fail_when_getting_metadata:
self._fail_when_getting_metadata.append(f)
if fail_when_reading:
self._fail_when_reading.append(f)

def get_file(self, bucket, obj):
return self.files.get((bucket, obj), None)
@@ -123,8 +130,12 @@ def Get(self, get_request, download=None): # pylint: disable=invalid-name
# Failing with an HTTP 404 if file does not exist.
raise HttpError({'status': 404}, None, None)
if download is None:
if f in self._fail_when_getting_metadata:
raise HttpError({'status': 429}, None, None)
return f.get_metadata()
else:
if f in self._fail_when_reading:
raise HttpError({'status': 429}, None, None)
stream = download.stream

def get_range_callback(start, end):
@@ -303,7 +314,15 @@ def __init__(self, project, region, kms_key=None):
'time', time=mock.MagicMock(side_effect=range(100)), sleep=mock.MagicMock())
class TestGCSIO(unittest.TestCase):
def _insert_random_file(
self, client, path, size, generation=1, crc32c=None, last_updated=None):
self,
client,
path,
size,
generation=1,
crc32c=None,
last_updated=None,
fail_when_getting_metadata=False,
fail_when_reading=False):
bucket, name = gcsio.parse_gcs_path(path)
f = FakeFile(
bucket,
@@ -312,7 +331,7 @@ def _insert_random_file(
generation,
crc32c=crc32c,
last_updated=last_updated)
client.objects.add_file(f)
client.objects.add_file(f, fail_when_getting_metadata, fail_when_reading)
return f

def setUp(self):
@@ -841,6 +860,27 @@ def test_downloader_fail_to_get_project_number(self, mock_get):

self.assertEqual(metric_value, 2)

def test_downloader_fail_non_existent_object(self):
file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
with self.assertRaises(IOError):
self.gcs.open(file_name, 'r')

def test_downloader_fail_when_getting_metadata(self):
file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
file_size = 5 * 1024 * 1024 + 100
self._insert_random_file(
self.client, file_name, file_size, fail_when_getting_metadata=True)
with self.assertRaises(HttpError):
self.gcs.open(file_name, 'r')

def test_downloader_fail_when_reading(self):
file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
file_size = 5 * 1024 * 1024 + 100
self._insert_random_file(
self.client, file_name, file_size, fail_when_reading=True)
with self.assertRaises(HttpError):
self.gcs.open(file_name, 'r')

def test_uploader_monitoring_info(self):
# Clear the process wide metric container.
MetricsEnvironment.process_wide_container().reset()
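
The gcsio_test.py changes above follow a failure-injection pattern: the fake GCS client records which files should fail, raises at the point of metadata lookup or read, and the tests assert that the HttpError now propagates out of gcsio.open instead of being swallowed. A generic, self-contained sketch of that pattern; FakeStore, BackendError, and download are illustrative names, not Beam or gcsio APIs:

import unittest

class BackendError(Exception):
    """Stand-in for an HTTP error raised by the storage backend."""

class FakeStore:
    """Failure-injecting fake: objects added with fail_when_reading=True
    raise BackendError when they are read."""
    def __init__(self):
        self._data = {}
        self._fail_when_reading = set()

    def add_object(self, name, data, fail_when_reading=False):
        self._data[name] = data
        if fail_when_reading:
            self._fail_when_reading.add(name)

    def read(self, name):
        if name in self._fail_when_reading:
            raise BackendError('injected failure for %s' % name)
        return self._data[name]

def download(store, name):
    # The caller does not catch BackendError, so failures propagate.
    return store.read(name)

class FailureInjectionTest(unittest.TestCase):
    def test_read_failure_propagates(self):
        store = FakeStore()
        store.add_object('obj', b'payload', fail_when_reading=True)
        with self.assertRaises(BackendError):
            download(store, 'obj')

if __name__ == '__main__':
    unittest.main()
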
