Adds Amazon AWS S3 support #268 #298

Open · wants to merge 1 commit into main
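At a high level, every network-facing helper touched by this PR gains the same dispatch: a remote path that starts with 's3' is read through s3fs with anonymous access, while everything else keeps going over HTTP against the PhysioNet file index. A minimal, self-contained sketch of that pattern in isolation (the PhysioNet URL and the bucket name are illustrative placeholders, not values taken from the diff):

    import posixpath

    import requests


    def read_remote_bytes(file_name, remote_dir):
        """Illustrative sketch of the 's3' dispatch used throughout this PR."""
        if remote_dir.startswith('s3'):
            import s3fs
            # Anonymous access to a public bucket, as in the PR
            fs = s3fs.S3FileSystem(anon=True)
            with fs.open(posixpath.join(remote_dir, file_name)) as f:
                return f.read()
        # Otherwise fall back to plain HTTP against the PhysioNet file index
        url = posixpath.join('https://physionet.org/files', remote_dir, file_name)
        response = requests.get(url)
        response.raise_for_status()
        return response.content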
1 change: 1 addition & 0 deletions requirements.txt
@@ -16,3 +16,4 @@ scikit-learn==0.22.2.post1
scipy==1.4.1
threadpoolctl==2.1.0
urllib3==1.25.9
s3fs==2021.4.0
3 changes: 2 additions & 1 deletion setup.py
@@ -72,7 +72,8 @@
'scikit-learn>=0.18',
'scipy>=0.17.0',
'threadpoolctl>=1.0.0',
'urllib3>=1.22'
'urllib3>=1.22',
's3fs>=2021.4.0'
],

# List additional groups of dependencies here (e.g. development
2 changes: 1 addition & 1 deletion wfdb/io/_signal.py
@@ -2091,7 +2091,7 @@ def _infer_sig_len(file_name, fmt, n_sig, dir_name, pn_dir=None):
file_size = os.path.getsize(os.path.join(dir_name, file_name))
else:
file_size = download._remote_file_size(file_name=file_name,
pn_dir=pn_dir)
remote_dir=pn_dir)

sig_len = int(file_size / (BYTES_PER_SAMPLE[fmt] * n_sig))

4 changes: 2 additions & 2 deletions wfdb/io/annotation.py
@@ -1620,7 +1620,7 @@ def rdann(record_name, extension, sampfrom=0, sampto=None, shift_samps=False,
>>> ann = wfdb.rdann('sample-data/100', 'atr', sampto=300000)

"""
if (pn_dir is not None) and ('.' not in pn_dir):
if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')):
dir_list = pn_dir.split('/')
pn_dir = posixpath.join(dir_list[0],
record.get_version(dir_list[0]),
@@ -2255,7 +2255,7 @@ def ann2rr(record_name, extension, pn_dir=None, start_time=None,
>>> 257

"""
if (pn_dir is not None) and ('.' not in pn_dir):
if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')):
dir_list = pn_dir.split('/')
pn_dir = posixpath.join(dir_list[0], record.get_version(dir_list[0]),
*dir_list[1:])
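To make these two guards concrete, here is a hedged usage sketch. The bucket name is a hypothetical placeholder for a public bucket laid out like a PhysioNet database, and the example assumes the rest of the annotation-reading path forwards an S3 pn_dir unchanged:

    import wfdb

    # PhysioNet (unchanged): pn_dir still goes through the version lookup
    ann = wfdb.rdann('100', 'atr', pn_dir='mitdb')

    # S3 (new): the 's3' prefix skips the version lookup, so the URI is
    # handed to the download helpers as-is
    ann_s3 = wfdb.rdann('100', 'atr', pn_dir='s3://my-aws-bucket/mitdb')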
152 changes: 104 additions & 48 deletions wfdb/io/download.py
@@ -49,7 +49,7 @@ def set_db_index_url(db_index_url=PN_INDEX_URL):
config.db_index_url = db_index_url


def _remote_file_size(url=None, file_name=None, pn_dir=None):
def _remote_file_size(url=None, file_name=None, remote_dir=None):
"""
Get the remote file size in bytes.

@@ -59,11 +59,16 @@ def _remote_file_size(url=None, file_name=None, pn_dir=None):
The full url of the file. Use this option to explicitly
state the full url.
file_name : str, optional
The base file name. Use this argument along with pn_dir if you
The base file name. Use this argument along with remote_dir if you
want the full url to be constructed.
pn_dir : str, optional
The base file name. Use this argument along with file_name if
you want the full url to be constructed.
remote_dir : str, optional
The remote directory, which can be one of two things:
(1) The S3 URI from which to find the required file. This
should always begin with 's3' in order to work correctly. An
example input would be: 's3://my-aws-bucket/'
(2) The PhysioNet database directory from which to find the
required file, e.g. for file '100.hea' in
'http://physionet.org/content/mitdb', remote_dir='mitdb'.

Returns
-------
@@ -72,8 +77,17 @@

"""
# Option to construct the url
if file_name and pn_dir:
url = posixpath.join(config.db_index_url, pn_dir, file_name)
if file_name and remote_dir:
if remote_dir.startswith('s3'):
# Set up the remote AWS S3 file system
import s3fs
fs = s3fs.S3FileSystem(anon=True)
file_dir = posixpath.join(remote_dir, file_name)
# Open the remote object and return its size in bytes
with fs.open(file_dir) as f:
return f.size
else:
url = posixpath.join(config.db_index_url, remote_dir, file_name)

response = requests.head(url, headers={'Accept-Encoding': 'identity'})
# Raise HTTPError if invalid url
@@ -85,18 +99,22 @@ def _remote_file_size(url=None, file_name=None, pn_dir=None):
return remote_file_size
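As a quick illustration of the reworked signature, both back ends can be queried the same way; a sketch with a hypothetical public bucket and anonymous read access assumed:

    from wfdb.io import download

    # PhysioNet: size of '100.hea' in the mitdb database
    size_pn = download._remote_file_size(file_name='100.hea', remote_dir='mitdb')

    # S3: size of the same header in a hypothetical public bucket
    size_s3 = download._remote_file_size(file_name='100.hea',
                                         remote_dir='s3://my-aws-bucket/mitdb')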


def _stream_header(file_name, pn_dir):
def _stream_header(file_name, remote_dir):
"""
Stream the lines of a remote header file.

Parameters
----------
file_name : str
The name of the header file to be read.
pn_dir : str
The PhysioNet database directory from which to find the
required header file. eg. For file '100.hea' in
'http://physionet.org/content/mitdb', pn_dir='mitdb'.
remote_dir : str
The remote directory, which can be one of two things:
(1) The S3 URI from which to find the required header file. This
should always begin with 's3' in order to work correctly. An
example input would be: 's3://my-aws-bucket/'
(2) The PhysioNet database directory from which to find the
required header file, e.g. for file '100.hea' in
'http://physionet.org/content/mitdb', remote_dir='mitdb'.

Returns
-------
@@ -106,15 +124,23 @@ def _stream_header(file_name, pn_dir):
All of the comment header lines.

"""
# Full url of header location
url = posixpath.join(config.db_index_url, pn_dir, file_name)
response = requests.get(url)

# Raise HTTPError if invalid url
response.raise_for_status()

# Get each line as a string
filelines = response.content.decode('iso-8859-1').splitlines()
if remote_dir.startswith('s3'):
# Set up the remote AWS S3 file system
import s3fs
fs = s3fs.S3FileSystem(anon=True)
file_dir = posixpath.join(remote_dir, file_name)
# Read and decode the lines
with fs.open(file_dir) as f:
filelines = f.readlines()
filelines = [l.decode('iso-8859-1') for l in filelines]
else:
# Full url of header location
url = posixpath.join(config.db_index_url, remote_dir, file_name)
response = requests.get(url)
# Raise HTTPError if invalid url
response.raise_for_status()
# Get each line as a string
filelines = response.content.decode('iso-8859-1').splitlines()

# Separate content into header and comment lines
header_lines = []
@@ -139,16 +165,22 @@ def _stream_header(file_name, pn_dir):
return (header_lines, comment_lines)
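The same URI convention applies when streaming a header; a short sketch against the same hypothetical bucket:

    from wfdb.io import download

    # Header lines and comment lines are returned separately
    header_lines, comment_lines = download._stream_header(
        '100.hea', 's3://my-aws-bucket/mitdb')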


def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype):
def _stream_dat(file_name, remote_dir, byte_count, start_byte, dtype):
"""
Stream data from a remote dat file into a 1d numpy array.

Parameters
----------
file_name : str
The name of the dat file to be read.
pn_dir : str
The PhysioNet directory where the dat file is located.
remote_dir : str
The remote directory, which can be one of two things:
(1) The S3 URI from which to find the required dat file. This
should always begin with 's3' in order to work correctly. An
example input would be: 's3://my-aws-bucket/'
(2) The PhysioNet database directory from which to find the
required dat file, e.g. for file '100.dat' in
'http://physionet.org/content/mitdb', remote_dir='mitdb'.
byte_count : int
The number of bytes to be read.
start_byte : int
@@ -162,58 +194,82 @@ def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype):
The data read from the dat file.

"""
# Full url of dat file
url = posixpath.join(config.db_index_url, pn_dir, file_name)
if remote_dir.startswith('s3'):
# Set up the remote AWS S3 file system
import s3fs
fs = s3fs.S3FileSystem(anon=True)
file_dir = posixpath.join(remote_dir, file_name)
# Read only the requested byte range
with fs.open(file_dir) as f:
f.seek(start_byte)
file_content = f.read(byte_count)
else:
# Full url of dat file
url = posixpath.join(config.db_index_url, remote_dir, file_name)

# Specify the byte range
end_byte = start_byte + byte_count - 1
headers = {"Range":"bytes=%d-%d" % (start_byte, end_byte),
'Accept-Encoding': '*'}
# Specify the byte range
end_byte = start_byte + byte_count - 1
headers = {"Range":"bytes=%d-%d" % (start_byte, end_byte),
'Accept-Encoding': '*'}

# Get the content
response = requests.get(url, headers=headers, stream=True)
# Get the content
response = requests.get(url, headers=headers, stream=True)

# Raise HTTPError if invalid url
response.raise_for_status()
# Raise HTTPError if invalid url
response.raise_for_status()
file_content = response.content

# Convert to numpy array
if type(dtype) == str:
# Convert 24-bit to 16-bit then proceed
temp_data = np.frombuffer(response.content, 'b').reshape(-1,3)[:,1:].flatten().view('i2')
temp_data = np.frombuffer(file_content, 'b').reshape(-1,3)[:,1:].flatten().view('i2')
sig_data = np.fromstring(temp_data, dtype='i2')
else:
sig_data = np.fromstring(response.content, dtype=dtype)
sig_data = np.fromstring(file_content, dtype=dtype)

return sig_data
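And a hedged call into the dat streamer, again with a placeholder bucket; the byte range and dtype are arbitrary illustration values:

    import numpy as np

    from wfdb.io import download

    # Stream the first 1024 bytes of a dat file from S3 as little-endian int16
    sig = download._stream_dat('100.dat', 's3://my-aws-bucket/mitdb',
                               byte_count=1024, start_byte=0,
                               dtype=np.dtype('<i2'))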


def _stream_annotation(file_name, pn_dir):
def _stream_annotation(file_name, remote_dir):
"""
Stream an entire remote annotation file from PhysioNet or S3.

Parameters
----------
file_name : str
The name of the annotation file to be read.
pn_dir : str
The PhysioNet directory where the annotation file is located.
remote_dir : str
The remote directory, which can be one of two things:
(1) The S3 URI from which to find the required annotation file. This
should always begin with 's3' in order to work correctly. An
example input would be: 's3://my-aws-bucket/'
(2) The PhysioNet database directory from which to find the
required annotation file, e.g. for file '100.atr' in
'http://physionet.org/content/mitdb', remote_dir='mitdb'.

Returns
-------
ann_data : ndarray
The resulting data stream in numpy array format.

"""
# Full url of annotation file
url = posixpath.join(config.db_index_url, pn_dir, file_name)

# Get the content
response = requests.get(url)
# Raise HTTPError if invalid url
response.raise_for_status()

if remote_dir.startswith('s3'):
# Set up the remote AWS S3 file system
import s3fs
fs = s3fs.S3FileSystem(anon=True)
file_dir = posixpath.join(remote_dir, file_name)
# Read the raw annotation bytes
with fs.open(file_dir) as f:
file_content = f.read()
else:
# Full url of annotation file
url = posixpath.join(config.db_index_url, remote_dir, file_name)
# Get the content
response = requests.get(url)
# Raise HTTPError if invalid url
response.raise_for_status()
file_content = response.content
# Convert to numpy array
ann_data = np.fromstring(response.content, dtype=np.dtype('<u1'))
ann_data = np.fromstring(file_content, dtype=np.dtype('<u1'))

return ann_data
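Correspondingly for the annotation streamer, with the same hypothetical bucket:

    from wfdb.io import download

    # Raw annotation bytes as an unsigned 8-bit ndarray
    ann_bytes = download._stream_annotation('100.atr', 's3://my-aws-bucket/mitdb')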
