Skip to content

Commit

Permalink
Merge branch 'main' into add_mode
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Oct 17, 2024
2 parents cb4d3a0 + efa1e96 commit 6864bf0
Show file tree
Hide file tree
Showing 49 changed files with 1,380 additions and 2,764 deletions.
2 changes: 1 addition & 1 deletion .github/actions/test_unit/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ runs:
RUST_TEST_THREADS: "8"
RUST_LOG: ERROR
RUST_MIN_STACK: 104857600
# RUST_BACKTRACE: full
# RUST_BACKTRACE: 1

- name: Upload failure
if: failure()
Expand Down
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"

[dev-dependencies]
goldenfile = "1.4"
maplit = "1.0.2"

[lints]
workspace = true
1 change: 1 addition & 0 deletions src/query/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod cluster_info;
pub mod database;
pub mod lock;
pub mod merge_into_join;
pub mod partition_columns;
pub mod plan;
pub mod query_kind;
pub mod runtime_filter_info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ mod pushdown_transform;
mod values_serde;

pub use pushdown_transform::get_pushdown_without_partition_columns;
pub use values_serde::get_partition_values;
pub use values_serde::str_to_scalar;
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

use std::collections::BTreeMap;

use databend_common_catalog::plan::Projection;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FieldIndex;

use crate::plan::Projection;
use crate::plan::PushDownInfo;

pub fn get_pushdown_without_partition_columns(
mut pushdown: PushDownInfo,
partition_columns: &[FieldIndex],
Expand Down Expand Up @@ -87,10 +88,9 @@ fn shift_projection(prj: Projection, partition_columns: &[FieldIndex]) -> Result

#[cfg(test)]
mod tests {
use databend_common_catalog::plan::Projection;

use super::shift_projection;
use super::shift_projection_index;
use crate::plan::Projection;

#[test]
fn test_shift_projection_index() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::NumberScalar;
use databend_common_expression::Scalar;
use databend_common_expression::TableField;
use deltalake::kernel::Add;

pub fn str_to_scalar(value: &str, data_type: &DataType) -> Result<Scalar> {
if value.is_empty() {
Expand Down Expand Up @@ -81,20 +79,3 @@ pub fn str_to_scalar(value: &str, data_type: &DataType) -> Result<Scalar> {
))),
}
}

/// Collects one partition [`Scalar`] per entry of `fields`, in the same order.
///
/// Each field's name is looked up in `add.partition_values`:
/// * a present string value is parsed with [`str_to_scalar`] using the
///   field's data type,
/// * a present-but-empty (`None`) value becomes [`Scalar::Null`],
/// * a missing key aborts the whole call.
///
/// # Errors
///
/// Returns `ErrorCode::BadArguments` for the first field whose name has no
/// entry in `add.partition_values`, and propagates any error produced by
/// [`str_to_scalar`].
pub fn get_partition_values(add: &Add, fields: &[&TableField]) -> Result<Vec<Scalar>> {
    fields
        .iter()
        .map(|field| match add.partition_values.get(&field.name) {
            Some(Some(raw)) => str_to_scalar(raw, &field.data_type().into()),
            Some(None) => Ok(Scalar::Null),
            None => Err(ErrorCode::BadArguments(format!(
                "partition value for column {} not found",
                &field.name
            ))),
        })
        // Collecting into `Result` stops at the first `Err`, like the early return of a loop.
        .collect()
}
6 changes: 1 addition & 5 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2940,11 +2940,7 @@ pub struct DiskCacheConfig {
#[serde(default, deny_unknown_fields)]
pub struct SpillConfig {
/// Path of spill to local disk. disable if it's empty.
#[clap(
long,
value_name = "VALUE",
default_value = "./.databend/temp/_query_spill"
)]
#[clap(long, value_name = "VALUE", default_value = "")]
pub spill_local_disk_path: OsString,

#[clap(long, value_name = "VALUE", default_value = "30")]
Expand Down
2 changes: 1 addition & 1 deletion src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ pub struct SpillConfig {
impl Default for SpillConfig {
fn default() -> Self {
Self {
path: OsString::from("./.databend/temp/_query_spill"),
path: OsString::from(""),
reserved_disk_ratio: OrderedFloat(0.3),
global_bytes_limit: u64::MAX,
}
Expand Down
127 changes: 124 additions & 3 deletions src/query/service/src/locks/lock_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,29 @@ use std::time::Instant;

use backoff::backoff::Backoff;
use databend_common_base::base::tokio::time::sleep;
use databend_common_base::base::tokio::time::timeout;
use databend_common_base::base::WatchNotify;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::catalog::Catalog;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_metrics::lock::record_created_lock_nums;
use databend_common_storages_fuse::operations::set_backoff;
use databend_common_users::UserApiProvider;
use futures::future::select;
use futures::future::Either;
use futures_util::StreamExt;
use rand::thread_rng;
use rand::Rng;

Expand All @@ -46,13 +56,120 @@ pub struct LockHolder {

impl LockHolder {
#[async_backtrace::framed]
pub async fn start(
/// Creates a lock revision for `req` and waits until that revision is the
/// smallest one listed for the lock key, i.e. until no earlier revision
/// is ahead of it.
///
/// The revision is created through `self.start`. Afterwards the method
/// loops: it lists all revisions of the lock key, and if its own revision
/// is first it extends the revision and returns. Otherwise it watches the
/// delete event of the revision directly in front of it and re-checks once
/// that revision is gone.
///
/// # Arguments
///
/// * `catalog` - used to create, list and extend lock revisions.
/// * `req` - the create request; its `ttl` and `lock_key` are reused for
///   the follow-up list/extend requests.
/// * `should_retry` - when `false`, the method fails immediately if another
///   revision is ahead instead of waiting for it.
/// * `acquire_timeout` - upper bound for the waiting phase, measured from
///   the moment this method was entered.
///
/// # Errors
///
/// * `ErrorCode::TableLockExpired` if the created revision is no longer
///   present in the listed revisions.
/// * `ErrorCode::TableAlreadyLocked` if another revision is ahead and
///   either `should_retry` is `false`, `acquire_timeout` has elapsed, or the
///   watch on the previous revision timed out.
/// * Any error returned by the catalog or meta-store calls is propagated.
pub(crate) async fn try_acquire_lock(
self: &Arc<Self>,
catalog: Arc<dyn Catalog>,
req: CreateLockRevReq,
should_retry: bool,
acquire_timeout: Duration,
) -> Result<u64> {
// Reference point for both the timeout check and the elapsed time in error messages.
let start = Instant::now();

let ttl = req.ttl;

// Copy out everything needed from `req` before it is moved into `start`.
let lock_key = req.lock_key.clone();
let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();

// Create the lock revision for this holder.
let revision = self.start(catalog.clone(), req).await?;

let meta_api = UserApiProvider::instance().get_meta_store_client();
let list_table_lock_req = ListLockRevReq::new(lock_key.clone());

loop {
// List all revisions and check if the current one is the minimum.
let mut rev_list = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?
.into_iter()
.map(|(x, _)| x)
.collect::<Vec<_>>();
// The revisions from list_lock_revisions are returned in big-endian order,
// so they must be sorted into ascending numeric order.
rev_list.sort();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current revision is not found in the list, it has expired.
ErrorCode::TableLockExpired(format!(
"The acquired table lock with revision '{}' maybe expired(elapsed: {:?})",
revision,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by the current session.
// NOTE(review): the trailing `true` presumably marks the revision as
// acquired (`start` builds the same request with `false`) — confirm
// against `ExtendLockRevReq::new`.
let extend_table_lock_req =
ExtendLockRevReq::new(lock_key.clone(), revision, ttl, true);

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock_type, table_id, 1);
break;
}

// `position > 0` here, so there is always a revision directly ahead of ours.
let prev_revision = rev_list[position - 1];
let elapsed = start.elapsed();
// If retrying is disabled or the acquire timeout is used up, return the error directly.
if !should_retry || elapsed >= acquire_timeout {
return Err(ErrorCode::TableAlreadyLocked(format!(
"Table is locked by other session(rev: {}, prev: {}, elapsed: {:?})",
revision,
prev_revision,
start.elapsed()
)));
}

let watch_delete_ident = TableLockIdent::new(tenant, table_id, prev_revision);

// Watch the delete event of the previous revision.
let req = WatchRequest {
key: watch_delete_ident.to_string_key(),
key_end: None,
filter_type: FilterType::Delete.into(),
};
let mut watch_stream = meta_api.watch(req).await?;

// The watch is registered before this existence check. If the previous
// revision is already gone, skip waiting and re-list immediately.
let lock_meta = meta_api.get_pb(&watch_delete_ident).await?;
if lock_meta.is_none() {
log::warn!(
"Lock revision '{}' already does not exist, skipping",
prev_revision
);
continue;
}

// Bound the watch by the time that was left of `acquire_timeout` when
// `elapsed` was sampled. The check above guarantees
// `elapsed < acquire_timeout`, so `abs_diff` is a plain subtraction here.
if let Err(_cause) = timeout(acquire_timeout.abs_diff(elapsed), async move {
// Consume events until one carries no current value (presumably the
// delete of the watched key — confirm against the watch API). The loop
// also ends when the stream finishes or yields an error; in every case
// the outer loop re-lists the revisions.
while let Some(Ok(resp)) = watch_stream.next().await {
if let Some(event) = resp.event {
if event.current.is_none() {
break;
}
}
}
})
.await
{
// The watch did not complete within the remaining time.
return Err(ErrorCode::TableAlreadyLocked(format!(
"Table is locked by other session(rev: {}, prev: {}, elapsed: {:?})",
revision,
prev_revision,
start.elapsed()
)));
}
}

Ok(revision)
}

#[async_backtrace::framed]
async fn start(
self: &Arc<Self>,
query_id: String,
catalog: Arc<dyn Catalog>,
req: CreateLockRevReq,
) -> Result<u64> {
let lock_key = req.lock_key.clone();
let query_id = req.query_id.clone();
let ttl = req.ttl;
let sleep_range = (ttl / 3)..=(ttl * 2 / 3);

Expand All @@ -61,6 +178,7 @@ impl LockHolder {
let revision = res.revision;
// metrics.
record_created_lock_nums(lock_key.lock_type().to_string(), lock_key.get_table_id(), 1);
log::debug!("create table lock success, revision={}", revision);

let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision);
let extend_table_lock_req = ExtendLockRevReq::new(lock_key.clone(), revision, ttl, false);
Expand Down Expand Up @@ -179,7 +297,10 @@ impl LockHolder {
let mut backoff = set_backoff(Some(Duration::from_millis(2)), None, max_retry_elapsed);
loop {
match catalog.delete_lock_revision(req.clone()).await {
Ok(_) => break,
Ok(_) => {
log::debug!("delete table lock success, revision={}", req.revision);
break;
}
Err(e) => match backoff.next_backoff() {
Some(duration) => {
log::debug!(
Expand Down
Loading

0 comments on commit 6864bf0

Please sign in to comment.