Merge branch 'main' into filter-2
sundy-li authored Oct 17, 2024
2 parents 8ca56e9 + 3c603da commit ab2e886
Showing 55 changed files with 1,408 additions and 2,552 deletions.
2 changes: 1 addition & 1 deletion .github/actions/test_unit/action.yml
@@ -16,7 +16,7 @@ runs:
RUST_TEST_THREADS: "8"
RUST_LOG: ERROR
RUST_MIN_STACK: 104857600
# RUST_BACKTRACE: full
# RUST_BACKTRACE: 1

- name: Upload failure
if: failure()
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

15 changes: 0 additions & 15 deletions src/common/base/src/base/watch_notify.rs
@@ -44,10 +44,6 @@ impl WatchNotify {
pub fn notify_waiters(&self) {
let _ = self.tx.send_replace(true);
}

pub fn notify_one(&self) {
self.notify_waiters()
}
}

#[cfg(test)]
@@ -72,15 +68,4 @@ mod tests {
let notified = notify.notified();
notified.await;
}

#[tokio::test]
async fn test_notify_one() {
let notify = WatchNotify::new();
// notify_waiters ahead of notified being instantiated and awaited
notify.notify_one();

// this should not await indefinitely
let notified = notify.notified();
notified.await;
}
}
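The removed `notify_one` was a thin alias for `notify_waiters`, so call sites migrate mechanically. A minimal sketch of the new usage, assuming the crate path `databend_common_base::base::WatchNotify` (inferred from the file location; only `WatchNotify::new`, `notify_waiters`, and `notified` are confirmed by this diff):

```rust
// Sketch only: the import path is assumed from src/common/base/src/base/watch_notify.rs.
use databend_common_base::base::WatchNotify;

#[tokio::main]
async fn main() {
    let notify = WatchNotify::new();

    // Before this commit callers could write notify.notify_one();
    // after it, they call notify_waiters() directly.
    notify.notify_waiters();

    // Per the retained test, a waiter created after notification still
    // completes instead of awaiting indefinitely.
    notify.notified().await;
}
```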
1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
@@ -47,6 +47,7 @@ xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"

[dev-dependencies]
goldenfile = "1.4"
maplit = "1.0.2"

[lints]
workspace = true
1 change: 1 addition & 0 deletions src/query/catalog/src/lib.rs
@@ -20,6 +20,7 @@ pub mod cluster_info;
pub mod database;
pub mod lock;
pub mod merge_into_join;
pub mod partition_columns;
pub mod plan;
pub mod query_kind;
pub mod runtime_filter_info;
@@ -16,4 +16,4 @@ mod pushdown_transform;
mod values_serde;

pub use pushdown_transform::get_pushdown_without_partition_columns;
pub use values_serde::get_partition_values;
pub use values_serde::str_to_scalar;
@@ -14,12 +14,13 @@

use std::collections::BTreeMap;

use databend_common_catalog::plan::Projection;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FieldIndex;

use crate::plan::Projection;
use crate::plan::PushDownInfo;

pub fn get_pushdown_without_partition_columns(
mut pushdown: PushDownInfo,
partition_columns: &[FieldIndex],
@@ -87,10 +88,9 @@ fn shift_projection(prj: Projection, partition_columns: &[FieldIndex]) -> Result

#[cfg(test)]
mod tests {
use databend_common_catalog::plan::Projection;

use super::shift_projection;
use super::shift_projection_index;
use crate::plan::Projection;

#[test]
fn test_shift_projection_index() {
@@ -21,8 +21,6 @@ use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::NumberScalar;
use databend_common_expression::Scalar;
use databend_common_expression::TableField;
use deltalake::kernel::Add;

pub fn str_to_scalar(value: &str, data_type: &DataType) -> Result<Scalar> {
if value.is_empty() {
@@ -81,20 +79,3 @@
))),
}
}

pub fn get_partition_values(add: &Add, fields: &[&TableField]) -> Result<Vec<Scalar>> {
let mut values = Vec::with_capacity(fields.len());
for f in fields {
match add.partition_values.get(&f.name) {
Some(Some(v)) => values.push(str_to_scalar(v, &f.data_type().into())?),
Some(None) => values.push(Scalar::Null),
None => {
return Err(ErrorCode::BadArguments(format!(
"partition value for column {} not found",
&f.name
)));
}
}
}
Ok(values)
}
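Only `str_to_scalar` remains in this module; `get_partition_values`, which depended on `deltalake::kernel::Add`, is dropped from it along with those imports. A minimal usage sketch, assuming the re-export path `databend_common_catalog::partition_columns::str_to_scalar` implied by the new `mod.rs` and the `pub mod partition_columns;` added to the catalog crate's `lib.rs`:

```rust
// Sketch only: parse a textual partition value into a typed Scalar.
use databend_common_catalog::partition_columns::str_to_scalar;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::Scalar;

fn partition_value_example() -> Result<Scalar> {
    // "42" as read from a partition path, interpreted as an Int32 column value.
    str_to_scalar("42", &DataType::Number(NumberDataType::Int32))
}
```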
6 changes: 1 addition & 5 deletions src/query/config/src/config.rs
@@ -2940,11 +2940,7 @@ pub struct DiskCacheConfig {
#[serde(default, deny_unknown_fields)]
pub struct SpillConfig {
/// Path of spill to local disk. disable if it's empty.
#[clap(
long,
value_name = "VALUE",
default_value = "./.databend/temp/_query_spill"
)]
#[clap(long, value_name = "VALUE", default_value = "")]
pub spill_local_disk_path: OsString,

#[clap(long, value_name = "VALUE", default_value = "30")]
2 changes: 1 addition & 1 deletion src/query/config/src/inner.rs
@@ -723,7 +723,7 @@ pub struct SpillConfig {
impl Default for SpillConfig {
fn default() -> Self {
Self {
path: OsString::from("./.databend/temp/_query_spill"),
path: OsString::from(""),
reserved_disk_ratio: OrderedFloat(0.3),
global_bytes_limit: u64::MAX,
}
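Both the clap default in `config.rs` and the inner default in `inner.rs` change from `./.databend/temp/_query_spill` to an empty string, which per the field's doc comment ("disable if it's empty") turns local-disk spill off unless a path is configured. A hypothetical check mirroring that semantic, since the real gating logic is not part of this diff:

```rust
// Sketch only: illustrates "disable if it's empty" from the SpillConfig doc comment.
use std::ffi::OsString;

fn local_spill_enabled(spill_local_disk_path: &OsString) -> bool {
    !spill_local_disk_path.is_empty()
}
```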
167 changes: 167 additions & 0 deletions src/query/functions/src/aggregates/aggregate_mode.rs
@@ -0,0 +1,167 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::AddAssign;
use std::sync::Arc;

use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use databend_common_exception::Result;
use databend_common_expression::types::*;
use databend_common_expression::with_number_mapped_type;
use databend_common_expression::AggregateFunctionRef;
use databend_common_expression::Scalar;

use super::FunctionData;
use super::UnaryState;
use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription;
use crate::aggregates::assert_unary_arguments;
use crate::aggregates::AggregateUnaryFunction;

#[derive(BorshSerialize, BorshDeserialize)]
pub struct ModeState<T>
where
T: ValueType,
T::Scalar: Ord + Hash + BorshSerialize + BorshDeserialize,
{
pub frequency_map: HashMap<T::Scalar, u64>,
}

impl<T> Default for ModeState<T>
where
T: ValueType,
T::Scalar: Ord + Hash + BorshSerialize + BorshDeserialize,
{
fn default() -> Self {
ModeState::<T> {
frequency_map: HashMap::new(),
}
}
}

impl<T> UnaryState<T, T> for ModeState<T>
where
T: ValueType + Sync + Send,
T::Scalar: Ord + Hash + Sync + Send + BorshSerialize + BorshDeserialize,
{
fn add(
&mut self,
other: T::ScalarRef<'_>,
_function_data: Option<&dyn FunctionData>,
) -> Result<()> {
let other = T::to_owned_scalar(other);
match self.frequency_map.entry(other) {
Entry::Occupied(o) => *o.into_mut() += 1,
Entry::Vacant(v) => {
v.insert(1);
}
};

Ok(())
}

fn merge(&mut self, rhs: &Self) -> Result<()> {
for (key, value) in rhs.frequency_map.iter() {
match self.frequency_map.get_mut(key) {
Some(entry) => entry.add_assign(value),
None => {
self.frequency_map.insert(key.clone(), *value);
}
}
}

Ok(())
}

fn merge_result(
&mut self,
builder: &mut T::ColumnBuilder,
_function_data: Option<&dyn FunctionData>,
) -> Result<()> {
if self.frequency_map.is_empty() {
T::push_default(builder);
} else {
let (key, _) = self
.frequency_map
.iter()
.max_by_key(|&(_, value)| value)
.unwrap();
T::push_item(builder, T::to_scalar_ref(key));
}

Ok(())
}
}

pub fn try_create_aggregate_mode_function(
display_name: &str,
params: Vec<Scalar>,
arguments: Vec<DataType>,
) -> Result<AggregateFunctionRef> {
assert_unary_arguments(display_name, arguments.len())?;

let data_type = arguments[0].clone();
with_number_mapped_type!(|NUM| match &data_type {
DataType::Number(NumberDataType::NUM) => {
let func = AggregateUnaryFunction::<
ModeState<NumberType<NUM>>,
NumberType<NUM>,
NumberType<NUM>,
>::try_create(
display_name, data_type.clone(), params, data_type.clone()
)
.with_need_drop(true);
Ok(Arc::new(func))
}
DataType::Decimal(DecimalDataType::Decimal128(_)) => {
let func = AggregateUnaryFunction::<
ModeState<Decimal128Type>,
Decimal128Type,
Decimal128Type,
>::try_create(
display_name, data_type.clone(), params, data_type.clone()
)
.with_need_drop(true);
Ok(Arc::new(func))
}
DataType::Decimal(DecimalDataType::Decimal256(_)) => {
let func = AggregateUnaryFunction::<
ModeState<Decimal256Type>,
Decimal256Type,
Decimal256Type,
>::try_create(
display_name, data_type.clone(), params, data_type.clone()
)
.with_need_drop(true);
Ok(Arc::new(func))
}
_ => {
let func = AggregateUnaryFunction::<ModeState<AnyType>, AnyType, AnyType>::try_create(
display_name,
data_type.clone(),
params,
data_type.clone(),
)
.with_need_drop(true);
Ok(Arc::new(func))
}
})
}

pub fn aggregate_mode_function_desc() -> AggregateFunctionDescription {
AggregateFunctionDescription::creator(Box::new(try_create_aggregate_mode_function))
}
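The new `mode` aggregate keeps a per-state frequency map and, on finalize, emits a key with the highest count; ties are broken arbitrarily by `max_by_key` over the unordered map, and an empty state pushes the type's default value. A self-contained sketch of that algorithm, independent of databend's aggregate traits:

```rust
// Standalone illustration of the ModeState logic: count occurrences,
// then pick a value with the maximal count.
use std::collections::HashMap;
use std::hash::Hash;

fn mode<T: Hash + Eq + Clone>(values: &[T]) -> Option<T> {
    let mut freq: HashMap<T, u64> = HashMap::new();
    for v in values {
        *freq.entry(v.clone()).or_insert(0) += 1; // mirrors ModeState::add
    }
    freq.into_iter()
        .max_by_key(|&(_, count)| count) // mirrors ModeState::merge_result
        .map(|(value, _)| value)
}

fn main() {
    assert_eq!(mode(&[1, 2, 2, 3, 2]), Some(2));
    assert_eq!(mode::<i32>(&[]), None); // the real state pushes a default here
}
```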
3 changes: 3 additions & 0 deletions src/query/functions/src/aggregates/aggregator.rs
@@ -31,6 +31,7 @@ use super::aggregate_covariance::aggregate_covariance_sample_desc;
use super::aggregate_min_max_any::aggregate_any_function_desc;
use super::aggregate_min_max_any::aggregate_max_function_desc;
use super::aggregate_min_max_any::aggregate_min_function_desc;
use super::aggregate_mode::aggregate_mode_function_desc;
use super::aggregate_stddev::aggregate_stddev_pop_function_desc;
use super::aggregate_stddev::aggregate_stddev_samp_function_desc;
use super::aggregate_window_funnel::aggregate_window_funnel_function_desc;
@@ -141,6 +142,8 @@ impl Aggregators {
);

factory.register("histogram", aggregate_histogram_function_desc());

factory.register("mode", aggregate_mode_function_desc());
}

pub fn register_combinator(factory: &mut AggregateFunctionFactory) {