Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(config): pre-flight work for initial integration of configuration schema for sinks #13516

Merged
merged 17 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion benches/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Batch for PartitionedBuffer {
type Input = InnerBuffer;
type Output = InnerBuffer;

fn get_settings_defaults<D: SinkBatchSettings>(
fn get_settings_defaults<D: SinkBatchSettings + Clone>(
config: BatchConfig<D, Merged>,
) -> Result<BatchConfig<D, Merged>, BatchError> {
Buffer::get_settings_defaults(config)
Expand Down
6 changes: 4 additions & 2 deletions lib/codecs/src/encoding/format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::encoding::BuildError;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};

/// Config used to build a `AvroSerializer`.
Expand Down Expand Up @@ -38,8 +39,9 @@ impl AvroSerializerConfig {
}
}

/// Options for building an `AvroSerializer`.
#[derive(Debug, Clone, Deserialize, Serialize)]
/// Apache Avro serializer options.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroSerializerOptions {
/// The Avro schema.
pub schema: String,
Expand Down
8 changes: 5 additions & 3 deletions lib/codecs/src/encoding/framing/character_delimited.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_config::configurable_component;

use super::BoxedFramingError;

Expand All @@ -25,10 +26,11 @@ impl CharacterDelimitedEncoderConfig {
}
}

/// Options for building a `CharacterDelimitedEncoder`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
/// Configuration for character-delimited framing.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CharacterDelimitedEncoderOptions {
/// The character that delimits byte sequences.
/// The ASCII (7-bit) character that delimits byte sequences.
#[serde(with = "vector_core::serde::ascii_char")]
pub delimiter: u8,
}
Expand Down
61 changes: 41 additions & 20 deletions lib/codecs/src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use framing::{
CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
};
use serde::{Deserialize, Serialize};
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};

/// An error that occurred while building an encoder.
Expand Down Expand Up @@ -51,23 +51,29 @@ impl From<std::io::Error> for Error {
}
}

/// Configuration for building a `Framer`.
/// Framing configuration.
// Unfortunately, copying options of the nested enum variants is necessary
// since `serde` doesn't allow `flatten`ing these:
// https://github.com/serde-rs/serde/issues/1402.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
tobz marked this conversation as resolved.
Show resolved Hide resolved
#[serde(tag = "method", rename_all = "snake_case")]
pub enum FramingConfig {
/// Configures the `BytesEncoder`.
/// Event data is not delimited at all.
Bytes,
/// Configures the `CharacterDelimitedEncoder`.

/// Event data is delimited by a single ASCII (7-bit) character.
CharacterDelimited {
/// Options for the character delimited encoder.
character_delimited: CharacterDelimitedEncoderOptions,
},
/// Configures the `LengthDelimitedEncoder`.

/// Event data is prefixed with its length in bytes.
///
/// The prefix is a 32-bit unsigned integer, little endian.
LengthDelimited,
/// Configures the `NewlineDelimitedEncoder`.

/// Event data is delimited by a newline (LF) character.
NewlineDelimited,
}

Expand Down Expand Up @@ -180,30 +186,45 @@ impl tokio_util::codec::Encoder<()> for Framer {
}

/// Configuration for building a `Serializer`.
// Unfortunately, copying options of the nested enum variants is necessary
// since `serde` doesn't allow `flatten`ing these:
// https://github.com/serde-rs/serde/issues/1402.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
pub enum SerializerConfig {
/// Configures the `AvroSerializer`.
/// Apache Avro serialization.
Avro {
/// Options for the avro serializer.
/// Apache Avro serializer options.
avro: AvroSerializerOptions,
},
/// Configures the `GelfSerializer`.

/// GELF serialization.
Gelf,
/// Configures the `JsonSerializer`.

/// JSON serialization.
Json,
/// Configures the `LogfmtSerializer`.

/// Logfmt serialization.
Logfmt,
/// Configures the `NativeSerializer`.

/// Native Vector serialization based on Protocol Buffers.
Native,
/// Configures the `NativeJsonSerializer`.

/// Native Vector serialization based on JSON.
NativeJson,
/// Configures the `RawMessageSerializer`.

/// No serialization.
///
/// This encoding, specifically, will only encode the `message` field of a log event. Users should take care if
/// they're modifying their log events (such as by using a `remap` transform, etc) and removing the message field
/// while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given
/// event.
RawMessage,
/// Configures the `TextSerializer`.

/// Plaintext serialization.
///
/// This encoding, specifically, will only encode the `message` field of a log event. Users should take care if
/// they're modifying their log events (such as by using a `remap` transform, etc) and removing the message field
/// while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given
/// event.
Text,
}

Expand Down
2 changes: 2 additions & 0 deletions lib/lookup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ inherent = "1.0"
lalrpop-util = { version = "0.19.8", features = ["lexer"] }
once_cell = { version = "1.13" }
quickcheck = { version = "1.0.3", optional = true }
vector_config = { path = "../vector-config" }
vector_config_macros = { path = "../vector-config-macros" }

[dev-dependencies]
criterion = { version = "0.3.6", features = ["html_reports", "async_tokio"] }
Expand Down
33 changes: 14 additions & 19 deletions lib/lookup/src/lookup_v2/owned.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::lookup_v2::{parse_path, BorrowedSegment, Path};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use vector_config::configurable_component;

/// A lookup path.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[serde(from = "String", into = "String")]
pub struct OwnedPath {
pub segments: Vec<OwnedSegment>,
}
Expand Down Expand Up @@ -36,27 +39,20 @@ impl OwnedPath {
}
}

impl<'de> Deserialize<'de> for OwnedPath {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let path: String = Deserialize::deserialize(deserializer)?;
Ok(parse_path(&path))
impl From<String> for OwnedPath {
fn from(raw_path: String) -> Self {
parse_path(raw_path.as_str())
}
}

impl Serialize for OwnedPath {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
if self.segments.is_empty() {
serializer.serialize_str("<invalid>")
impl From<OwnedPath> for String {
fn from(owned: OwnedPath) -> Self {
if owned.segments.is_empty() {
String::from("<invalid>")
} else {
let mut coalesce_i = 0;

let path = self
owned
.segments
.iter()
.enumerate()
Expand Down Expand Up @@ -93,8 +89,7 @@ impl Serialize for OwnedPath {
}
})
.collect::<Vec<_>>()
.join("");
serializer.serialize_str(&path)
.join("")
}
}
}
Expand Down Expand Up @@ -146,7 +141,7 @@ impl<'a, const N: usize> From<[BorrowedSegment<'a>; N]> for OwnedPath {
}
}

#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OwnedSegment {
Field(String),
Index(isize),
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-common/src/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub mod ser_de {
}
}

impl<'de> Configurable<'de> for TimeZone {
impl Configurable for TimeZone {
fn referencable_name() -> Option<&'static str> {
Some("vector_common::TimeZone")
}
Expand All @@ -97,7 +97,7 @@ impl<'de> Configurable<'de> for TimeZone {
Some("Strongly-typed list of timezones as defined in the `tz` database.")
}

fn generate_schema(gen: &mut SchemaGenerator, overrides: Metadata<'de, Self>) -> SchemaObject {
fn generate_schema(gen: &mut SchemaGenerator, overrides: Metadata<Self>) -> SchemaObject {
let mut schema = generate_string_schema();
finalize_schema(gen, &mut schema, overrides);
schema
Expand Down
44 changes: 43 additions & 1 deletion lib/vector-config-macros/src/ast/container.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashSet;

use darling::{error::Accumulator, util::Flag, FromAttributes};
use serde_derive_internals::{ast as serde_ast, Ctxt, Derive};
use syn::{DeriveInput, ExprPath, Generics, Ident, Type};
use syn::{DeriveInput, ExprPath, Generics, Ident, Type, TypeParam};
use vector_config_common::attributes::CustomAttribute;

use super::{
Expand Down Expand Up @@ -264,6 +266,46 @@ impl<'a> Container<'a> {
.into_iter()
.flat_map(|metadata| metadata.attributes())
}

/// Gets the generic types that are used within fields or variants that are part of the schemas.
///
/// In order to ensure we can allow for a maximally flexible `Configurable` trait, we add bounds to generic types that are
/// present on derived containers so that bounds don't need to be added on the actual container itself, essentially
/// avoiding declarations like `pub struct Foo<T> where T: Configurable {...}`.
///
/// We contain this logic here as we only care about generic type parameters that are present on fields that will be
/// included in the schema, so skipped fields shouldn't have bounds added, and so on.
pub fn generic_field_types(&self) -> Vec<TypeParam> {
let mut generic_types = Vec::new();

let field_types = match &self.data {
Data::Struct(_, fields) => fields
.iter()
.filter(|f| f.visible())
.filter_map(|f| match f.ty() {
Type::Path(tp) => tp.path.get_ident().cloned(),
_ => None,
})
.collect::<HashSet<_>>(),
Data::Enum(variants) => variants
.iter()
.filter(|v| v.visible())
.flat_map(|v| v.fields().iter())
.filter_map(|f| match f.ty() {
Type::Path(tp) => tp.path.get_ident().cloned(),
_ => None,
})
.collect::<HashSet<_>>(),
};

for type_param in self.original.generics.type_params() {
if field_types.contains(&type_param.ident) {
generic_types.push(type_param.clone());
}
}

generic_types
}
}

#[derive(Debug, Default, FromAttributes)]
Expand Down
Loading