diff --git a/tests/test_record.py b/tests/test_record.py index 8d09e39d..3459897b 100644 --- a/tests/test_record.py +++ b/tests/test_record.py @@ -1053,19 +1053,20 @@ def test_physical_conversion(self): adc_gain = [1.0, 1234.567, 765.4321] baseline = [10, 20, -30] d_signal = np.repeat(np.arange(-100, 100), 3).reshape(-1, 3) + d_signal[5:10, :] = [-32768, -2048, -128] e_d_signal = list(d_signal.transpose()) - fmt = ["16", "16", "16"] + fmt = ["16", "212", "80"] # Test adding or subtracting a small offset (0.01 ADU) to check # that we correctly round to the nearest integer for offset in (0, -0.01, 0.01): p_signal = (d_signal + offset - baseline) / adc_gain + p_signal[5:10, :] = np.nan e_p_signal = list(p_signal.transpose()) # Test converting p_signal to d_signal record = wfdb.Record( - n_sig=n_sig, p_signal=p_signal.copy(), adc_gain=adc_gain, baseline=baseline, @@ -1081,7 +1082,6 @@ def test_physical_conversion(self): # Test converting e_p_signal to e_d_signal record = wfdb.Record( - n_sig=n_sig, e_p_signal=[s.copy() for s in e_p_signal], adc_gain=adc_gain, baseline=baseline, @@ -1108,7 +1108,7 @@ def test_physical_conversion(self): p_signal=p_signal, adc_gain=adc_gain, baseline=baseline, - fmt=["16", "16", "16"], + fmt=fmt, write_dir=self.temp_path, ) record = wfdb.rdrecord( diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index a4ffbced..68ca57e4 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -532,68 +532,60 @@ def adc(self, expanded=False, inplace=False): # To do: choose the minimum return res needed intdtype = "int64" + # Convert a physical (1D or 2D) signal array to digital. Note that + # the input array is modified! + def adc_inplace(p_signal, adc_gain, baseline, d_nan): + nanlocs = np.isnan(p_signal) + np.multiply(p_signal, adc_gain, p_signal) + np.add(p_signal, baseline, p_signal) + np.round(p_signal, 0, p_signal) + np.copyto(p_signal, d_nan, where=nanlocs) + d_signal = p_signal.astype(intdtype, copy=False) + return d_signal + # Do inplace conversion and set relevant variables. if inplace: if expanded: - for ch in range(self.n_sig): - # NAN locations for the channel - ch_nanlocs = np.isnan(self.e_p_signal[ch]) - np.multiply( - self.e_p_signal[ch], + for ch, ch_p_signal in enumerate(self.e_p_signal): + ch_d_signal = adc_inplace( + ch_p_signal, self.adc_gain[ch], - self.e_p_signal[ch], - ) - np.add( - self.e_p_signal[ch], self.baseline[ch], - self.e_p_signal[ch], - ) - np.round(self.e_p_signal[ch], 0, self.e_p_signal[ch]) - self.e_p_signal[ch] = self.e_p_signal[ch].astype( - intdtype, copy=False + d_nans[ch], ) - self.e_p_signal[ch][ch_nanlocs] = d_nans[ch] + self.e_p_signal[ch] = ch_d_signal self.e_d_signal = self.e_p_signal self.e_p_signal = None else: - nanlocs = np.isnan(self.p_signal) - np.multiply(self.p_signal, self.adc_gain, self.p_signal) - np.add(self.p_signal, self.baseline, self.p_signal) - np.round(self.p_signal, 0, self.p_signal) - self.p_signal = self.p_signal.astype(intdtype, copy=False) - self.d_signal = self.p_signal + self.d_signal = adc_inplace( + self.p_signal, + self.adc_gain, + self.baseline, + d_nans, + ) self.p_signal = None # Return the variable else: if expanded: - d_signal = [] - for ch in range(self.n_sig): - # NAN locations for the channel - ch_nanlocs = np.isnan(self.e_p_signal[ch]) - ch_d_signal = self.e_p_signal[ch].copy() - np.multiply(ch_d_signal, self.adc_gain[ch], ch_d_signal) - np.add(ch_d_signal, self.baseline[ch], ch_d_signal) - np.round(ch_d_signal, 0, ch_d_signal) - ch_d_signal = ch_d_signal.astype(intdtype, copy=False) - ch_d_signal[ch_nanlocs] = d_nans[ch] - d_signal.append(ch_d_signal) + e_d_signal = [] + for ch, ch_p_signal in enumerate(self.e_p_signal): + ch_d_signal = adc_inplace( + ch_p_signal.copy(), + self.adc_gain[ch], + self.baseline[ch], + d_nans[ch], + ) + e_d_signal.append(ch_d_signal) + return e_d_signal else: - nanlocs = np.isnan(self.p_signal) - # Cannot cast dtype to int now because gain is float. - d_signal = self.p_signal.copy() - np.multiply(d_signal, self.adc_gain, d_signal) - np.add(d_signal, self.baseline, d_signal) - np.round(d_signal, 0, d_signal) - d_signal = d_signal.astype(intdtype, copy=False) - - if nanlocs.any(): - for ch in range(d_signal.shape[1]): - if nanlocs[:, ch].any(): - d_signal[nanlocs[:, ch], ch] = d_nans[ch] - - return d_signal + return adc_inplace( + self.p_signal.copy(), + self.adc_gain, + self.baseline, + d_nans, + ) def dac(self, expanded=False, return_res=64, inplace=False): """