diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index d043ec8451b0b..2f08d5c009676 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -1,5 +1,8 @@ use crate::codecs::Transformer; -use codecs::encoding::{Framer, FramingConfig, Serializer, SerializerConfig}; +use codecs::{ + encoding::{Framer, FramingConfig, Serializer, SerializerConfig}, + CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, +}; use serde::{Deserialize, Serialize}; /// Config used to build an `Encoder`. @@ -88,14 +91,40 @@ impl EncodingConfigWithFraming { } /// Build the `Framer` and `Serializer` for this config. - pub fn build(&self) -> crate::Result<(Option, Serializer)> { - Ok(( - self.framing.as_ref().map(|framing| framing.build()), - self.encoding.build()?, - )) + pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> { + let framer = self.framing.as_ref().map(|framing| framing.build()); + let serializer = self.encoding.build()?; + + let framer = match (framer, &serializer) { + (Some(framer), _) => framer, + (None, Serializer::Json(_)) => match sink_type { + SinkType::StreamBased => NewlineDelimitedEncoder::new().into(), + SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(), + }, + (None, Serializer::Avro(_) | Serializer::Native(_)) => { + LengthDelimitedEncoder::new().into() + } + ( + None, + Serializer::Logfmt(_) + | Serializer::NativeJson(_) + | Serializer::RawMessage(_) + | Serializer::Text(_), + ) => NewlineDelimitedEncoder::new().into(), + }; + + Ok((framer, serializer)) } } +/// The way a sink processes outgoing events. +pub enum SinkType { + /// Events are sent in a continuous stream. + StreamBased, + /// Events are sent in a batch as a message. + MessageBased, +} + impl From<(Option, S)> for EncodingConfigWithFraming where F: Into, diff --git a/src/codecs/encoding/mod.rs b/src/codecs/encoding/mod.rs index 1e46660f5ceda..69ede063e896b 100644 --- a/src/codecs/encoding/mod.rs +++ b/src/codecs/encoding/mod.rs @@ -2,6 +2,6 @@ mod config; mod encoder; mod transformer; -pub use config::{EncodingConfig, EncodingConfigWithFraming}; +pub use config::{EncodingConfig, EncodingConfigWithFraming, SinkType}; pub use encoder::Encoder; pub use transformer::{TimestampFormat, Transformer}; diff --git a/src/codecs/mod.rs b/src/codecs/mod.rs index d3ce7bcec65b7..4247846cca3a8 100644 --- a/src/codecs/mod.rs +++ b/src/codecs/mod.rs @@ -9,6 +9,6 @@ mod ready_frames; pub use decoding::{Decoder, DecodingConfig}; pub use encoding::{ - Encoder, EncodingConfig, EncodingConfigWithFraming, TimestampFormat, Transformer, + Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType, TimestampFormat, Transformer, }; pub use ready_frames::ReadyFrames; diff --git a/src/serde.rs b/src/serde.rs index 88248b8456117..25c9bb564a2ca 100644 --- a/src/serde.rs +++ b/src/serde.rs @@ -1,3 +1,5 @@ +use std::any::{Any, TypeId}; + use codecs::{ decoding::{DeserializerConfig, FramingConfig}, BytesDecoderConfig, BytesDeserializerConfig, @@ -29,6 +31,11 @@ pub fn default_decoding() -> DeserializerConfig { BytesDeserializerConfig::new().into() } +#[inline] +pub fn skip_serializing_if_unit(_: &T) -> bool { + TypeId::of::<()>() == TypeId::of::() +} + /// Utilities for the `serde_json` crate. pub mod json { use bytes::{BufMut, BytesMut}; diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 72ba0f145fd43..d945ef2b06f12 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,24 +1,21 @@ use std::convert::TryInto; use aws_sdk_s3::Client as S3Client; -use codecs::encoding::{Framer, FramingConfig, Serializer}; -use codecs::{ - CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, - TextSerializerConfig, -}; +use codecs::encoding::{Framer, FramingConfig}; +use codecs::TextSerializerConfig; use serde::{Deserialize, Serialize}; use tower::ServiceBuilder; use vector_core::sink::VectorSink; -use super::sink::S3RequestOptions; -use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::{ - codecs::{Encoder, EncodingConfigWithFraming}, + aws::{AwsAuthentication, RegionOrEndpoint}, + codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext, }, sinks::{ + aws_s3::sink::S3RequestOptions, s3_common::{ self, config::{S3Options, S3RetryLogic}, @@ -150,21 +147,7 @@ impl S3SinkConfig { .unwrap_or(DEFAULT_FILENAME_APPEND_UUID); let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.build()?; - let framer = match (framer, &serializer) { - (Some(framer), _) => framer, - (None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(), - (None, Serializer::Avro(_) | Serializer::Native(_)) => { - LengthDelimitedEncoder::new().into() - } - ( - None, - Serializer::Logfmt(_) - | Serializer::NativeJson(_) - | Serializer::RawMessage(_) - | Serializer::Text(_), - ) => NewlineDelimitedEncoder::new().into(), - }; + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); let request_options = S3RequestOptions { diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index 69fb75222c34f..d745a6fccb817 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -1,17 +1,13 @@ use std::{convert::TryInto, sync::Arc}; use azure_storage_blobs::prelude::*; -use codecs::{ - encoding::{Framer, Serializer}, - CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder, - NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, -}; +use codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig}; use serde::{Deserialize, Serialize}; use tower::ServiceBuilder; use super::request_builder::AzureBlobRequestOptions; use crate::{ - codecs::{Encoder, EncodingConfigWithFraming}, + codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, sinks::{ azure_common::{ @@ -132,21 +128,7 @@ impl AzureBlobSinkConfig { .unwrap_or(DEFAULT_FILENAME_APPEND_UUID); let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.build()?; - let framer = match (framer, &serializer) { - (Some(framer), _) => framer, - (None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(), - (None, Serializer::Avro(_) | Serializer::Native(_)) => { - LengthDelimitedEncoder::new().into() - } - ( - None, - Serializer::Logfmt(_) - | Serializer::NativeJson(_) - | Serializer::RawMessage(_) - | Serializer::Text(_), - ) => NewlineDelimitedEncoder::new().into(), - }; + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); let request_options = AzureBlobRequestOptions { diff --git a/src/sinks/console/config.rs b/src/sinks/console/config.rs index e9190bf2d5db4..3f570590895fd 100644 --- a/src/sinks/console/config.rs +++ b/src/sinks/console/config.rs @@ -1,13 +1,13 @@ use codecs::{ - encoding::{Framer, FramingConfig, Serializer}, - JsonSerializerConfig, LengthDelimitedEncoder, NewlineDelimitedEncoder, + encoding::{Framer, FramingConfig}, + JsonSerializerConfig, }; use futures::{future, FutureExt}; use serde::{Deserialize, Serialize}; use tokio::io; use crate::{ - codecs::{Encoder, EncodingConfigWithFraming}, + codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, sinks::{console::sink::WriterSink, Healthcheck, VectorSink}, }; @@ -52,21 +52,7 @@ impl GenerateConfig for ConsoleSinkConfig { impl SinkConfig for ConsoleSinkConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.build()?; - let framer = match (framer, &serializer) { - (Some(framer), _) => framer, - ( - None, - Serializer::Text(_) - | Serializer::Json(_) - | Serializer::Logfmt(_) - | Serializer::NativeJson(_) - | Serializer::RawMessage(_), - ) => NewlineDelimitedEncoder::new().into(), - (None, Serializer::Avro(_) | Serializer::Native(_)) => { - LengthDelimitedEncoder::new().into() - } - }; + let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); let sink: VectorSink = match self.target { diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 34199d8f1dfc3..ea92d15ad2d89 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use codecs::{ encoding::{Framer, FramingConfig}, - NewlineDelimitedEncoder, TextSerializerConfig, + TextSerializerConfig, }; use futures::{ future, @@ -21,7 +21,7 @@ use tokio_util::codec::Encoder as _; use vector_core::{buffers::Acker, internal_event::EventsSent, ByteSizeOf}; use crate::{ - codecs::{Encoder, EncodingConfigWithFraming, Transformer}, + codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -170,8 +170,7 @@ pub struct FileSink { impl FileSink { pub fn new(config: &FileSinkConfig, acker: Acker) -> crate::Result { let transformer = config.encoding.transformer(); - let (framer, serializer) = config.encoding.build()?; - let framer = framer.unwrap_or_else(|| NewlineDelimitedEncoder::new().into()); + let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); Ok(Self { diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 984e5c52f638d..afdb1042dae47 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -2,10 +2,7 @@ use std::{collections::HashMap, convert::TryFrom, io}; use bytes::Bytes; use chrono::Utc; -use codecs::{ - encoding::{Framer, Serializer}, - CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, -}; +use codecs::encoding::Framer; use http::header::{HeaderName, HeaderValue}; use indoc::indoc; use serde::{Deserialize, Serialize}; @@ -16,7 +13,7 @@ use uuid::Uuid; use vector_core::event::{EventFinalizers, Finalizable}; use crate::{ - codecs::{Encoder, EncodingConfigWithFraming, Transformer}, + codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -274,21 +271,7 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { impl RequestSettings { fn new(config: &GcsSinkConfig) -> crate::Result { let transformer = config.encoding.transformer(); - let (framer, serializer) = config.encoding.build()?; - let framer = match (framer, &serializer) { - (Some(framer), _) => framer, - (None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(), - (None, Serializer::Avro(_) | Serializer::Native(_)) => { - LengthDelimitedEncoder::new().into() - } - ( - None, - Serializer::Logfmt(_) - | Serializer::NativeJson(_) - | Serializer::RawMessage(_) - | Serializer::Text(_), - ) => NewlineDelimitedEncoder::new().into(), - }; + let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); let acl = config .acl diff --git a/src/sinks/http.rs b/src/sinks/http.rs index 371bcd6c31067..23071b81559d3 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -1,7 +1,7 @@ use std::io::Write; use bytes::{BufMut, Bytes, BytesMut}; -use codecs::encoding::{CharacterDelimitedEncoder, Framer, NewlineDelimitedEncoder, Serializer}; +use codecs::encoding::{CharacterDelimitedEncoder, Framer, Serializer}; use flate2::write::{GzEncoder, ZlibEncoder}; use futures::{future, FutureExt, SinkExt}; use http::{ @@ -15,7 +15,7 @@ use snafu::{ResultExt, Snafu}; use tokio_util::codec::Encoder as _; use crate::{ - codecs::{Encoder, EncodingConfigWithFraming, Transformer}, + codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -119,11 +119,9 @@ struct HttpSink { #[cfg(test)] fn default_sink(encoding: EncodingConfigWithFraming) -> HttpSink { - let encoding = encoding.build().unwrap(); - let framing = encoding - .0 - .unwrap_or_else(|| NewlineDelimitedEncoder::new().into()); - let serializer = encoding.1; + let (framing, serializer) = encoding + .build(crate::codecs::SinkType::MessageBased) + .unwrap(); let encoder = Encoder::::new(framing, serializer); HttpSink { @@ -158,12 +156,8 @@ impl SinkConfig for HttpSinkConfig { request.add_old_option(self.headers.clone()); validate_headers(&request.headers, &self.auth)?; - let encoding = self.encoding.build()?; - let framing = encoding - .0 - .unwrap_or_else(|| NewlineDelimitedEncoder::new().into()); - let serializer = encoding.1; - let encoder = Encoder::::new(framing, serializer); + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + let encoder = Encoder::::new(framer, serializer); let sink = HttpSink { uri: self.uri.with_default_parts(), diff --git a/src/sinks/new_relic_logs.rs b/src/sinks/new_relic_logs.rs index dea6a6185a816..85dab6cd18d27 100644 --- a/src/sinks/new_relic_logs.rs +++ b/src/sinks/new_relic_logs.rs @@ -159,6 +159,7 @@ mod tests { use super::*; use crate::{ + codecs::SinkType, config::SinkConfig, event::{Event, LogEvent}, sinks::util::{service::RATE_LIMIT_NUM_DEFAULT, test::build_test_server, Concurrency}, @@ -199,7 +200,11 @@ mod tests { ); assert_eq!(http_config.method, Some(HttpMethod::Post)); assert!(matches!( - http_config.encoding.build().unwrap().1, + http_config + .encoding + .build(SinkType::MessageBased) + .unwrap() + .1, Serializer::Json(_) )); assert_eq!(http_config.batch.max_bytes, Some(MAX_PAYLOAD_SIZE)); @@ -241,7 +246,11 @@ mod tests { ); assert_eq!(http_config.method, Some(HttpMethod::Post)); assert!(matches!( - http_config.encoding.build().unwrap().1, + http_config + .encoding + .build(SinkType::MessageBased) + .unwrap() + .1, Serializer::Json(_) )); assert_eq!(http_config.batch.max_bytes, Some(MAX_PAYLOAD_SIZE)); @@ -281,7 +290,11 @@ mod tests { ); assert_eq!(http_config.method, Some(HttpMethod::Post)); assert!(matches!( - http_config.encoding.build().unwrap().1, + http_config + .encoding + .build(SinkType::MessageBased) + .unwrap() + .1, Serializer::Json(_) )); assert_eq!(http_config.batch.max_bytes, Some(838860)); diff --git a/src/sinks/socket.rs b/src/sinks/socket.rs index fa9878261c6e8..088d0e100f088 100644 --- a/src/sinks/socket.rs +++ b/src/sinks/socket.rs @@ -1,13 +1,13 @@ use codecs::{ encoding::{Framer, FramingConfig}, - BytesEncoder, NewlineDelimitedEncoder, TextSerializerConfig, + TextSerializerConfig, }; use serde::{Deserialize, Serialize}; #[cfg(unix)] use crate::sinks::util::unix::UnixSinkConfig; use crate::{ - codecs::{Encoder, EncodingConfigWithFraming}, + codecs::{Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -21,17 +21,39 @@ use crate::{ pub struct SocketSinkConfig { #[serde(flatten)] pub mode: Mode, - #[serde(flatten)] - pub encoding: EncodingConfigWithFraming, } #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(tag = "mode", rename_all = "snake_case")] pub enum Mode { - Tcp(TcpSinkConfig), - Udp(UdpSinkConfig), + Tcp(TcpMode), + Udp(UdpMode), #[cfg(unix)] - Unix(UnixSinkConfig), + Unix(UnixMode), +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct TcpMode { + #[serde(flatten)] + config: TcpSinkConfig, + #[serde(flatten)] + encoding: EncodingConfigWithFraming, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct UdpMode { + #[serde(flatten)] + config: UdpSinkConfig, + encoding: EncodingConfig, +} + +#[cfg(unix)] +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct UnixMode { + #[serde(flatten)] + config: UnixSinkConfig, + #[serde(flatten)] + encoding: EncodingConfigWithFraming, } inventory::submit! { @@ -50,15 +72,15 @@ impl GenerateConfig for SocketSinkConfig { } impl SocketSinkConfig { - pub const fn new(mode: Mode, encoding: EncodingConfigWithFraming) -> Self { - SocketSinkConfig { mode, encoding } + pub const fn new(mode: Mode) -> Self { + SocketSinkConfig { mode } } pub fn make_basic_tcp_config(address: String) -> Self { - Self::new( - Mode::Tcp(TcpSinkConfig::from_address(address)), - (None::, TextSerializerConfig::new()).into(), - ) + Self::new(Mode::Tcp(TcpMode { + config: TcpSinkConfig::from_address(address), + encoding: (None::, TextSerializerConfig::new()).into(), + })) } } @@ -69,25 +91,37 @@ impl SinkConfig for SocketSinkConfig { &self, cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.build()?; - let framer = framer.unwrap_or_else(|| match self.mode { - Mode::Tcp(_) => NewlineDelimitedEncoder::new().into(), - Mode::Udp(_) => BytesEncoder::new().into(), - #[cfg(unix)] - Mode::Unix(_) => NewlineDelimitedEncoder::new().into(), - }); - let encoder = Encoder::::new(framer, serializer); match &self.mode { - Mode::Tcp(config) => config.build(cx, transformer, encoder), - Mode::Udp(config) => config.build(cx, transformer, encoder), + Mode::Tcp(TcpMode { config, encoding }) => { + let transformer = encoding.transformer(); + let (framer, serializer) = encoding.build(SinkType::StreamBased)?; + let encoder = Encoder::::new(framer, serializer); + config.build(cx, transformer, encoder) + } + Mode::Udp(UdpMode { config, encoding }) => { + let transformer = encoding.transformer(); + let serializer = encoding.build()?; + let encoder = Encoder::<()>::new(serializer); + config.build(cx, transformer, encoder) + } #[cfg(unix)] - Mode::Unix(config) => config.build(cx, transformer, encoder), + Mode::Unix(UnixMode { config, encoding }) => { + let transformer = encoding.transformer(); + let (framer, serializer) = encoding.build(SinkType::StreamBased)?; + let encoder = Encoder::::new(framer, serializer); + config.build(cx, transformer, encoder) + } } } fn input(&self) -> Input { - Input::new(self.encoding.config().1.input_type() & DataType::Log) + let encoder_input_type = match &self.mode { + Mode::Tcp(TcpMode { encoding, .. }) => encoding.config().1.input_type(), + Mode::Udp(UdpMode { encoding, .. }) => encoding.config().input_type(), + #[cfg(unix)] + Mode::Unix(UnixMode { encoding, .. }) => encoding.config().1.input_type(), + }; + Input::new(encoder_input_type & DataType::Log) } fn sink_type(&self) -> &'static str { @@ -136,8 +170,10 @@ mod test { let receiver = UdpSocket::bind(addr).unwrap(); let config = SocketSinkConfig { - mode: Mode::Udp(UdpSinkConfig::from_address(addr.to_string())), - encoding: (None::, JsonSerializerConfig::new()).into(), + mode: Mode::Udp(UdpMode { + config: UdpSinkConfig::from_address(addr.to_string()), + encoding: JsonSerializerConfig::new().into(), + }), }; let context = SinkContext::new_test(); let (sink, _healthcheck) = config.build(context).await.unwrap(); @@ -178,8 +214,10 @@ mod test { let addr = next_addr(); let config = SocketSinkConfig { - mode: Mode::Tcp(TcpSinkConfig::from_address(addr.to_string())), - encoding: (None::, JsonSerializerConfig::new()).into(), + mode: Mode::Tcp(TcpMode { + config: TcpSinkConfig::from_address(addr.to_string()), + encoding: (None::, JsonSerializerConfig::new()).into(), + }), }; let context = SinkContext::new_test(); @@ -241,21 +279,23 @@ mod test { let addr = next_addr(); let config = SocketSinkConfig { - mode: Mode::Tcp(TcpSinkConfig::new( - addr.to_string(), - None, - Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - verify_certificate: Some(false), - verify_hostname: Some(false), - ca_file: Some(tls::TEST_PEM_CRT_PATH.into()), - ..Default::default() - }, - }), - None, - )), - encoding: (None::, TextSerializerConfig::new()).into(), + mode: Mode::Tcp(TcpMode { + config: TcpSinkConfig::new( + addr.to_string(), + None, + Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + verify_certificate: Some(false), + verify_hostname: Some(false), + ca_file: Some(tls::TEST_PEM_CRT_PATH.into()), + ..Default::default() + }, + }), + None, + ), + encoding: (None::, TextSerializerConfig::new()).into(), + }), }; let context = SinkContext::new_test(); let (sink, _healthcheck) = config.build(context).await.unwrap(); @@ -370,8 +410,10 @@ mod test { let addr = next_addr(); let config = SocketSinkConfig { - mode: Mode::Tcp(TcpSinkConfig::from_address(addr.to_string())), - encoding: (None::, TextSerializerConfig::new()).into(), + mode: Mode::Tcp(TcpMode { + config: TcpSinkConfig::from_address(addr.to_string()), + encoding: (None::, TextSerializerConfig::new()).into(), + }), }; let context = SinkContext::new_test(); diff --git a/src/sinks/util/unix.rs b/src/sinks/util/unix.rs index 4cf4f5bb05d85..04a019342bc10 100644 --- a/src/sinks/util/unix.rs +++ b/src/sinks/util/unix.rs @@ -207,7 +207,7 @@ where #[cfg(test)] mod tests { - use codecs::encoding::Framer; + use codecs::{encoding::Framer, NewlineDelimitedEncoder, TextSerializer}; use tokio::net::UnixListener; use super::*; @@ -228,7 +228,7 @@ mod tests { .build( SinkContext::new_test(), Default::default(), - Encoder::::default() + Encoder::<()>::new(TextSerializer::new().into()) ) .unwrap() .1 @@ -240,7 +240,7 @@ mod tests { .build( SinkContext::new_test(), Default::default(), - Encoder::::default() + Encoder::<()>::new(TextSerializer::new().into()) ) .unwrap() .1 @@ -260,7 +260,14 @@ mod tests { let config = UnixSinkConfig::new(out_path); let cx = SinkContext::new_test(); let (sink, _healthcheck) = config - .build(cx, Default::default(), Encoder::::default()) + .build( + cx, + Default::default(), + Encoder::::new( + NewlineDelimitedEncoder::new().into(), + TextSerializer::new().into(), + ), + ) .unwrap(); // Send the test data