Skip to content

Commit

Permalink
chore(config): pre-flight work for initial integration of configuration schema for sinks (#13516)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz authored Jul 18, 2022
1 parent 000fe8b commit c640add
Show file tree
Hide file tree
Showing 33 changed files with 1,014 additions and 541 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion benches/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Batch for PartitionedBuffer {
type Input = InnerBuffer;
type Output = InnerBuffer;

fn get_settings_defaults<D: SinkBatchSettings>(
fn get_settings_defaults<D: SinkBatchSettings + Clone>(
config: BatchConfig<D, Merged>,
) -> Result<BatchConfig<D, Merged>, BatchError> {
Buffer::get_settings_defaults(config)
Expand Down
6 changes: 4 additions & 2 deletions lib/codecs/src/encoding/format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::encoding::BuildError;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};

/// Config used to build a `AvroSerializer`.
Expand Down Expand Up @@ -38,8 +39,9 @@ impl AvroSerializerConfig {
}
}

/// Options for building an `AvroSerializer`.
#[derive(Debug, Clone, Deserialize, Serialize)]
/// Apache Avro serializer options.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroSerializerOptions {
/// The Avro schema.
pub schema: String,
Expand Down
8 changes: 5 additions & 3 deletions lib/codecs/src/encoding/framing/character_delimited.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_config::configurable_component;

use super::BoxedFramingError;

Expand All @@ -25,10 +26,11 @@ impl CharacterDelimitedEncoderConfig {
}
}

/// Options for building a `CharacterDelimitedEncoder`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
/// Configuration for character-delimited framing.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CharacterDelimitedEncoderOptions {
/// The character that delimits byte sequences.
/// The ASCII (7-bit) character that delimits byte sequences.
#[serde(with = "vector_core::serde::ascii_char")]
pub delimiter: u8,
}
Expand Down
61 changes: 41 additions & 20 deletions lib/codecs/src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use framing::{
CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
};
use serde::{Deserialize, Serialize};
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};

/// An error that occurred while building an encoder.
Expand Down Expand Up @@ -51,23 +51,29 @@ impl From<std::io::Error> for Error {
}
}

/// Configuration for building a `Framer`.
/// Framing configuration.
// Unfortunately, copying options of the nested enum variants is necessary
// since `serde` doesn't allow `flatten`ing these:
// https://github.com/serde-rs/serde/issues/1402.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum FramingConfig {
/// Configures the `BytesEncoder`.
/// Event data is not delimited at all.
Bytes,
/// Configures the `CharacterDelimitedEncoder`.

/// Event data is delimited by a single ASCII (7-bit) character.
CharacterDelimited {
/// Options for the character delimited encoder.
character_delimited: CharacterDelimitedEncoderOptions,
},
/// Configures the `LengthDelimitedEncoder`.

/// Event data is prefixed with its length in bytes.
///
/// The prefix is a 32-bit unsigned integer, little endian.
LengthDelimited,
/// Configures the `NewlineDelimitedEncoder`.

/// Event data is delimited by a newline (LF) character.
NewlineDelimited,
}

Expand Down Expand Up @@ -180,30 +186,45 @@ impl tokio_util::codec::Encoder<()> for Framer {
}

/// Configuration for building a `Serializer`.
// Unfortunately, copying options of the nested enum variants is necessary
// since `serde` doesn't allow `flatten`ing these:
// https://github.com/serde-rs/serde/issues/1402.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
pub enum SerializerConfig {
/// Configures the `AvroSerializer`.
/// Apache Avro serialization.
Avro {
/// Options for the avro serializer.
/// Apache Avro serializer options.
avro: AvroSerializerOptions,
},
/// Configures the `GelfSerializer`.

/// GELF serialization.
Gelf,
/// Configures the `JsonSerializer`.

/// JSON serialization.
Json,
/// Configures the `LogfmtSerializer`.

/// Logfmt serialization.
Logfmt,
/// Configures the `NativeSerializer`.

/// Native Vector serialization based on Protocol Buffers.
Native,
/// Configures the `NativeJsonSerializer`.

/// Native Vector serialization based on JSON.
NativeJson,
/// Configures the `RawMessageSerializer`.

/// No serialization.
///
/// This encoding, specifically, will only encode the `message` field of a log event. Users should take care if
/// they're modifying their log events (such as by using a `remap` transform, etc) and removing the message field
/// while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given
/// event.
RawMessage,
/// Configures the `TextSerializer`.

/// Plaintext serialization.
///
/// This encoding, specifically, will only encode the `message` field of a log event. Users should take care if
/// they're modifying their log events (such as by using a `remap` transform, etc) and removing the message field
/// while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given
/// event.
Text,
}

Expand Down
2 changes: 2 additions & 0 deletions lib/lookup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ inherent = "1.0"
lalrpop-util = { version = "0.19.8", features = ["lexer"] }
once_cell = { version = "1.13" }
quickcheck = { version = "1.0.3", optional = true }
vector_config = { path = "../vector-config" }
vector_config_macros = { path = "../vector-config-macros" }

[dev-dependencies]
criterion = { version = "0.3.6", features = ["html_reports", "async_tokio"] }
Expand Down
33 changes: 14 additions & 19 deletions lib/lookup/src/lookup_v2/owned.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::lookup_v2::{parse_path, BorrowedSegment, Path};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use vector_config::configurable_component;

/// A lookup path.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[serde(from = "String", into = "String")]
pub struct OwnedPath {
pub segments: Vec<OwnedSegment>,
}
Expand Down Expand Up @@ -36,27 +39,20 @@ impl OwnedPath {
}
}

impl<'de> Deserialize<'de> for OwnedPath {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let path: String = Deserialize::deserialize(deserializer)?;
Ok(parse_path(&path))
impl From<String> for OwnedPath {
fn from(raw_path: String) -> Self {
parse_path(raw_path.as_str())
}
}

impl Serialize for OwnedPath {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
if self.segments.is_empty() {
serializer.serialize_str("<invalid>")
impl From<OwnedPath> for String {
fn from(owned: OwnedPath) -> Self {
if owned.segments.is_empty() {
String::from("<invalid>")
} else {
let mut coalesce_i = 0;

let path = self
owned
.segments
.iter()
.enumerate()
Expand Down Expand Up @@ -93,8 +89,7 @@ impl Serialize for OwnedPath {
}
})
.collect::<Vec<_>>()
.join("");
serializer.serialize_str(&path)
.join("")
}
}
}
Expand Down Expand Up @@ -146,7 +141,7 @@ impl<'a, const N: usize> From<[BorrowedSegment<'a>; N]> for OwnedPath {
}
}

#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OwnedSegment {
Field(String),
Index(isize),
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-common/src/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub mod ser_de {
}
}

impl<'de> Configurable<'de> for TimeZone {
impl Configurable for TimeZone {
fn referencable_name() -> Option<&'static str> {
Some("vector_common::TimeZone")
}
Expand All @@ -97,7 +97,7 @@ impl<'de> Configurable<'de> for TimeZone {
Some("Strongly-typed list of timezones as defined in the `tz` database.")
}

fn generate_schema(gen: &mut SchemaGenerator, overrides: Metadata<'de, Self>) -> SchemaObject {
fn generate_schema(gen: &mut SchemaGenerator, overrides: Metadata<Self>) -> SchemaObject {
let mut schema = generate_string_schema();
finalize_schema(gen, &mut schema, overrides);
schema
Expand Down
44 changes: 43 additions & 1 deletion lib/vector-config-macros/src/ast/container.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashSet;

use darling::{error::Accumulator, util::Flag, FromAttributes};
use serde_derive_internals::{ast as serde_ast, Ctxt, Derive};
use syn::{DeriveInput, ExprPath, Generics, Ident, Type};
use syn::{DeriveInput, ExprPath, Generics, Ident, Type, TypeParam};
use vector_config_common::attributes::CustomAttribute;

use super::{
Expand Down Expand Up @@ -264,6 +266,46 @@ impl<'a> Container<'a> {
.into_iter()
.flat_map(|metadata| metadata.attributes())
}

/// Returns the container's generic type parameters that are used directly as the type of a
/// schema-relevant field.
///
/// The derive adds `Configurable` bounds for these parameters itself, so that authors do not have
/// to write them on the container (i.e. no `pub struct Foo<T> where T: Configurable {...}`). Only
/// parameters that actually show up in the generated schema should receive a bound, which is why
/// the lookup is done here, where field/variant visibility is known.
///
/// A parameter is considered "used" when a field's type is a path consisting of just that
/// identifier (as reported by `Path::get_ident`). The returned parameters keep the order in which
/// they are declared on the container.
pub fn generic_field_types(&self) -> Vec<TypeParam> {
    // Identifiers of every field type that is written as a bare identifier.
    let mut referenced = HashSet::new();

    match &self.data {
        Data::Struct(_, fields) => {
            // Struct fields that are not visible never contribute.
            for field in fields.iter().filter(|f| f.visible()) {
                if let Type::Path(tp) = field.ty() {
                    if let Some(ident) = tp.path.get_ident() {
                        referenced.insert(ident.clone());
                    }
                }
            }
        }
        Data::Enum(variants) => {
            // NOTE(review): only the variant's visibility is checked here; individual fields of a
            // visible variant are not filtered by `visible()` as they are for structs -- confirm
            // that this is intended.
            for variant in variants.iter().filter(|v| v.visible()) {
                for field in variant.fields().iter() {
                    if let Type::Path(tp) = field.ty() {
                        if let Some(ident) = tp.path.get_ident() {
                            referenced.insert(ident.clone());
                        }
                    }
                }
            }
        }
    }

    // Walk the declared type parameters so the output follows declaration order.
    self.original
        .generics
        .type_params()
        .filter(|param| referenced.contains(&param.ident))
        .cloned()
        .collect()
}
}

#[derive(Debug, Default, FromAttributes)]
Expand Down
Loading

0 comments on commit c640add

Please sign in to comment.