Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): support json_object_insert function #16636

Merged
merged 8 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "57795da" }
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "c7525d9" }
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "672e423" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
orc-rust = { git = "https://github.com/datafuse-extras/datafusion-orc", rev = "03372b97" }
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
Expand Down
132 changes: 132 additions & 0 deletions src/query/functions/src/scalars/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,41 @@ pub fn register(registry: &mut FunctionRegistry) {
},
),
);

registry.register_function_factory("json_object_insert", |_, args_type| {
if args_type.len() != 3 && args_type.len() != 4 {
return None;
}
if (args_type[0].remove_nullable() != DataType::Variant && args_type[0] != DataType::Null)
|| (args_type[1].remove_nullable() != DataType::String
&& args_type[1] != DataType::Null)
{
return None;
}
if args_type.len() == 4
&& args_type[3].remove_nullable() != DataType::Boolean
&& args_type[3] != DataType::Null
{
return None;
}
let is_nullable = args_type[0].is_nullable_or_null();
let return_type = if is_nullable {
DataType::Nullable(Box::new(DataType::Variant))
} else {
DataType::Variant
};
Some(Arc::new(Function {
signature: FunctionSignature {
name: "json_object_insert".to_string(),
args_type: args_type.to_vec(),
return_type,
},
eval: FunctionEval::Scalar {
calc_domain: Box::new(|_, _| FunctionDomain::MayThrow),
eval: Box::new(move |args, ctx| json_object_insert_fn(args, ctx, is_nullable)),
},
}))
});
}

fn json_array_fn(args: &[ValueRef<AnyType>], ctx: &mut EvalContext) -> Value<AnyType> {
Expand Down Expand Up @@ -2017,6 +2052,103 @@ where
}
}

fn json_object_insert_fn(
args: &[ValueRef<AnyType>],
ctx: &mut EvalContext,
is_nullable: bool,
) -> Value<AnyType> {
let len_opt = args.iter().find_map(|arg| match arg {
ValueRef::Column(col) => Some(col.len()),
_ => None,
});
let len = len_opt.unwrap_or(1);
let mut validity = MutableBitmap::with_capacity(len);
let mut builder = BinaryColumnBuilder::with_capacity(len, len * 50);
for idx in 0..len {
let value = match &args[0] {
ValueRef::Scalar(scalar) => scalar.clone(),
ValueRef::Column(col) => unsafe { col.index_unchecked(idx) },
};
if value == ScalarRef::Null {
builder.commit_row();
validity.push(false);
continue;
}
let value = value.as_variant().unwrap();
if !is_object(value) {
ctx.set_error(builder.len(), "Invalid json object");
builder.commit_row();
validity.push(false);
continue;
}
let new_key = match &args[1] {
ValueRef::Scalar(scalar) => scalar.clone(),
ValueRef::Column(col) => unsafe { col.index_unchecked(idx) },
};
let new_val = match &args[2] {
ValueRef::Scalar(scalar) => scalar.clone(),
ValueRef::Column(col) => unsafe { col.index_unchecked(idx) },
};
if new_key == ScalarRef::Null || new_val == ScalarRef::Null {
builder.put(value);
builder.commit_row();
validity.push(true);
continue;
}
let update_flag = if args.len() == 4 {
let v = match &args[3] {
ValueRef::Scalar(scalar) => scalar.clone(),
ValueRef::Column(col) => unsafe { col.index_unchecked(idx) },
};
match v {
ScalarRef::Boolean(v) => v,
_ => false,
}
} else {
false
};
let new_key = new_key.as_string().unwrap();
let res = match new_val {
ScalarRef::Variant(new_val) => {
jsonb::object_insert(value, new_key, new_val, update_flag, &mut builder.data)
}
_ => {
// if the new value is not a json value, cast it to json.
let mut new_val_buf = vec![];
cast_scalar_to_variant(new_val.clone(), ctx.func_ctx.tz, &mut new_val_buf);
jsonb::object_insert(value, new_key, &new_val_buf, update_flag, &mut builder.data)
}
};
if let Err(err) = res {
validity.push(false);
ctx.set_error(builder.len(), err.to_string());
} else {
validity.push(true);
}
builder.commit_row();
}
if is_nullable {
let validity: Bitmap = validity.into();
match len_opt {
Some(_) => {
Value::Column(Column::Variant(builder.build())).wrap_nullable(Some(validity))
}
None => {
if !validity.get_bit(0) {
Value::Scalar(Scalar::Null)
} else {
Value::Scalar(Scalar::Variant(builder.build_scalar()))
}
}
}
} else {
match len_opt {
Some(_) => Value::Column(Column::Variant(builder.build())),
None => Value::Scalar(Scalar::Variant(builder.build_scalar())),
}
}
}

// Extract string for string type, other types convert to JSON string.
fn cast_to_string(v: &[u8]) -> String {
match to_str(v) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,7 @@ Functions overloads:
0 json_extract_path_text(String, String) :: String NULL
1 json_extract_path_text(String NULL, String NULL) :: String NULL
0 json_object FACTORY
0 json_object_insert FACTORY
0 json_object_keep_null FACTORY
0 json_object_keys(Variant NULL) :: Variant NULL
0 json_path_exists FACTORY
Expand Down
Loading
Loading