Skip to content

Commit

Permalink
Remove legacy EncodingConfiguration
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Sichert <[email protected]>
  • Loading branch information
pablosichert committed Jul 12, 2022
1 parent 8a7ffaf commit d1ccfb5
Show file tree
Hide file tree
Showing 85 changed files with 1,269 additions and 3,396 deletions.
6 changes: 2 additions & 4 deletions benches/files.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{convert::TryInto, path::PathBuf};

use bytes::Bytes;
use codecs::{encoding::FramingConfig, TextSerializerConfig};
use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput};
use futures::{stream, SinkExt, StreamExt};
use tempfile::tempdir;
Expand Down Expand Up @@ -54,10 +55,7 @@ fn benchmark_files_no_partitions(c: &mut Criterion) {
sinks::file::FileSinkConfig {
path: output.try_into().unwrap(),
idle_timeout_secs: None,
encoding: sinks::util::encoding::EncodingConfig::from(
sinks::file::Encoding::Text,
)
.into(),
encoding: (None::<FramingConfig>, TextSerializerConfig::new()).into(),
compression: sinks::file::Compression::None,
acknowledgements: Default::default(),
},
Expand Down
86 changes: 82 additions & 4 deletions lib/codecs/src/encoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,100 @@ impl Encoder<Event> for JsonSerializer {
#[cfg(test)]
mod tests {
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use vector_common::btreemap;
use vector_core::event::{LogEvent, Value};
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};

use super::*;

#[test]
fn serialize_json() {
fn serialize_json_log() {
let event = Event::Log(LogEvent::from(btreemap! {
"foo" => Value::from("bar")
"x" => Value::from("23"),
"z" => Value::from(25),
"a" => Value::from("0"),
}));
let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), r#"{"foo":"bar"}"#);
assert_eq!(bytes.freeze(), r#"{"a":"0","x":"23","z":25}"#);
}

#[test]
fn serialize_json_metric_counter() {
let event = Event::Metric(
Metric::new(
"foos",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_namespace(Some("vector"))
.with_tags(Some(
vec![
("key2".to_owned(), "value2".to_owned()),
("key1".to_owned(), "value1".to_owned()),
("Key3".to_owned(), "Value3".to_owned()),
]
.into_iter()
.collect(),
))
.with_timestamp(Some(Utc.ymd(2018, 11, 14).and_hms_nano(8, 9, 10, 11))),
);

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#
);
}

#[test]
fn serialize_json_metric_set() {
let event = Event::Metric(Metric::new(
"users",
MetricKind::Incremental,
MetricValue::Set {
values: vec!["bob".into()].into_iter().collect(),
},
));

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#
);
}

#[test]
fn serialize_json_metric_histogram_without_timestamp() {
let event = Event::Metric(Metric::new(
"glork",
MetricKind::Incremental,
MetricValue::Distribution {
samples: vector_core::samples![10.0 => 1],
statistic: StatisticKind::Histogram,
},
));

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#
);
}

#[test]
Expand Down
45 changes: 41 additions & 4 deletions src/codecs/encoding/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use codecs::encoding::{FramingConfig, SerializerConfig};
use crate::codecs::Transformer;
use codecs::encoding::{Framer, FramingConfig, Serializer, SerializerConfig};
use serde::{Deserialize, Serialize};

use crate::sinks::util::encoding::Transformer;

/// Config used to build an `Encoder`.
#[derive(Debug, Clone, Deserialize, Serialize)]
// `#[serde(deny_unknown_fields)]` doesn't work when flattening internally tagged enums, see
Expand Down Expand Up @@ -32,6 +31,23 @@ impl EncodingConfig {
pub const fn config(&self) -> &SerializerConfig {
&self.encoding
}

/// Build the `Serializer` for this config.
pub fn build(&self) -> crate::Result<Serializer> {
self.encoding.build()
}
}

impl<T> From<T> for EncodingConfig
where
T: Into<SerializerConfig>,
{
fn from(encoding: T) -> Self {
Self {
encoding: encoding.into(),
transformer: Default::default(),
}
}
}

/// Config used to build an `Encoder`.
Expand Down Expand Up @@ -70,14 +86,35 @@ impl EncodingConfigWithFraming {
pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
(&self.framing, &self.encoding.encoding)
}

/// Build the `Framer` and `Serializer` for this config.
pub fn build(&self) -> crate::Result<(Option<Framer>, Serializer)> {
Ok((
self.framing.as_ref().map(|framing| framing.build()),
self.encoding.build()?,
))
}
}

impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
where
F: Into<FramingConfig>,
S: Into<SerializerConfig>,
{
fn from((framing, encoding): (Option<F>, S)) -> Self {
Self {
framing: framing.map(Into::into),
encoding: encoding.into().into(),
}
}
}

#[cfg(test)]
mod test {
use lookup::lookup_v2::parse_path;

use super::*;
use crate::sinks::util::encoding::{EncodingConfiguration, TimestampFormat};
use crate::codecs::encoding::TimestampFormat;

#[test]
fn deserialize_encoding_config() {
Expand Down
2 changes: 2 additions & 0 deletions src/codecs/encoding/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod config;
mod encoder;
mod transformer;

pub use config::{EncodingConfig, EncodingConfigWithFraming};
pub use encoder::Encoder;
pub use transformer::{TimestampFormat, Transformer};
Loading

0 comments on commit d1ccfb5

Please sign in to comment.