From 3b7ef8b1733458d8bffed9d60b9f7a22394796dd Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Fri, 18 Oct 2024 18:51:57 +0800 Subject: [PATCH] chore(query): reduce unsafe codes in kernels (#16633) * add tests * fix typo * remove unsafe codes --- Cargo.lock | 4 +- .../arrow/src/arrow/bitmap/immutable.rs | 6 +- src/common/arrow/src/arrow/bitmap/mutable.rs | 27 ++ src/query/expression/src/kernels/concat.rs | 162 ++------ src/query/expression/src/kernels/filter.rs | 378 +++++------------- src/query/expression/src/kernels/mod.rs | 2 + .../expression/src/kernels/take_ranges.rs | 82 +--- src/query/expression/tests/it/kernel.rs | 16 + 8 files changed, 191 insertions(+), 486 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4996a6f7083..88b8c0a4730a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15234,9 +15234,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.12" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", diff --git a/src/common/arrow/src/arrow/bitmap/immutable.rs b/src/common/arrow/src/arrow/bitmap/immutable.rs index 145a3a84a712..353efdc5adeb 100644 --- a/src/common/arrow/src/arrow/bitmap/immutable.rs +++ b/src/common/arrow/src/arrow/bitmap/immutable.rs @@ -269,10 +269,14 @@ impl Bitmap { /// Returns a pointer to the start of this [`Bitmap`] (ignores `offsets`) /// This pointer is allocated iff `self.len() > 0`. - pub(crate) fn offset(&self) -> usize { + pub fn offset(&self) -> usize { self.offset } + pub fn values(&self) -> &[u8] { + self.bytes.deref() + } + /// Converts this [`Bitmap`] to [`MutableBitmap`], returning itself if the conversion /// is not possible /// diff --git a/src/common/arrow/src/arrow/bitmap/mutable.rs b/src/common/arrow/src/arrow/bitmap/mutable.rs index 09182d81d27b..c0d0e058113c 100644 --- a/src/common/arrow/src/arrow/bitmap/mutable.rs +++ b/src/common/arrow/src/arrow/bitmap/mutable.rs @@ -15,6 +15,7 @@ use std::hint::unreachable_unchecked; use std::iter::FromIterator; +use std::ops::Range; use std::sync::Arc; use super::utils::count_zeros; @@ -202,6 +203,26 @@ impl MutableBitmap { } } + /// Append `range` bits from `to_set` + /// + /// `to_set` is a slice of bits packed LSB-first into `[u8]` + /// + /// # Panics + /// + /// Panics if `to_set` does not contain `ceil(range.end / 8)` bytes + pub fn append_packed_range(&mut self, range: Range, to_set: &[u8]) { + let offset_write = self.len(); + let len = range.end - range.start; + self.advance(len); + arrow_data::bit_mask::set_bits( + self.buffer.as_mut_slice(), + to_set, + offset_write, + range.start, + len, + ); + } + /// Initializes a zeroed [`MutableBitmap`]. #[inline] pub fn from_len_zeroed(length: usize) -> Self { @@ -227,6 +248,12 @@ impl MutableBitmap { .reserve((self.length + additional).saturating_add(7) / 8 - self.buffer.len()) } + /// Advances the buffer by `additional` bits + #[inline] + pub fn advance(&mut self, additional: usize) { + self.extend_unset(additional) + } + /// Returns the capacity of [`MutableBitmap`] in number of bits. #[inline] pub fn capacity(&self) -> usize { diff --git a/src/query/expression/src/kernels/concat.rs b/src/query/expression/src/kernels/concat.rs index 957fb93e6180..d327fff0eb2c 100644 --- a/src/query/expression/src/kernels/concat.rs +++ b/src/query/expression/src/kernels/concat.rs @@ -19,6 +19,7 @@ use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use ethnum::i256; use itertools::Itertools; use crate::copy_continuous_bits; @@ -28,6 +29,7 @@ use crate::kernels::utils::set_vec_len_by_ptr; use crate::store_advance_aligned; use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; +use crate::types::decimal::Decimal; use crate::types::decimal::DecimalColumn; use crate::types::geography::GeographyColumn; use crate::types::geometry::GeometryType; @@ -36,20 +38,20 @@ use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; use crate::types::string::StringColumn; use crate::types::AnyType; -use crate::types::ArgType; use crate::types::ArrayType; use crate::types::BinaryType; use crate::types::BitmapType; use crate::types::BooleanType; +use crate::types::DateType; +use crate::types::DecimalType; use crate::types::GeographyType; use crate::types::MapType; use crate::types::NumberType; use crate::types::StringType; +use crate::types::TimestampType; use crate::types::ValueType; use crate::types::VariantType; -use crate::types::F32; -use crate::types::F64; -use crate::with_decimal_type; +use crate::with_decimal_mapped_type; use crate::with_number_mapped_type; use crate::BlockEntry; use crate::Column; @@ -139,121 +141,23 @@ impl Column { Column::EmptyArray { .. } => Column::EmptyArray { len: capacity }, Column::EmptyMap { .. } => Column::EmptyMap { len: capacity }, Column::Number(col) => with_number_mapped_type!(|NUM_TYPE| match col { - NumberColumn::UInt8(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_u_int8().unwrap()), + NumberColumn::NUM_TYPE(_) => { + type NType = NumberType; + let buffer = Self::concat_primitive_types( + columns.map(|col| NType::try_downcast_column(&col).unwrap()), capacity, ); - >::upcast_column(>::column_from_vec(builder, &[])) - } - NumberColumn::UInt16(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_u_int16().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - NumberColumn::UInt32(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_u_int32().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - NumberColumn::UInt64(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_u_int64().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - NumberColumn::Int8(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_int8().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec(builder, &[])) - } - NumberColumn::Int16(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_int16().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - NumberColumn::Int32(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_int32().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - NumberColumn::Int64(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_int64().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - NumberColumn::Float32(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_float32().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) - } - NumberColumn::Float64(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_number().unwrap().into_float64().unwrap()), - capacity, - ); - >::upcast_column(>::column_from_vec( - builder, - &[], - )) + NType::upcast_column(buffer) } }), - Column::Decimal(col) => with_decimal_type!(|DECIMAL_TYPE| match col { - DecimalColumn::Decimal128(_, size) => { - let builder = Self::concat_primitive_types( - columns.map(|col| match col { - Column::Decimal(DecimalColumn::Decimal128(col, _)) => col, - _ => unreachable!(), - }), - capacity, - ); - Column::Decimal(DecimalColumn::Decimal128(builder.into(), size)) - } - DecimalColumn::Decimal256(_, size) => { - let builder = Self::concat_primitive_types( - columns.map(|col| match col { - Column::Decimal(DecimalColumn::Decimal256(col, _)) => col, - _ => unreachable!(), - }), + Column::Decimal(col) => with_decimal_mapped_type!(|DECIMAL_TYPE| match col { + DecimalColumn::DECIMAL_TYPE(_, size) => { + type DType = DecimalType; + let buffer = Self::concat_primitive_types( + columns.map(|col| DType::try_downcast_column(&col).unwrap()), capacity, ); - Column::Decimal(DecimalColumn::Decimal256(builder.into(), size)) + DECIMAL_TYPE::upcast_column(buffer, size) } }), Column::Boolean(_) => Column::Boolean(Self::concat_boolean_types( @@ -269,34 +173,18 @@ impl Column { capacity, )), Column::Timestamp(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_timestamp().unwrap()), + let buffer = Self::concat_primitive_types( + columns.map(|col| TimestampType::try_downcast_column(&col).unwrap()), capacity, ); - let ts = >::upcast_column(>::column_from_vec( - builder, - &[], - )) - .into_number() - .unwrap() - .into_int64() - .unwrap(); - Column::Timestamp(ts) + Column::Timestamp(buffer) } Column::Date(_) => { - let builder = Self::concat_primitive_types( - columns.map(|col| col.into_date().unwrap()), + let buffer = Self::concat_primitive_types( + columns.map(|col| DateType::try_downcast_column(&col).unwrap()), capacity, ); - let d = >::upcast_column(>::column_from_vec( - builder, - &[], - )) - .into_number() - .unwrap() - .into_int32() - .unwrap(); - Column::Date(d) + Column::Date(buffer) } Column::Array(col) => { let mut offsets = Vec::with_capacity(capacity + 1); @@ -372,7 +260,7 @@ impl Column { pub fn concat_primitive_types( cols: impl Iterator>, num_rows: usize, - ) -> Vec + ) -> Buffer where T: Copy, { @@ -380,7 +268,7 @@ impl Column { for col in cols { builder.extend(col.iter()); } - builder + builder.into() } pub fn concat_binary_types( diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index 1b730323a24f..894ceb412cfb 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -12,16 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use databend_common_arrow::arrow::bitmap::utils::BitChunkIterExact; use databend_common_arrow::arrow::bitmap::utils::BitChunksExact; +use databend_common_arrow::arrow::bitmap::utils::SlicesIterator; use databend_common_arrow::arrow::bitmap::Bitmap; +use databend_common_arrow::arrow::bitmap::MutableBitmap; +use databend_common_arrow::arrow::bitmap::TrueIdxIter; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; -use crate::copy_continuous_bits; -use crate::kernels::take::BIT_MASK; use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance_aligned; @@ -51,7 +50,6 @@ impl DataBlock { } let mut filter_visitor = FilterVisitor::new(bitmap); - let after_columns = self .columns() .iter() @@ -67,7 +65,7 @@ impl DataBlock { Ok(DataBlock::new_with_meta( after_columns, - self.num_rows() - count_zeros, + filter_visitor.filter_rows, self.get_meta().cloned(), )) } @@ -108,23 +106,65 @@ impl Column { } } -struct FilterVisitor<'a> { +/// The iteration strategy used to evaluate [`FilterVisitor`] +#[derive(Debug)] +pub enum IterationStrategy { + None, + All, + /// Range iterator + SlicesIterator, + /// True index iterator + IndexIterator, +} + +/// based on +const SELECTIVITY_THRESHOLD: f64 = 0.8; + +impl IterationStrategy { + fn default_strategy(length: usize, true_count: usize) -> Self { + if length == 0 || true_count == 0 { + return IterationStrategy::None; + } + if length == true_count { + return IterationStrategy::All; + } + let selectivity_frac = true_count as f64 / length as f64; + if selectivity_frac > SELECTIVITY_THRESHOLD { + return IterationStrategy::SlicesIterator; + } + IterationStrategy::IndexIterator + } +} + +pub struct FilterVisitor<'a> { filter: &'a Bitmap, result: Option>, - num_rows: usize, + filter_rows: usize, original_rows: usize, + strategy: IterationStrategy, } impl<'a> FilterVisitor<'a> { pub fn new(filter: &'a Bitmap) -> Self { - let num_rows = filter.len() - filter.unset_bits(); + let filter_rows = filter.len() - filter.unset_bits(); + let strategy = IterationStrategy::default_strategy(filter.len(), filter_rows); Self { filter, result: None, - num_rows, + filter_rows, original_rows: filter.len(), + strategy, } } + + pub fn with_strategy(mut self, strategy: IterationStrategy) -> Self { + self.strategy = strategy; + self + } + + pub fn take_result(&mut self) -> Option> { + self.result.take() + } } impl<'a> ValueVisitor for FilterVisitor<'a> { @@ -133,13 +173,12 @@ impl<'a> ValueVisitor for FilterVisitor<'a> { Value::Scalar(c) => self.visit_scalar(c), Value::Column(c) => { assert!(c.len() == self.original_rows); - - if c.len() == self.num_rows || c.len() == 0 { - self.result = Some(Value::Column(c)); - } else if self.num_rows == 0 { - self.result = Some(Value::Column(c.slice(0..0))); - } else { - self.visit_column(c)?; + match self.strategy { + IterationStrategy::None => self.result = Some(Value::Column(c.slice(0..0))), + IterationStrategy::All => self.result = Some(Value::Column(c)), + IterationStrategy::SlicesIterator | IterationStrategy::IndexIterator => { + self.visit_column(c)? + } } Ok(()) } @@ -171,54 +210,22 @@ impl<'a> ValueVisitor for FilterVisitor<'a> { let c = T::upcast_column(column.clone()); let builder = ColumnBuilder::with_capacity(&c.data_type(), c.len()); let mut builder = T::try_downcast_owned_builder(builder).unwrap(); - - const CHUNK_SIZE: usize = 64; - let (mut slice, offset, mut length) = self.filter.as_slice(); - let mut start_index: usize = 0; - if offset > 0 { - // If `offset` > 0, the valid bits of this byte start at `offset`, and the - // max num of valid bits is `8 - offset`, but we also need to ensure that - // we cannot iterate more than `length` bits. - let n = std::cmp::min(8 - offset, length); - start_index += n; - self.filter - .iter() - .enumerate() - .take(n) - .for_each(|(index, is_selected)| { - if is_selected { - T::push_item(&mut builder, T::index_column(&column, index).unwrap()); - } + match self.strategy { + IterationStrategy::IndexIterator => { + let iter = TrueIdxIter::new(self.original_rows, Some(self.filter)); + iter.for_each(|index| { + T::push_item(&mut builder, unsafe { + T::index_column_unchecked(&column, index) + }) }); - slice = &slice[1..]; - length = if length >= n { length - n } else { 0 }; + } + _ => { + let iter = SlicesIterator::new(self.filter); + iter.for_each(|(start, len)| { + T::append_column(&mut builder, &T::slice_column(&column, start..start + len)) + }); + } } - - let mut mask_chunks = BitChunksExact::::new(slice, length); - - mask_chunks - .by_ref() - .enumerate() - .for_each(|(mask_index, mut mask)| { - while mask != 0 { - let n = mask.trailing_zeros() as usize; - let index = mask_index * CHUNK_SIZE + n + start_index; - T::push_item(&mut builder, T::index_column(&column, index).unwrap()); - mask = mask & (mask - 1); - } - }); - - let remainder_start = length - length % CHUNK_SIZE; - mask_chunks - .remainder_iter() - .enumerate() - .for_each(|(mask_index, is_selected)| { - if is_selected { - let index = mask_index + remainder_start + start_index; - T::push_item(&mut builder, T::index_column(&column, index).unwrap()); - } - }); - self.result = Some(Value::Column(T::upcast_column(T::build_column(builder)))); Ok(()) } @@ -262,169 +269,29 @@ impl<'a> ValueVisitor for FilterVisitor<'a> { fn visit_boolean(&mut self, mut bitmap: Bitmap) -> Result<()> { // faster path for all bits set if bitmap.unset_bits() == 0 { - bitmap.slice(0, self.num_rows); + bitmap.slice(0, self.filter_rows); self.result = Some(Value::Column(BooleanType::upcast_column(bitmap))); return Ok(()); } - let capacity = self.num_rows.saturating_add(7) / 8; - let mut builder: Vec = Vec::with_capacity(capacity); - let mut builder_ptr = builder.as_mut_ptr(); - let mut builder_idx = 0; - let mut unset_bits = 0; - let mut buf = 0; - - let (bitmap_slice, bitmap_offset, _) = bitmap.as_slice(); - let mut bitmap_idx = 0; - - let (mut filter_slice, filter_offset, mut filter_length) = self.filter.as_slice(); - unsafe { - if filter_offset > 0 { - let mut mask = filter_slice[0]; - while mask != 0 { - let n = mask.trailing_zeros() as usize; - // If `filter_length` > 0, the valid bits of this byte start at `filter_offset`, we also - // need to ensure that we cannot iterate more than `filter_length` bits. - if n >= filter_offset && n < filter_offset + filter_length { - if bitmap.get_bit_unchecked(n - filter_offset) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - builder_idx += 1; - } - mask = mask & (mask - 1); - } - let bits_to_align = 8 - filter_offset; - filter_length = if filter_length >= bits_to_align { - filter_length - bits_to_align - } else { - 0 - }; - filter_slice = &filter_slice[1..]; - bitmap_idx += bits_to_align; - } - - const CHUNK_SIZE: usize = 64; - let mut mask_chunks = BitChunksExact::::new(filter_slice, filter_length); - let mut continuous_selected = 0; - for mut mask in mask_chunks.by_ref() { - if mask == u64::MAX { - continuous_selected += CHUNK_SIZE; - } else { - if continuous_selected > 0 { - if builder_idx % 8 != 0 { - while continuous_selected > 0 { - if bitmap.get_bit_unchecked(bitmap_idx) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - bitmap_idx += 1; - builder_idx += 1; - continuous_selected -= 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - break; - } - } - } - - if continuous_selected > 0 { - let (cur_buf, cur_unset_bits) = copy_continuous_bits( - &mut builder_ptr, - bitmap_slice, - builder_idx, - bitmap_idx + bitmap_offset, - continuous_selected, - ); - builder_idx += continuous_selected; - bitmap_idx += continuous_selected; - unset_bits += cur_unset_bits; - buf = cur_buf; - continuous_selected = 0; - } - } - - while mask != 0 { - let n = mask.trailing_zeros() as usize; - if bitmap.get_bit_unchecked(bitmap_idx + n) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - builder_idx += 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - } - mask = mask & (mask - 1); - } - bitmap_idx += CHUNK_SIZE; - } - } - - if continuous_selected > 0 { - if builder_idx % 8 != 0 { - while continuous_selected > 0 { - if bitmap.get_bit_unchecked(bitmap_idx) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - bitmap_idx += 1; - builder_idx += 1; - continuous_selected -= 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - break; - } - } - } - - if continuous_selected > 0 { - let (cur_buf, cur_unset_bits) = copy_continuous_bits( - &mut builder_ptr, - bitmap_slice, - builder_idx, - bitmap_idx + bitmap_offset, - continuous_selected, - ); - builder_idx += continuous_selected; - bitmap_idx += continuous_selected; - unset_bits += cur_unset_bits; - buf = cur_buf; - } - } - - for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { - if is_selected { - if bitmap.get_bit_unchecked(bitmap_idx + i) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - builder_idx += 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - } - } + let bitmap = match self.strategy { + IterationStrategy::IndexIterator => { + let iter = TrueIdxIter::new(self.original_rows, Some(self.filter)); + MutableBitmap::from_trusted_len_iter(iter.map(|index| bitmap.get_bit(index))).into() } + _ => { + let src = bitmap.values(); + let offset = bitmap.offset(); - if builder_idx % 8 != 0 { - store_advance_aligned(buf, &mut builder_ptr); + let mut builder = MutableBitmap::with_capacity(self.filter_rows); + let iter = SlicesIterator::new(self.filter); + iter.for_each(|(start, len)| { + builder.append_packed_range(start + offset..start + len + offset, src) + }); + builder.into() } - } - - let bitmap = unsafe { - set_vec_len_by_ptr(&mut builder, builder_ptr); - Bitmap::from_inner(Arc::new(builder.into()), 0, self.num_rows, unset_bits) - .ok() - .unwrap() }; + self.result = Some(Value::Column(BooleanType::upcast_column(bitmap))); Ok(()) } @@ -454,78 +321,31 @@ impl<'a> ValueVisitor for FilterVisitor<'a> { impl<'a> FilterVisitor<'a> { fn filter_primitive_types(&mut self, buffer: Buffer) -> Buffer { - let mut builder: Vec = Vec::with_capacity(self.num_rows); - let mut ptr = builder.as_mut_ptr(); - let mut values_ptr = buffer.as_slice().as_ptr(); - let (mut slice, offset, mut length) = self.filter.as_slice(); - - unsafe { - if offset > 0 { - let mut mask = slice[0]; - while mask != 0 { - let n = mask.trailing_zeros() as usize; - // If `offset` > 0, the valid bits of this byte start at `offset`, we also - // need to ensure that we cannot iterate more than `length` bits. - if n >= offset && n < offset + length { - copy_advance_aligned(values_ptr.add(n - offset), &mut ptr, 1); - } - mask = mask & (mask - 1); - } - let bits_to_align = 8 - offset; - length = if length >= bits_to_align { - length - bits_to_align - } else { - 0 - }; - slice = &slice[1..]; - values_ptr = values_ptr.add(bits_to_align); + match self.strategy { + IterationStrategy::IndexIterator => { + let iter = TrueIdxIter::new(self.original_rows, Some(self.filter)); + Vec::from_iter(iter.map(|index| buffer[index])).into() } - - const CHUNK_SIZE: usize = 64; - let mut mask_chunks = BitChunksExact::::new(slice, length); - let mut continuous_selected = 0; - for mut mask in mask_chunks.by_ref() { - if mask == u64::MAX { - continuous_selected += CHUNK_SIZE; - } else { - if continuous_selected > 0 { - copy_advance_aligned(values_ptr, &mut ptr, continuous_selected); - values_ptr = values_ptr.add(continuous_selected); - continuous_selected = 0; - } - while mask != 0 { - let n = mask.trailing_zeros() as usize; - copy_advance_aligned(values_ptr.add(n), &mut ptr, 1); - mask = mask & (mask - 1); - } - values_ptr = values_ptr.add(CHUNK_SIZE); - } - } - if continuous_selected > 0 { - copy_advance_aligned(values_ptr, &mut ptr, continuous_selected); - values_ptr = values_ptr.add(continuous_selected); - } - - for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { - if is_selected { - copy_advance_aligned(values_ptr.add(i), &mut ptr, 1); - } + _ => { + let mut builder = Vec::with_capacity(self.filter_rows); + let iter = SlicesIterator::new(self.filter); + iter.for_each(|(start, len)| { + builder.extend_from_slice(&buffer[start..start + len]); + }); + builder.into() } - - set_vec_len_by_ptr(&mut builder, ptr); } - - builder.into() } + // TODO: optimize this after BinaryView is introduced by @andy fn filter_binary_types(&mut self, values: &BinaryColumn) -> BinaryColumn { // Each element of `items` is (string pointer(u64), string length). - let mut items: Vec<(u64, usize)> = Vec::with_capacity(self.num_rows); + let mut items: Vec<(u64, usize)> = Vec::with_capacity(self.filter_rows); // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. let values_offset = values.offsets().as_slice(); let values_data_ptr = values.data().as_slice().as_ptr(); - let mut offsets: Vec = Vec::with_capacity(self.num_rows + 1); + let mut offsets: Vec = Vec::with_capacity(self.filter_rows + 1); let mut offsets_ptr = offsets.as_mut_ptr(); let mut items_ptr = items.as_mut_ptr(); let mut data_size = 0; diff --git a/src/query/expression/src/kernels/mod.rs b/src/query/expression/src/kernels/mod.rs index 1e5369e2d807..738598406d23 100644 --- a/src/query/expression/src/kernels/mod.rs +++ b/src/query/expression/src/kernels/mod.rs @@ -26,6 +26,8 @@ mod take_ranges; mod topk; mod utils; +pub use filter::FilterVisitor; +pub use filter::IterationStrategy; pub use group_by_hash::*; pub use sort::*; pub use sort_compare::*; diff --git a/src/query/expression/src/kernels/take_ranges.rs b/src/query/expression/src/kernels/take_ranges.rs index 71fbd0acc1af..3c9159abb44b 100644 --- a/src/query/expression/src/kernels/take_ranges.rs +++ b/src/query/expression/src/kernels/take_ranges.rs @@ -13,17 +13,12 @@ // limitations under the License. use core::ops::Range; -use std::sync::Arc; use databend_common_arrow::arrow::bitmap::Bitmap; +use databend_common_arrow::arrow::bitmap::MutableBitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; -use crate::copy_continuous_bits; -use crate::kernels::take::BIT_MASK; -use crate::kernels::utils::copy_advance_aligned; -use crate::kernels::utils::set_vec_len_by_ptr; -use crate::kernels::utils::store_advance_aligned; use crate::types::binary::BinaryColumn; use crate::types::nullable::NullableColumn; use crate::types::string::StringColumn; @@ -159,60 +154,18 @@ impl<'a> ValueVisitor for TakeRangeVisitor<'a> { } fn visit_boolean(&mut self, bitmap: Bitmap) -> Result<()> { - let capacity = self.num_rows.saturating_add(7) / 8; - let mut builder: Vec = Vec::with_capacity(capacity); - let mut builder_ptr = builder.as_mut_ptr(); - let mut builder_idx = 0; - let mut unset_bits = 0; - let mut buf = 0; + let mut builder = MutableBitmap::with_capacity(self.num_rows); - let (bitmap_slice, bitmap_offset, _) = bitmap.as_slice(); - unsafe { - for range in self.ranges { - let mut start = range.start as usize; - let end = range.end as usize; - if builder_idx % 8 != 0 { - while start < end { - if bitmap.get_bit_unchecked(start) { - buf |= BIT_MASK[builder_idx % 8]; - } else { - unset_bits += 1; - } - builder_idx += 1; - start += 1; - if builder_idx % 8 == 0 { - store_advance_aligned(buf, &mut builder_ptr); - buf = 0; - break; - } - } - } - let remaining = end - start; - if remaining > 0 { - let (cur_buf, cur_unset_bits) = copy_continuous_bits( - &mut builder_ptr, - bitmap_slice, - builder_idx, - start + bitmap_offset, - remaining, - ); - builder_idx += remaining; - unset_bits += cur_unset_bits; - buf = cur_buf; - } - } - - if builder_idx % 8 != 0 { - store_advance_aligned(buf, &mut builder_ptr); - } + let src = bitmap.values(); + let offset = bitmap.offset(); + self.ranges.iter().for_each(|range| { + let start = range.start as usize; + let end = range.end as usize; + builder.append_packed_range(start + offset..end + offset, src) + }); - set_vec_len_by_ptr(&mut builder, builder_ptr); - let bitmap = Bitmap::from_inner(Arc::new(builder.into()), 0, self.num_rows, unset_bits) - .ok() - .unwrap(); - self.result = Some(Value::Column(BooleanType::upcast_column(bitmap))); - Ok(()) - } + self.result = Some(Value::Column(BooleanType::upcast_column(builder.into()))); + Ok(()) } fn visit_binary(&mut self, col: BinaryColumn) -> Result<()> { @@ -268,15 +221,10 @@ impl<'a> TakeRangeVisitor<'a> { // Build [`data`]. let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for range in self.ranges { - let col_data = &value_data[values_offset[range.start as usize] as usize - ..values_offset[range.end as usize] as usize]; - copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len()); - } - set_vec_len_by_ptr(&mut data, data_ptr); + for range in self.ranges { + let col_data = &value_data[values_offset[range.start as usize] as usize + ..values_offset[range.end as usize] as usize]; + data.extend_from_slice(col_data); } BinaryColumn::new(data.into(), offsets.into()) diff --git a/src/query/expression/tests/it/kernel.rs b/src/query/expression/tests/it/kernel.rs index 1a9e078d8d91..4d0d8a2af572 100644 --- a/src/query/expression/tests/it/kernel.rs +++ b/src/query/expression/tests/it/kernel.rs @@ -19,10 +19,13 @@ use databend_common_expression::types::number::*; use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; use databend_common_expression::types::StringType; +use databend_common_expression::visitor::ValueVisitor; use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::DataBlock; +use databend_common_expression::FilterVisitor; use databend_common_expression::FromData; +use databend_common_expression::IterationStrategy; use databend_common_expression::Scalar; use databend_common_expression::Value; use goldenfile::Mint; @@ -272,6 +275,19 @@ pub fn test_take_and_filter_and_concat() -> databend_common_exception::Result<() let random_block = rand_block_for_all_types(len); let random_block = random_block.slice(slice_start..slice_end); + { + // test filter + let mut f1 = + FilterVisitor::new(&filter).with_strategy(IterationStrategy::SlicesIterator); + let mut f2 = + FilterVisitor::new(&filter).with_strategy(IterationStrategy::IndexIterator); + for col in random_block.columns() { + f1.visit_value(col.value.clone())?; + f2.visit_value(col.value.clone())?; + assert_eq!(f1.take_result(), f2.take_result()); + } + } + filtered_blocks.push(random_block.clone().filter_with_bitmap(&filter)?); blocks.push(random_block);