Skip to content

Commit

Permalink
Refactor inferring Framer based on sink type
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Sichert <[email protected]>
  • Loading branch information
pablosichert committed Jul 13, 2022
1 parent 2065569 commit 2ab8a6e
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 161 deletions.
41 changes: 35 additions & 6 deletions src/codecs/encoding/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::codecs::Transformer;
use codecs::encoding::{Framer, FramingConfig, Serializer, SerializerConfig};
use codecs::{
encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
};
use serde::{Deserialize, Serialize};

/// Config used to build an `Encoder`.
Expand Down Expand Up @@ -88,14 +91,40 @@ impl EncodingConfigWithFraming {
}

/// Build the `Framer` and `Serializer` for this config.
pub fn build(&self) -> crate::Result<(Option<Framer>, Serializer)> {
Ok((
self.framing.as_ref().map(|framing| framing.build()),
self.encoding.build()?,
))
///
/// An explicitly configured framing always wins. When none is configured, a
/// default framer is inferred from the serializer and the given `sink_type`:
///
/// * JSON: newline-delimited for stream-based sinks, comma-delimited for
///   message-based sinks.
/// * Avro / Native: length-delimited.
/// * Logfmt / NativeJson / RawMessage / Text: newline-delimited.
///
/// # Errors
///
/// Propagates the error returned when building the serializer fails.
pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
    // The framer is built before the serializer, so a serializer error is
    // only surfaced after the optional framing has been constructed.
    let configured = self.framing.as_ref().map(|config| config.build());
    let serializer = self.encoding.build()?;

    let framer = match configured {
        Some(framer) => framer,
        // Nothing configured: pick a default based on serializer and sink type.
        // Variants are listed explicitly (no catch-all on the serializer) so
        // that adding a new serializer forces a decision here.
        None => match (&serializer, sink_type) {
            (Serializer::Json(_), SinkType::StreamBased) => {
                NewlineDelimitedEncoder::new().into()
            }
            (Serializer::Json(_), SinkType::MessageBased) => {
                CharacterDelimitedEncoder::new(b',').into()
            }
            (Serializer::Avro(_) | Serializer::Native(_), _) => {
                LengthDelimitedEncoder::new().into()
            }
            (
                Serializer::Logfmt(_)
                | Serializer::NativeJson(_)
                | Serializer::RawMessage(_)
                | Serializer::Text(_),
                _,
            ) => NewlineDelimitedEncoder::new().into(),
        },
    };

    Ok((framer, serializer))
}
}

/// The way a sink processes outgoing events.
///
/// Passed by value when building an encoder so that a default framer can be
/// chosen when none is configured. The type is a fieldless enum, so it is
/// `Copy` and can be compared and debug-printed freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkType {
    /// Events are sent in a continuous stream.
    StreamBased,
    /// Events are sent in a batch as a message.
    MessageBased,
}

impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
where
F: Into<FramingConfig>,
Expand Down
2 changes: 1 addition & 1 deletion src/codecs/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ mod config;
mod encoder;
mod transformer;

pub use config::{EncodingConfig, EncodingConfigWithFraming};
pub use config::{EncodingConfig, EncodingConfigWithFraming, SinkType};
pub use encoder::Encoder;
pub use transformer::{TimestampFormat, Transformer};
2 changes: 1 addition & 1 deletion src/codecs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ mod ready_frames;

pub use decoding::{Decoder, DecodingConfig};
pub use encoding::{
Encoder, EncodingConfig, EncodingConfigWithFraming, TimestampFormat, Transformer,
Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType, TimestampFormat, Transformer,
};
pub use ready_frames::ReadyFrames;
7 changes: 7 additions & 0 deletions src/serde.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::any::{Any, TypeId};

use codecs::{
decoding::{DeserializerConfig, FramingConfig},
BytesDecoderConfig, BytesDeserializerConfig,
Expand Down Expand Up @@ -29,6 +31,11 @@ pub fn default_decoding() -> DeserializerConfig {
BytesDeserializerConfig::new().into()
}

/// Reports whether the static type `T` of the referenced value is the unit
/// type `()`.
///
/// The value itself is never inspected; only the `TypeId` of `T` is compared
/// against the `TypeId` of `()`.
///
/// NOTE(review): presumably intended as a predicate for serde's
/// `skip_serializing_if` attribute — confirm against callers.
#[inline]
pub fn skip_serializing_if_unit<T: ?Sized + Any>(_: &T) -> bool {
    let unit_type = TypeId::of::<()>();
    let value_type = TypeId::of::<T>();
    value_type == unit_type
}

/// Utilities for the `serde_json` crate.
pub mod json {
use bytes::{BufMut, BytesMut};
Expand Down
29 changes: 6 additions & 23 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
use std::convert::TryInto;

use aws_sdk_s3::Client as S3Client;
use codecs::encoding::{Framer, FramingConfig, Serializer};
use codecs::{
CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
TextSerializerConfig,
};
use codecs::encoding::{Framer, FramingConfig};
use codecs::TextSerializerConfig;
use serde::{Deserialize, Serialize};
use tower::ServiceBuilder;
use vector_core::sink::VectorSink;

use super::sink::S3RequestOptions;
use crate::aws::{AwsAuthentication, RegionOrEndpoint};
use crate::{
codecs::{Encoder, EncodingConfigWithFraming},
aws::{AwsAuthentication, RegionOrEndpoint},
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
SinkContext,
},
sinks::{
aws_s3::sink::S3RequestOptions,
s3_common::{
self,
config::{S3Options, S3RetryLogic},
Expand Down Expand Up @@ -150,21 +147,7 @@ impl S3SinkConfig {
.unwrap_or(DEFAULT_FILENAME_APPEND_UUID);

let transformer = self.encoding.transformer();
let (framer, serializer) = self.encoding.build()?;
let framer = match (framer, &serializer) {
(Some(framer), _) => framer,
(None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(),
(None, Serializer::Avro(_) | Serializer::Native(_)) => {
LengthDelimitedEncoder::new().into()
}
(
None,
Serializer::Logfmt(_)
| Serializer::NativeJson(_)
| Serializer::RawMessage(_)
| Serializer::Text(_),
) => NewlineDelimitedEncoder::new().into(),
};
let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);

let request_options = S3RequestOptions {
Expand Down
24 changes: 3 additions & 21 deletions src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use std::{convert::TryInto, sync::Arc};

use azure_storage_blobs::prelude::*;
use codecs::{
encoding::{Framer, Serializer},
CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
};
use codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
use serde::{Deserialize, Serialize};
use tower::ServiceBuilder;

use super::request_builder::AzureBlobRequestOptions;
use crate::{
codecs::{Encoder, EncodingConfigWithFraming},
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{
azure_common::{
Expand Down Expand Up @@ -132,21 +128,7 @@ impl AzureBlobSinkConfig {
.unwrap_or(DEFAULT_FILENAME_APPEND_UUID);

let transformer = self.encoding.transformer();
let (framer, serializer) = self.encoding.build()?;
let framer = match (framer, &serializer) {
(Some(framer), _) => framer,
(None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(),
(None, Serializer::Avro(_) | Serializer::Native(_)) => {
LengthDelimitedEncoder::new().into()
}
(
None,
Serializer::Logfmt(_)
| Serializer::NativeJson(_)
| Serializer::RawMessage(_)
| Serializer::Text(_),
) => NewlineDelimitedEncoder::new().into(),
};
let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);

let request_options = AzureBlobRequestOptions {
Expand Down
22 changes: 4 additions & 18 deletions src/sinks/console/config.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use codecs::{
encoding::{Framer, FramingConfig, Serializer},
JsonSerializerConfig, LengthDelimitedEncoder, NewlineDelimitedEncoder,
encoding::{Framer, FramingConfig},
JsonSerializerConfig,
};
use futures::{future, FutureExt};
use serde::{Deserialize, Serialize};
use tokio::io;

use crate::{
codecs::{Encoder, EncodingConfigWithFraming},
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{console::sink::WriterSink, Healthcheck, VectorSink},
};
Expand Down Expand Up @@ -52,21 +52,7 @@ impl GenerateConfig for ConsoleSinkConfig {
impl SinkConfig for ConsoleSinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let transformer = self.encoding.transformer();
let (framer, serializer) = self.encoding.build()?;
let framer = match (framer, &serializer) {
(Some(framer), _) => framer,
(
None,
Serializer::Text(_)
| Serializer::Json(_)
| Serializer::Logfmt(_)
| Serializer::NativeJson(_)
| Serializer::RawMessage(_),
) => NewlineDelimitedEncoder::new().into(),
(None, Serializer::Avro(_) | Serializer::Native(_)) => {
LengthDelimitedEncoder::new().into()
}
};
let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);

let sink: VectorSink = match self.target {
Expand Down
7 changes: 3 additions & 4 deletions src/sinks/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use codecs::{
encoding::{Framer, FramingConfig},
NewlineDelimitedEncoder, TextSerializerConfig,
TextSerializerConfig,
};
use futures::{
future,
Expand All @@ -21,7 +21,7 @@ use tokio_util::codec::Encoder as _;
use vector_core::{buffers::Acker, internal_event::EventsSent, ByteSizeOf};

use crate::{
codecs::{Encoder, EncodingConfigWithFraming, Transformer},
codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext,
SinkDescription,
Expand Down Expand Up @@ -170,8 +170,7 @@ pub struct FileSink {
impl FileSink {
pub fn new(config: &FileSinkConfig, acker: Acker) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let (framer, serializer) = config.encoding.build()?;
let framer = framer.unwrap_or_else(|| NewlineDelimitedEncoder::new().into());
let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);

Ok(Self {
Expand Down
23 changes: 3 additions & 20 deletions src/sinks/gcp/cloud_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use std::{collections::HashMap, convert::TryFrom, io};

use bytes::Bytes;
use chrono::Utc;
use codecs::{
encoding::{Framer, Serializer},
CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
};
use codecs::encoding::Framer;
use http::header::{HeaderName, HeaderValue};
use indoc::indoc;
use serde::{Deserialize, Serialize};
Expand All @@ -16,7 +13,7 @@ use uuid::Uuid;
use vector_core::event::{EventFinalizers, Finalizable};

use crate::{
codecs::{Encoder, EncodingConfigWithFraming, Transformer},
codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext,
SinkDescription,
Expand Down Expand Up @@ -274,21 +271,7 @@ impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
impl RequestSettings {
fn new(config: &GcsSinkConfig) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let (framer, serializer) = config.encoding.build()?;
let framer = match (framer, &serializer) {
(Some(framer), _) => framer,
(None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(),
(None, Serializer::Avro(_) | Serializer::Native(_)) => {
LengthDelimitedEncoder::new().into()
}
(
None,
Serializer::Logfmt(_)
| Serializer::NativeJson(_)
| Serializer::RawMessage(_)
| Serializer::Text(_),
) => NewlineDelimitedEncoder::new().into(),
};
let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);
let acl = config
.acl
Expand Down
20 changes: 7 additions & 13 deletions src/sinks/http.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io::Write;

use bytes::{BufMut, Bytes, BytesMut};
use codecs::encoding::{CharacterDelimitedEncoder, Framer, NewlineDelimitedEncoder, Serializer};
use codecs::encoding::{CharacterDelimitedEncoder, Framer, Serializer};
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::{future, FutureExt, SinkExt};
use http::{
Expand All @@ -15,7 +15,7 @@ use snafu::{ResultExt, Snafu};
use tokio_util::codec::Encoder as _;

use crate::{
codecs::{Encoder, EncodingConfigWithFraming, Transformer},
codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext,
SinkDescription,
Expand Down Expand Up @@ -119,11 +119,9 @@ struct HttpSink {

#[cfg(test)]
fn default_sink(encoding: EncodingConfigWithFraming) -> HttpSink {
let encoding = encoding.build().unwrap();
let framing = encoding
.0
.unwrap_or_else(|| NewlineDelimitedEncoder::new().into());
let serializer = encoding.1;
let (framing, serializer) = encoding
.build(crate::codecs::SinkType::MessageBased)
.unwrap();
let encoder = Encoder::<Framer>::new(framing, serializer);

HttpSink {
Expand Down Expand Up @@ -158,12 +156,8 @@ impl SinkConfig for HttpSinkConfig {
request.add_old_option(self.headers.clone());
validate_headers(&request.headers, &self.auth)?;

let encoding = self.encoding.build()?;
let framing = encoding
.0
.unwrap_or_else(|| NewlineDelimitedEncoder::new().into());
let serializer = encoding.1;
let encoder = Encoder::<Framer>::new(framing, serializer);
let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);

let sink = HttpSink {
uri: self.uri.with_default_parts(),
Expand Down
19 changes: 16 additions & 3 deletions src/sinks/new_relic_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ mod tests {

use super::*;
use crate::{
codecs::SinkType,
config::SinkConfig,
event::{Event, LogEvent},
sinks::util::{service::RATE_LIMIT_NUM_DEFAULT, test::build_test_server, Concurrency},
Expand Down Expand Up @@ -199,7 +200,11 @@ mod tests {
);
assert_eq!(http_config.method, Some(HttpMethod::Post));
assert!(matches!(
http_config.encoding.build().unwrap().1,
http_config
.encoding
.build(SinkType::MessageBased)
.unwrap()
.1,
Serializer::Json(_)
));
assert_eq!(http_config.batch.max_bytes, Some(MAX_PAYLOAD_SIZE));
Expand Down Expand Up @@ -241,7 +246,11 @@ mod tests {
);
assert_eq!(http_config.method, Some(HttpMethod::Post));
assert!(matches!(
http_config.encoding.build().unwrap().1,
http_config
.encoding
.build(SinkType::MessageBased)
.unwrap()
.1,
Serializer::Json(_)
));
assert_eq!(http_config.batch.max_bytes, Some(MAX_PAYLOAD_SIZE));
Expand Down Expand Up @@ -281,7 +290,11 @@ mod tests {
);
assert_eq!(http_config.method, Some(HttpMethod::Post));
assert!(matches!(
http_config.encoding.build().unwrap().1,
http_config
.encoding
.build(SinkType::MessageBased)
.unwrap()
.1,
Serializer::Json(_)
));
assert_eq!(http_config.batch.max_bytes, Some(838860));
Expand Down
Loading

0 comments on commit 2ab8a6e

Please sign in to comment.