diff --git a/benches/codecs/encoder.rs b/benches/codecs/encoder.rs
index f520328d7563c..a359767338a30 100644
--- a/benches/codecs/encoder.rs
+++ b/benches/codecs/encoder.rs
@@ -48,17 +48,6 @@ fn encoder(c: &mut Criterion) {
         "key3" => "value3"
     }));
 
-    group.throughput(Throughput::Bytes(input.size_of() as u64));
-    group.bench_with_input("vector::sinks::util::encode_log", &(), |b, ()| {
-        b.iter_batched(
-            || vector::sinks::util::Encoding::Json.into(),
-            |encoding| {
-                vector::sinks::util::encode_log(input.clone(), &encoding).unwrap();
-            },
-            BatchSize::SmallInput,
-        )
-    });
-
     group.throughput(Throughput::Bytes(input.size_of() as u64));
     group.bench_with_input("JsonLogVecSerializer::encode", &(), |b, ()| {
         b.iter_batched(
diff --git a/benches/files.rs b/benches/files.rs
index 4b0abffca9452..909e9572ac0b1 100644
--- a/benches/files.rs
+++ b/benches/files.rs
@@ -1,6 +1,7 @@
 use std::{convert::TryInto, path::PathBuf};
 
 use bytes::Bytes;
+use codecs::{encoding::FramingConfig, TextSerializerConfig};
 use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput};
 use futures::{stream, SinkExt, StreamExt};
 use tempfile::tempdir;
@@ -54,10 +55,7 @@ fn benchmark_files_no_partitions(c: &mut Criterion) {
                     sinks::file::FileSinkConfig {
                         path: output.try_into().unwrap(),
                         idle_timeout_secs: None,
-                        encoding: sinks::util::encoding::EncodingConfig::from(
-                            sinks::file::Encoding::Text,
-                        )
-                        .into(),
+                        encoding: (None::<FramingConfig>, TextSerializerConfig::new()).into(),
                         compression: sinks::file::Compression::None,
                         acknowledgements: Default::default(),
                     },
diff --git a/benches/http.rs b/benches/http.rs
index cbd53f2f7d2e0..9826deaee402a 100644
--- a/benches/http.rs
+++ b/benches/http.rs
@@ -1,5 +1,6 @@
 use std::net::SocketAddr;
 
+use codecs::{encoding::FramingConfig, TextSerializerConfig};
 use criterion::{criterion_group, BatchSize, BenchmarkId, Criterion, SamplingMode, Throughput};
 use futures::TryFutureExt;
 use hyper::{
@@ -9,7 +10,7 @@ use hyper::{
 use tokio::runtime::Runtime;
 use vector::{
     config, sinks,
-    sinks::util::{encoding::EncodingConfigWithFramingAdapter, BatchConfig, Compression},
+    sinks::util::{BatchConfig, Compression},
     sources,
     test_util::{next_addr, random_lines, runtime, send_lines, start_topology, wait_for_tcp},
     Error,
@@ -53,9 +54,8 @@ fn benchmark_http(c: &mut Criterion) {
                     auth: Default::default(),
                     headers: Default::default(),
                     batch,
-                    encoding: EncodingConfigWithFramingAdapter::legacy(
-                        sinks::http::Encoding::Text.into(),
-                    ),
+                    encoding: (None::<FramingConfig>, TextSerializerConfig::new())
+                        .into(),
                     request: Default::default(),
                     tls: Default::default(),
                     acknowledgements: Default::default(),
diff --git a/config/examples/docs_example.toml b/config/examples/docs_example.toml
index b322f1a2a8bba..04c39ae8ecd0f 100644
--- a/config/examples/docs_example.toml
+++ b/config/examples/docs_example.toml
@@ -32,12 +32,13 @@ index = "vector-%Y-%m-%d" # daily indices
 # Send structured data to a cost-effective long-term storage
 [sinks.s3_archives]
-inputs = ["apache_parser"] # don't sample for S3
-type = "aws_s3"
-region = "us-east-1"
-bucket = "my-log-archives"
-key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
-compression = "gzip" # compress final objects
-encoding = "ndjson" # new line delimited JSON
+inputs = ["apache_parser"] # don't sample for S3
+type = "aws_s3"
+region = "us-east-1"
+bucket = "my-log-archives"
+key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
+compression = "gzip" # compress final objects
+framing.method = "newline_delimited" # new line delimited...
+encoding.codec = "json" # ...JSON [sinks.s3_archives.batch] -max_bytes = 10000000 # 10mb uncompressed +max_bytes = 10000000 # 10mb uncompressed diff --git a/config/examples/environment_variables.toml b/config/examples/environment_variables.toml index 057dce6320fbd..1a2759bc65077 100644 --- a/config/examples/environment_variables.toml +++ b/config/examples/environment_variables.toml @@ -28,6 +28,6 @@ data_dir = "/var/lib/vector" # Print the data to STDOUT for inspection # Docs: https://vector.dev/docs/reference/sinks/console [sinks.out] - inputs = ["add_host"] - type = "console" - encoding = "json" + inputs = ["add_host"] + type = "console" + encoding.codec = "json" diff --git a/config/examples/es_s3_hybrid.toml b/config/examples/es_s3_hybrid.toml index d47aa5cdbd3fd..6d2cb6ecf2767 100644 --- a/config/examples/es_s3_hybrid.toml +++ b/config/examples/es_s3_hybrid.toml @@ -26,11 +26,12 @@ data_dir = "/var/lib/vector" # Send structured data to S3, a durable long-term storage [sinks.s3_archives] - inputs = ["apache_logs"] # don't sample - type = "aws_s3" - region = "us-east-1" - bucket = "my_log_archives" - encoding = "ndjson" - compression = "gzip" + inputs = ["apache_logs"] # don't sample + type = "aws_s3" + region = "us-east-1" + bucket = "my_log_archives" + framing.method = "newline_delimited" + encoding.codec = "json" + compression = "gzip" [sinks.s3_archives.batch] - max_size = 10000000 # 10mb uncompressed + max_size = 10000000 # 10mb uncompressed diff --git a/config/examples/file_to_cloudwatch_metrics.toml b/config/examples/file_to_cloudwatch_metrics.toml index 40f3dd1d3282d..affdc2995388e 100644 --- a/config/examples/file_to_cloudwatch_metrics.toml +++ b/config/examples/file_to_cloudwatch_metrics.toml @@ -34,12 +34,12 @@ tags = {method = "{{method}}", status = "{{status}}"} [sinks.console_metrics] inputs = ["log_to_metric"] type = "console" -encoding = "json" +encoding.codec = "json" [sinks.console_logs] inputs = ["remap"] type = "console" -encoding = "json" +encoding.codec = "json" [sinks.cloudwatch] inputs = ["log_to_metric"] diff --git a/config/examples/file_to_prometheus.toml b/config/examples/file_to_prometheus.toml index 7199bd1013e54..da0b8fa791117 100644 --- a/config/examples/file_to_prometheus.toml +++ b/config/examples/file_to_prometheus.toml @@ -51,12 +51,12 @@ name = "bytes_out_histogram" [sinks.console_metrics] inputs = ["log_to_metric"] type = "console" -encoding = "json" +encoding.codec = "json" [sinks.console_logs] inputs = ["remap"] type = "console" -encoding = "text" +encoding.codec = "text" [sinks.prometheus] inputs = ["log_to_metric"] diff --git a/config/examples/namespacing/sinks/s3_archives.toml b/config/examples/namespacing/sinks/s3_archives.toml index 29669b87531d0..4f40df9a58212 100644 --- a/config/examples/namespacing/sinks/s3_archives.toml +++ b/config/examples/namespacing/sinks/s3_archives.toml @@ -1,10 +1,11 @@ # Send structured data to a cost-effective long-term storage -inputs = ["apache_parser"] # don't sample for S3 -type = "aws_s3" -region = "us-east-1" -bucket = "my-log-archives" -key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format -compression = "gzip" # compress final objects -encoding = "ndjson" # new line delimited JSON +inputs = ["apache_parser"] # don't sample for S3 +type = "aws_s3" +region = "us-east-1" +bucket = "my-log-archives" +key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format +compression = "gzip" # compress final objects +framing.method = "newline_delimited" # new line delimited... 
+encoding.codec = "json" # ...JSON [batch] - max_bytes = 10000000 # 10mb uncompressed + max_bytes = 10000000 # 10mb uncompressed diff --git a/config/examples/prometheus_to_console.toml b/config/examples/prometheus_to_console.toml index 8c7666a2dc5b4..749eee023b6ec 100644 --- a/config/examples/prometheus_to_console.toml +++ b/config/examples/prometheus_to_console.toml @@ -14,4 +14,4 @@ scrape_interval_secs = 2 [sinks.console] inputs = ["prometheus"] type = "console" -encoding = "json" +encoding.codec = "json" diff --git a/config/examples/stdio.toml b/config/examples/stdio.toml index 51b7233fc6d70..42a839090f817 100644 --- a/config/examples/stdio.toml +++ b/config/examples/stdio.toml @@ -11,4 +11,4 @@ [sinks.out] inputs = ["in"] type = "console" - encoding = "text" + encoding.codec = "text" diff --git a/config/examples/wrapped_json.toml b/config/examples/wrapped_json.toml index b9a9c62b34cff..c8fa911ddc8bd 100644 --- a/config/examples/wrapped_json.toml +++ b/config/examples/wrapped_json.toml @@ -34,6 +34,6 @@ data_dir = "/var/lib/vector" # Print the data to STDOUT for inspection # Docs: https://vector.dev/docs/reference/sinks/console [sinks.out] - inputs = ["parse_json"] - type = "console" - encoding = "json" + inputs = ["parse_json"] + type = "console" + encoding.codec = "json" diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index a186e2f288d69..8aaec670cdee1 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -167,7 +167,7 @@ type = "stdin" [sinks.bar] type = "console" inputs = ["foo"] -encoding = "json" +encoding.codec = "json" ``` After the component construction phase, we'll be left with the tasks for each diff --git a/lib/codecs/src/encoding/format/json.rs b/lib/codecs/src/encoding/format/json.rs index 430c7dc0c61b4..1f26d95bf98da 100644 --- a/lib/codecs/src/encoding/format/json.rs +++ b/lib/codecs/src/encoding/format/json.rs @@ -68,22 +68,100 @@ impl Encoder for JsonSerializer { #[cfg(test)] mod tests { use bytes::BytesMut; + use chrono::{TimeZone, Utc}; use vector_common::btreemap; - use vector_core::event::{LogEvent, Value}; + use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value}; use super::*; #[test] - fn serialize_json() { + fn serialize_json_log() { let event = Event::Log(LogEvent::from(btreemap! 
{ - "foo" => Value::from("bar") + "x" => Value::from("23"), + "z" => Value::from(25), + "a" => Value::from("0"), })); let mut serializer = JsonSerializer::new(); let mut bytes = BytesMut::new(); serializer.encode(event, &mut bytes).unwrap(); - assert_eq!(bytes.freeze(), r#"{"foo":"bar"}"#); + assert_eq!(bytes.freeze(), r#"{"a":"0","x":"23","z":25}"#); + } + + #[test] + fn serialize_json_metric_counter() { + let event = Event::Metric( + Metric::new( + "foos", + MetricKind::Incremental, + MetricValue::Counter { value: 100.0 }, + ) + .with_namespace(Some("vector")) + .with_tags(Some( + vec![ + ("key2".to_owned(), "value2".to_owned()), + ("key1".to_owned(), "value1".to_owned()), + ("Key3".to_owned(), "Value3".to_owned()), + ] + .into_iter() + .collect(), + )) + .with_timestamp(Some(Utc.ymd(2018, 11, 14).and_hms_nano(8, 9, 10, 11))), + ); + + let mut serializer = JsonSerializer::new(); + let mut bytes = BytesMut::new(); + + serializer.encode(event, &mut bytes).unwrap(); + + assert_eq!( + bytes.freeze(), + r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"# + ); + } + + #[test] + fn serialize_json_metric_set() { + let event = Event::Metric(Metric::new( + "users", + MetricKind::Incremental, + MetricValue::Set { + values: vec!["bob".into()].into_iter().collect(), + }, + )); + + let mut serializer = JsonSerializer::new(); + let mut bytes = BytesMut::new(); + + serializer.encode(event, &mut bytes).unwrap(); + + assert_eq!( + bytes.freeze(), + r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"# + ); + } + + #[test] + fn serialize_json_metric_histogram_without_timestamp() { + let event = Event::Metric(Metric::new( + "glork", + MetricKind::Incremental, + MetricValue::Distribution { + samples: vector_core::samples![10.0 => 1], + statistic: StatisticKind::Histogram, + }, + )); + + let mut serializer = JsonSerializer::new(); + let mut bytes = BytesMut::new(); + + serializer.encode(event, &mut bytes).unwrap(); + + assert_eq!( + bytes.freeze(), + r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"# + ); } #[test] diff --git a/lib/k8s-e2e-tests/tests/vector-agent.rs b/lib/k8s-e2e-tests/tests/vector-agent.rs index d4bcb82c10534..0d22f397ffd2f 100644 --- a/lib/k8s-e2e-tests/tests/vector-agent.rs +++ b/lib/k8s-e2e-tests/tests/vector-agent.rs @@ -43,7 +43,7 @@ const CUSTOM_RESOURCE_VECTOR_CONFIG: &str = indoc! {r#" [sinks.stdout] type = "console" inputs = ["kubernetes_logs"] - encoding = "json" + encoding.codec = "json" "#}; /// This test validates that vector picks up logs at the simplest case diff --git a/rfcs/2020-03-06-1999-api-extensions-for-lua-transform.md b/rfcs/2020-03-06-1999-api-extensions-for-lua-transform.md index 484bc3e808c89..f4abbd897d08c 100644 --- a/rfcs/2020-03-06-1999-api-extensions-for-lua-transform.md +++ b/rfcs/2020-03-06-1999-api-extensions-for-lua-transform.md @@ -475,7 +475,7 @@ Here `event` is an encoded event to be produced by the transform, and `lane` is > [sinks.example_console] > type = "console" > inputs = ["example_transform.example_lane"] # would output the event from `example_lane` -> encoding = "text" +> encoding.codec = "text" > ``` > > Other components connected to the same transform, but with different lanes names or without lane names at all would not receive any event. 
diff --git a/rfcs/2020-04-15-2341-wasm-plugins.md b/rfcs/2020-04-15-2341-wasm-plugins.md index 056c91076d9ab..b674011c97860 100644 --- a/rfcs/2020-04-15-2341-wasm-plugins.md +++ b/rfcs/2020-04-15-2341-wasm-plugins.md @@ -333,7 +333,7 @@ module = "target/wasm32-wasi/release/echo.wasm" healthcheck = true inputs = ["demo"] type = "console" -encoding = "json" +encoding.codec = "json" buffer.type = "memory" buffer.max_events = 500 buffer.when_full = "block" diff --git a/rfcs/2021-07-20-8288-csv-enrichment.md b/rfcs/2021-07-20-8288-csv-enrichment.md index 84623d83f6f37..4b7b78d73f7c9 100644 --- a/rfcs/2021-07-20-8288-csv-enrichment.md +++ b/rfcs/2021-07-20-8288-csv-enrichment.md @@ -37,7 +37,7 @@ To represent the CSV file we have a new top level configuration option. ```toml [enrichment_tables.csv_file] type = "file" - encoding = "csv" + encoding.codec = "csv" path = "\path_to_csv" delimiter = "," ``` diff --git a/skaffold/manifests/config.yaml b/skaffold/manifests/config.yaml index 96a673a4a23fa..a09364b396034 100644 --- a/skaffold/manifests/config.yaml +++ b/skaffold/manifests/config.yaml @@ -8,4 +8,4 @@ data: type = "console" inputs = ["kubernetes_logs", "internal_metrics"] target = "stdout" - encoding = "json" + encoding.codec = "json" diff --git a/soaks/disabled-tests/fluent_remap_aws_firehose/vector.toml b/soaks/disabled-tests/fluent_remap_aws_firehose/vector.toml index c30fd0ddcf5d6..e83c758ee1f97 100644 --- a/soaks/disabled-tests/fluent_remap_aws_firehose/vector.toml +++ b/soaks/disabled-tests/fluent_remap_aws_firehose/vector.toml @@ -41,7 +41,7 @@ stream_name = "soak_fluent_remap_firehose" endpoint = "http://localhost:8080" healthcheck.enabled = true compression = "none" -encoding = "json" +encoding.codec = "json" region = "us-east-2" auth.access_key_id = "totallyanaccesskeyid" auth.secret_access_key = "alsoasecretaccesskey" diff --git a/soaks/disabled-tests/http_datadog_filter_blackhole/vector.toml b/soaks/disabled-tests/http_datadog_filter_blackhole/vector.toml index fc14f44117254..2a53342ac4780 100644 --- a/soaks/disabled-tests/http_datadog_filter_blackhole/vector.toml +++ b/soaks/disabled-tests/http_datadog_filter_blackhole/vector.toml @@ -12,7 +12,8 @@ type = "internal_metrics" [sources.logs] type = "http" address = "0.0.0.0:8282" -encoding = "ndjson" +framing.method = "newline_delimited" +decoding.codec = "json" ## ## Transforms diff --git a/soaks/tests/http_pipelines_blackhole/vector.toml b/soaks/tests/http_pipelines_blackhole/vector.toml index e8ddec77df077..0be0b9df3e560 100644 --- a/soaks/tests/http_pipelines_blackhole/vector.toml +++ b/soaks/tests/http_pipelines_blackhole/vector.toml @@ -10,7 +10,7 @@ type = "internal_metrics" [sources.logs] type = "http" address = "0.0.0.0:8282" -encoding = "text" +decoding.codec = "bytes" ## ## Transforms diff --git a/soaks/tests/http_pipelines_blackhole_acks/vector.toml b/soaks/tests/http_pipelines_blackhole_acks/vector.toml index a179b82e9feb4..906ff8be1dfb7 100644 --- a/soaks/tests/http_pipelines_blackhole_acks/vector.toml +++ b/soaks/tests/http_pipelines_blackhole_acks/vector.toml @@ -10,7 +10,7 @@ type = "internal_metrics" [sources.logs] type = "http" address = "0.0.0.0:8282" -encoding = "text" +decoding.codec = "bytes" acknowledgements = true ## diff --git a/soaks/tests/http_pipelines_no_grok_blackhole/vector.toml b/soaks/tests/http_pipelines_no_grok_blackhole/vector.toml index 65d8fc57ca1d7..67d7bb1702757 100644 --- a/soaks/tests/http_pipelines_no_grok_blackhole/vector.toml +++ 
b/soaks/tests/http_pipelines_no_grok_blackhole/vector.toml @@ -10,7 +10,7 @@ type = "internal_metrics" [sources.logs] type = "http" address = "0.0.0.0:8282" -encoding = "text" +decoding.codec = "bytes" ## ## Transforms diff --git a/soaks/tests/http_text_to_http_json/vector.toml b/soaks/tests/http_text_to_http_json/vector.toml index 12c93f6fe94c6..1b3c2771b1159 100644 --- a/soaks/tests/http_text_to_http_json/vector.toml +++ b/soaks/tests/http_text_to_http_json/vector.toml @@ -7,7 +7,7 @@ data_dir = "/var/lib/vector" [sources.logs] type = "http" address = "0.0.0.0:8282" -encoding = "text" +decoding.codec = "bytes" ## ## Sinks diff --git a/soaks/tests/http_to_http_acks/vector.toml b/soaks/tests/http_to_http_acks/vector.toml index 46dac04a566b4..5db2bf1d60e00 100644 --- a/soaks/tests/http_to_http_acks/vector.toml +++ b/soaks/tests/http_to_http_acks/vector.toml @@ -25,7 +25,7 @@ address = "0.0.0.0:9090" type = "http" inputs = ["http_source"] uri = "http://localhost:8080" -encoding = "text" +encoding.codec = "text" healthcheck.enabled = false buffer.type = "memory" buffer.max_events = 50000 # buffer 50 payloads at a time diff --git a/soaks/tests/http_to_http_noack/vector.toml b/soaks/tests/http_to_http_noack/vector.toml index 58080a0491f4a..d355a8afc2375 100644 --- a/soaks/tests/http_to_http_noack/vector.toml +++ b/soaks/tests/http_to_http_noack/vector.toml @@ -25,7 +25,7 @@ address = "0.0.0.0:9090" type = "http" inputs = ["http_source"] uri = "http://localhost:8080" -encoding = "text" +encoding.codec = "text" healthcheck.enabled = false buffer.type = "memory" buffer.max_events = 50000 # buffer 50 payloads at a time diff --git a/soaks/tests/socket_to_socket_blackhole/vector.toml b/soaks/tests/socket_to_socket_blackhole/vector.toml index 30a3601f6e627..2c2b1540f077c 100644 --- a/soaks/tests/socket_to_socket_blackhole/vector.toml +++ b/soaks/tests/socket_to_socket_blackhole/vector.toml @@ -26,4 +26,4 @@ type = "socket" inputs = ["socket_source"] mode = "tcp" address = "localhost:8080" -encoding = "json" +encoding.codec = "json" diff --git a/soaks/tests/splunk_hec_route_s3/vector.toml b/soaks/tests/splunk_hec_route_s3/vector.toml index d727fb60991ff..1ab5549fa774e 100644 --- a/soaks/tests/splunk_hec_route_s3/vector.toml +++ b/soaks/tests/splunk_hec_route_s3/vector.toml @@ -39,7 +39,8 @@ inputs = ["container_type.sidecar"] endpoint = "http://localhost:8080" bucket = "vector-soak-sidecar" -encoding.codec = "ndjson" +framing.method = "newline_delimited" +encoding.codec = "json" encoding.except_fields = ["timestamp"] key_prefix = "v1/source_type/sidecar/aws_account_id/{{attrs.aws_account}}/system_id/{{attrs.systemid}}/service/{{attrs.c2cService}}/partition/{{attrs.c2cPartition}}/stage/{{attrs.c2cStage}}/year/%Y/month/%m/day/%d/hour/%H" @@ -54,7 +55,8 @@ inputs = ["container_type.service"] endpoint = "http://localhost:8080" bucket = "vector-soak-service" -encoding.codec = "ndjson" +framing.method = "newline_delimited" +encoding.codec = "json" encoding.except_fields = ["timestamp"] key_prefix = "v1/source_type/app/system_id/{{attrs.systemid}}/service/{{attrs.c2cService}}/partition/{{attrs.c2cPartition}}/stage/{{attrs.c2cStage}}/year/%Y/month/%m/day/%d/hour/%H" diff --git a/soaks/tests/splunk_hec_to_splunk_hec_logs_acks/vector.toml b/soaks/tests/splunk_hec_to_splunk_hec_logs_acks/vector.toml index f85d6fec53e5f..c2e8de5d8fba3 100644 --- a/soaks/tests/splunk_hec_to_splunk_hec_logs_acks/vector.toml +++ b/soaks/tests/splunk_hec_to_splunk_hec_logs_acks/vector.toml @@ -25,7 +25,7 @@ address = "0.0.0.0:9090" 
type = "splunk_hec_logs" inputs = ["splunk_hec"] endpoint = "http://localhost:8080" -encoding = "json" +encoding.codec = "json" token = "abcd1234" healthcheck.enabled = false acknowledgements.indexer_acknowledgements_enabled = true diff --git a/soaks/tests/splunk_hec_to_splunk_hec_logs_noack/vector.toml b/soaks/tests/splunk_hec_to_splunk_hec_logs_noack/vector.toml index 5758a2ccb586e..6cf0c06bc2b6b 100644 --- a/soaks/tests/splunk_hec_to_splunk_hec_logs_noack/vector.toml +++ b/soaks/tests/splunk_hec_to_splunk_hec_logs_noack/vector.toml @@ -25,7 +25,7 @@ address = "0.0.0.0:9090" type = "splunk_hec_logs" inputs = ["splunk_hec"] endpoint = "http://localhost:8080" -encoding = "json" +encoding.codec = "json" token = "abcd1234" healthcheck.enabled = false acknowledgements.indexer_acknowledgements_enabled = false diff --git a/soaks/tests/syslog_humio_logs/vector.toml b/soaks/tests/syslog_humio_logs/vector.toml index 7ff415aeab1fb..f9cbcf1e94545 100644 --- a/soaks/tests/syslog_humio_logs/vector.toml +++ b/soaks/tests/syslog_humio_logs/vector.toml @@ -26,6 +26,6 @@ address = "0.0.0.0:9090" type = "humio_logs" inputs = ["syslog"] endpoint = "http://localhost:8080" -encoding = "json" +encoding.codec = "json" token = "humio_token" healthcheck.enabled = false diff --git a/soaks/tests/syslog_splunk_hec_logs/vector.toml b/soaks/tests/syslog_splunk_hec_logs/vector.toml index d7133a34f83be..0802a25fe49bb 100644 --- a/soaks/tests/syslog_splunk_hec_logs/vector.toml +++ b/soaks/tests/syslog_splunk_hec_logs/vector.toml @@ -26,6 +26,6 @@ address = "0.0.0.0:9090" type = "splunk_hec_logs" inputs = ["syslog"] endpoint = "http://localhost:8080" -encoding = "json" +encoding.codec = "json" token = "abcd1234" healthcheck.enabled = false diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index 3b41494c21000..2f08d5c009676 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -1,8 +1,10 @@ -use codecs::encoding::{FramingConfig, SerializerConfig}; +use crate::codecs::Transformer; +use codecs::{ + encoding::{Framer, FramingConfig, Serializer, SerializerConfig}, + CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, +}; use serde::{Deserialize, Serialize}; -use crate::sinks::util::encoding::Transformer; - /// Config used to build an `Encoder`. #[derive(Debug, Clone, Deserialize, Serialize)] // `#[serde(deny_unknown_fields)]` doesn't work when flattening internally tagged enums, see @@ -32,6 +34,23 @@ impl EncodingConfig { pub const fn config(&self) -> &SerializerConfig { &self.encoding } + + /// Build the `Serializer` for this config. + pub fn build(&self) -> crate::Result { + self.encoding.build() + } +} + +impl From for EncodingConfig +where + T: Into, +{ + fn from(encoding: T) -> Self { + Self { + encoding: encoding.into(), + transformer: Default::default(), + } + } } /// Config used to build an `Encoder`. @@ -70,6 +89,53 @@ impl EncodingConfigWithFraming { pub const fn config(&self) -> (&Option, &SerializerConfig) { (&self.framing, &self.encoding.encoding) } + + /// Build the `Framer` and `Serializer` for this config. 
+ pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> { + let framer = self.framing.as_ref().map(|framing| framing.build()); + let serializer = self.encoding.build()?; + + let framer = match (framer, &serializer) { + (Some(framer), _) => framer, + (None, Serializer::Json(_)) => match sink_type { + SinkType::StreamBased => NewlineDelimitedEncoder::new().into(), + SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(), + }, + (None, Serializer::Avro(_) | Serializer::Native(_)) => { + LengthDelimitedEncoder::new().into() + } + ( + None, + Serializer::Logfmt(_) + | Serializer::NativeJson(_) + | Serializer::RawMessage(_) + | Serializer::Text(_), + ) => NewlineDelimitedEncoder::new().into(), + }; + + Ok((framer, serializer)) + } +} + +/// The way a sink processes outgoing events. +pub enum SinkType { + /// Events are sent in a continuous stream. + StreamBased, + /// Events are sent in a batch as a message. + MessageBased, +} + +impl From<(Option, S)> for EncodingConfigWithFraming +where + F: Into, + S: Into, +{ + fn from((framing, encoding): (Option, S)) -> Self { + Self { + framing: framing.map(Into::into), + encoding: encoding.into().into(), + } + } } #[cfg(test)] @@ -77,7 +143,7 @@ mod test { use lookup::lookup_v2::parse_path; use super::*; - use crate::sinks::util::encoding::{EncodingConfiguration, TimestampFormat}; + use crate::codecs::encoding::TimestampFormat; #[test] fn deserialize_encoding_config() { diff --git a/src/codecs/encoding/mod.rs b/src/codecs/encoding/mod.rs index 07be58fd2edea..69ede063e896b 100644 --- a/src/codecs/encoding/mod.rs +++ b/src/codecs/encoding/mod.rs @@ -1,5 +1,7 @@ mod config; mod encoder; +mod transformer; -pub use config::{EncodingConfig, EncodingConfigWithFraming}; +pub use config::{EncodingConfig, EncodingConfigWithFraming, SinkType}; pub use encoder::Encoder; +pub use transformer::{TimestampFormat, Transformer}; diff --git a/src/codecs/encoding/transformer.rs b/src/codecs/encoding/transformer.rs new file mode 100644 index 0000000000000..b67b0a0fa4627 --- /dev/null +++ b/src/codecs/encoding/transformer.rs @@ -0,0 +1,360 @@ +#![deny(missing_docs)] + +use core::fmt::Debug; + +use lookup::{ + lookup_v2::{parse_path, OwnedPath}, + path, +}; +use serde::{Deserialize, Deserializer, Serialize}; +use value::Value; +use vector_core::event::{LogEvent, MaybeAsLogMut}; + +use crate::{event::Event, serde::skip_serializing_if_default}; + +/// Transformations to prepare an event for serialization. +#[derive(Debug, Clone, Default, PartialEq, Serialize)] +pub struct Transformer(TransformerInner); + +impl<'de> Deserialize<'de> for Transformer { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let transformer: TransformerInner = Deserialize::deserialize(deserializer)?; + Self::validate_fields( + transformer.only_fields.as_deref(), + transformer.except_fields.as_deref(), + ) + .map_err(serde::de::Error::custom)?; + Ok(Self(transformer)) + } +} + +impl Transformer { + /// Creates a new `Transformer`. + /// + /// Returns `Err` if `only_fields` and `except_fields` fail validation, i.e. are not mutually + /// exclusive. + pub fn new( + only_fields: Option>, + except_fields: Option>, + timestamp_format: Option, + ) -> Result { + let inner = TransformerInner { + only_fields, + except_fields, + timestamp_format, + }; + + Self::validate_fields(inner.only_fields.as_deref(), inner.except_fields.as_deref())?; + + Ok(Self(inner)) + } + + /// Get the `Transformer`'s `only_fields`. 
+ pub const fn only_fields(&self) -> &Option> { + &self.0.only_fields + } + + /// Get the `Transformer`'s `except_fields`. + pub const fn except_fields(&self) -> &Option> { + &self.0.except_fields + } + + /// Get the `Transformer`'s `timestamp_format`. + pub const fn timestamp_format(&self) -> &Option { + &self.0.timestamp_format + } + + /// Check if `except_fields` and `only_fields` items are mutually exclusive. + /// + /// If an error is returned, the entire encoding configuration should be considered inoperable. + fn validate_fields( + only_fields: Option<&[OwnedPath]>, + except_fields: Option<&[String]>, + ) -> crate::Result<()> { + if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields) { + if except_fields.iter().any(|f| { + let path_iter = parse_path(f); + only_fields.iter().any(|v| v == &path_iter) + }) { + return Err( + "`except_fields` and `only_fields` should be mutually exclusive.".into(), + ); + } + } + Ok(()) + } + + /// Prepare an event for serialization by the given transformation rules. + pub fn transform(&self, event: &mut Event) { + // Rules are currently applied to logs only. + if let Some(log) = event.maybe_as_log_mut() { + // Ordering in here should not matter. + self.apply_except_fields(log); + self.apply_only_fields(log); + self.apply_timestamp_format(log); + } + } + + fn apply_only_fields(&self, log: &mut LogEvent) { + if let Some(only_fields) = &self.0.only_fields { + let mut to_remove = match log.keys() { + Some(keys) => keys + .filter(|field| { + let field_path = parse_path(field); + !only_fields + .iter() + .any(|only| field_path.segments.starts_with(&only.segments[..])) + }) + .collect::>(), + None => vec![], + }; + + // reverse sort so that we delete array elements at the end first rather than + // the start so that any `nulls` at the end are dropped and empty arrays are + // pruned + to_remove.sort_by(|a, b| b.cmp(a)); + + for removal in to_remove { + log.remove_prune(removal.as_str(), true); + } + } + } + + fn apply_except_fields(&self, log: &mut LogEvent) { + if let Some(except_fields) = &self.0.except_fields { + for field in except_fields { + log.remove(field.as_str()); + } + } + } + + fn apply_timestamp_format(&self, log: &mut LogEvent) { + if let Some(timestamp_format) = &self.0.timestamp_format { + match timestamp_format { + TimestampFormat::Unix => { + if log.value().is_object() { + let mut unix_timestamps = Vec::new(); + for (k, v) in log.all_fields().expect("must be an object") { + if let Value::Timestamp(ts) = v { + unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp()))); + } + } + for (k, v) in unix_timestamps { + log.insert(k.as_str(), v); + } + } else { + // root is not an object + let timestamp = if let Value::Timestamp(ts) = log.value() { + Some(ts.timestamp()) + } else { + None + }; + if let Some(ts) = timestamp { + log.insert(path!(), Value::Integer(ts)); + } + } + } + // RFC3339 is the default serialization of a timestamp. + TimestampFormat::Rfc3339 => (), + } + } + } + + /// Set the `except_fields` value. + /// + /// Returns `Err` if the new `except_fields` fail validation, i.e. are not mutually exclusive + /// with `only_fields`. 
+ pub fn set_except_fields(&mut self, except_fields: Option>) -> crate::Result<()> { + let transformer = TransformerInner { + only_fields: self.0.only_fields.clone(), + except_fields, + timestamp_format: self.0.timestamp_format, + }; + + Self::validate_fields( + transformer.only_fields.as_deref(), + transformer.except_fields.as_deref(), + )?; + + self.0 = transformer; + + Ok(()) + } +} + +impl From for Transformer { + fn from(inner: TransformerInner) -> Self { + Self(inner) + } +} + +#[derive(Debug, Clone, Default, PartialEq, Deserialize, Serialize)] +struct TransformerInner { + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] + only_fields: Option>, + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] + except_fields: Option>, + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] + timestamp_format: Option, +} + +#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +/// The format in which a timestamp should be represented. +pub enum TimestampFormat { + /// Represent the timestamp as a Unix timestamp. + Unix, + /// Represent the timestamp as a RFC 3339 timestamp. + Rfc3339, +} + +#[cfg(test)] +mod tests { + use indoc::indoc; + use vector_core::config::log_schema; + + use super::*; + use std::collections::BTreeMap; + + #[test] + fn serialize() { + let string = + r#"{"only_fields":["a.b[0]"],"except_fields":["ignore_me"],"timestamp_format":"unix"}"#; + + let transformer = serde_json::from_str::(string).unwrap(); + + let serialized = serde_json::to_string(&transformer).unwrap(); + + assert_eq!(string, serialized); + } + + #[test] + fn serialize_empty() { + let string = "{}"; + + let transformer = serde_json::from_str::(string).unwrap(); + + let serialized = serde_json::to_string(&transformer).unwrap(); + + assert_eq!(string, serialized); + } + + #[test] + fn deserialize_and_transform_except() { + let transformer: Transformer = + toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d\\.z", "e"]"#).unwrap(); + let mut log = LogEvent::default(); + { + log.insert("a", 1); + log.insert("a.b", 1); + log.insert("a.b.c", 1); + log.insert("a.b.d", 1); + log.insert("b[0]", 1); + log.insert("b[1].x", 1); + log.insert("c[0].x", 1); + log.insert("c[0].y", 1); + log.insert("d\\.z", 1); + log.insert("e.a", 1); + log.insert("e.b", 1); + } + let mut event = Event::from(log); + transformer.transform(&mut event); + assert!(!event.as_mut_log().contains("a.b.c")); + assert!(!event.as_mut_log().contains("b")); + assert!(!event.as_mut_log().contains("b[1].x")); + assert!(!event.as_mut_log().contains("c[0].y")); + assert!(!event.as_mut_log().contains("d\\.z")); + assert!(!event.as_mut_log().contains("e.a")); + + assert!(event.as_mut_log().contains("a.b.d")); + assert!(event.as_mut_log().contains("c[0].x")); + } + + #[test] + fn deserialize_and_transform_only() { + let transformer: Transformer = + toml::from_str(r#"only_fields = ["a.b.c", "b", "c[0].y", "g\\.z"]"#).unwrap(); + let mut log = LogEvent::default(); + { + log.insert("a", 1); + log.insert("a.b", 1); + log.insert("a.b.c", 1); + log.insert("a.b.d", 1); + log.insert("b[0]", 1); + log.insert("b[1].x", 1); + log.insert("c[0].x", 1); + log.insert("c[0].y", 1); + log.insert("d.y", 1); + log.insert("d.z", 1); + log.insert("e[0]", 1); + log.insert("e[1]", 1); + log.insert("\"f.z\"", 1); + log.insert("\"g.z\"", 1); + log.insert("h", BTreeMap::new()); + log.insert("i", Vec::::new()); + } + let mut event = Event::from(log); + 
transformer.transform(&mut event); + assert!(event.as_mut_log().contains("a.b.c")); + assert!(event.as_mut_log().contains("b")); + assert!(event.as_mut_log().contains("b[1].x")); + assert!(event.as_mut_log().contains("c[0].y")); + assert!(event.as_mut_log().contains("\"g.z\"")); + + assert!(!event.as_mut_log().contains("a.b.d")); + assert!(!event.as_mut_log().contains("c[0].x")); + assert!(!event.as_mut_log().contains("d")); + assert!(!event.as_mut_log().contains("e")); + assert!(!event.as_mut_log().contains("f")); + assert!(!event.as_mut_log().contains("h")); + assert!(!event.as_mut_log().contains("i")); + } + + #[test] + fn deserialize_and_transform_timestamp() { + let transformer: Transformer = toml::from_str(r#"timestamp_format = "unix""#).unwrap(); + let mut event = Event::Log(LogEvent::from("Demo")); + let timestamp = event + .as_mut_log() + .get(log_schema().timestamp_key()) + .unwrap() + .clone(); + let timestamp = timestamp.as_timestamp().unwrap(); + event + .as_mut_log() + .insert("another", Value::Timestamp(*timestamp)); + + transformer.transform(&mut event); + + match event + .as_mut_log() + .get(log_schema().timestamp_key()) + .unwrap() + { + Value::Integer(_) => {} + e => panic!( + "Timestamp was not transformed into a Unix timestamp. Was {:?}", + e + ), + } + match event.as_mut_log().get("another").unwrap() { + Value::Integer(_) => {} + e => panic!( + "Timestamp was not transformed into a Unix timestamp. Was {:?}", + e + ), + } + } + + #[test] + fn exclusivity_violation() { + let config: std::result::Result = toml::from_str(indoc! {r#" + except_fields = ["Doop"] + only_fields = ["Doop"] + "#}); + assert!(config.is_err()) + } +} diff --git a/src/codecs/mod.rs b/src/codecs/mod.rs index 445153883a5cb..4247846cca3a8 100644 --- a/src/codecs/mod.rs +++ b/src/codecs/mod.rs @@ -8,5 +8,7 @@ mod encoding; mod ready_frames; pub use decoding::{Decoder, DecodingConfig}; -pub use encoding::{Encoder, EncodingConfig, EncodingConfigWithFraming}; +pub use encoding::{ + Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType, TimestampFormat, Transformer, +}; pub use ready_frames::ReadyFrames; diff --git a/src/config/format.rs b/src/config/format.rs index efe8d5744e892..6d8b23ae0b93c 100644 --- a/src/config/format.rs +++ b/src/config/format.rs @@ -149,7 +149,7 @@ mod tests { type = "socket" mode = "tcp" inputs = ["sample"] - encoding = "text" + encoding.codec = "text" address = "127.0.0.1:9999" "#; @@ -192,7 +192,8 @@ mod tests { r#" type: "socket""#, r#" mode: "tcp""#, r#" inputs: ["sample"]"#, - r#" encoding: "text""#, + r#" encoding:"#, + r#" codec: "text""#, r#" address: "127.0.0.1:9999""#, ), Format::Yaml, @@ -231,7 +232,9 @@ mod tests { "type": "socket", "mode": "tcp", "inputs": ["sample"], - "encoding": "text", + "encoding": { + "codec": "text" + }, "address": "127.0.0.1:9999" } } diff --git a/src/config/mod.rs b/src/config/mod.rs index 6e1cb80010bc6..2f7b6947beef9 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -915,7 +915,7 @@ mod tests { [sinks.out] type = "console" inputs = ["in"] - encoding = "json" + encoding.codec = "json" "#}, Format::Toml, ) @@ -948,7 +948,7 @@ mod tests { [sinks.out] type = "console" inputs = ["in"] - encoding = "json" + encoding.codec = "json" "#}, Format::Toml, ) @@ -980,7 +980,7 @@ mod tests { [sinks.out] type = "console" inputs = ["in"] - encoding = "json" + encoding.codec = "json" "#}, Format::Toml, ) @@ -1147,18 +1147,18 @@ mod acknowledgements_tests { [sinks.out1] type = "file" inputs = ["in1"] - encoding = "text" + encoding.codec = 
"text" path = "/path/to/out1" [sinks.out2] type = "file" inputs = ["in2"] - encoding = "text" + encoding.codec = "text" path = "/path/to/out2" acknowledgements = true [sinks.out3] type = "file" inputs = ["parse3"] - encoding = "text" + encoding.codec = "text" path = "/path/to/out3" acknowledgements.enabled = true "#}, @@ -1316,7 +1316,7 @@ mod resource_tests { [sinks.out] type = "console" inputs = ["in0","in1"] - encoding = "json" + encoding.codec = "json" "#}, Format::Toml, ) @@ -1363,7 +1363,7 @@ mod pipelines_tests { [sinks.out] type = "console" inputs = ["processing"] - encoding = "json" + encoding.codec = "json" "#}, Format::Toml, ); diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index 4e85036105328..7bbbb88c17b99 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -1,5 +1,6 @@ use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient; use aws_smithy_types::retry::RetryConfig; +use codecs::JsonSerializerConfig; use futures::FutureExt; use serde::{Deserialize, Serialize}; use tower::ServiceBuilder; @@ -9,7 +10,7 @@ use crate::{ create_client, create_smithy_client, resolve_region, AwsAuthentication, ClientBuilder, RegionOrEndpoint, }, - codecs::Encoder, + codecs::{Encoder, EncodingConfig}, config::{ log_schema, AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext, @@ -20,11 +21,8 @@ use crate::{ retry::CloudwatchRetryLogic, service::CloudwatchLogsPartitionSvc, sink::CloudwatchSink, }, util::{ - encoding::{ - EncodingConfig, EncodingConfigAdapter, StandardEncodings, StandardEncodingsMigrator, - }, - http::RequestConfig, - BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, + http::RequestConfig, BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, + TowerRequestConfig, }, Healthcheck, VectorSink, }, @@ -55,8 +53,7 @@ pub struct CloudwatchLogsSinkConfig { pub stream_name: Template, #[serde(flatten)] pub region: RegionOrEndpoint, - pub encoding: - EncodingConfigAdapter, StandardEncodingsMigrator>, + pub encoding: EncodingConfig, pub create_missing_group: Option, pub create_missing_stream: Option, #[serde(default)] @@ -126,7 +123,7 @@ impl SinkConfig for CloudwatchLogsSinkConfig { std::sync::Arc::new(smithy_client), )); let transformer = self.encoding.transformer(); - let serializer = self.encoding.clone().encoding()?; + let serializer = self.encoding.build()?; let encoder = Encoder::<()>::new(serializer); let healthcheck = healthcheck(self.clone(), client).boxed(); let sink = CloudwatchSink { @@ -160,17 +157,16 @@ impl SinkConfig for CloudwatchLogsSinkConfig { impl GenerateConfig for CloudwatchLogsSinkConfig { fn generate_config() -> toml::Value { - toml::Value::try_from(default_config(StandardEncodings::Json)).unwrap() + toml::Value::try_from(default_config(JsonSerializerConfig::new().into())).unwrap() } } -fn default_config(e: StandardEncodings) -> CloudwatchLogsSinkConfig { +fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig { CloudwatchLogsSinkConfig { - encoding: EncodingConfig::from(e).into(), + encoding, group_name: Default::default(), stream_name: Default::default(), region: Default::default(), - create_missing_group: Default::default(), create_missing_stream: Default::default(), compression: Default::default(), diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 03ffda431e28e..f7f75377e7a1c 100644 --- 
a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -7,6 +7,7 @@ use std::str::FromStr; use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient; use aws_sdk_cloudwatchlogs::{Endpoint, Region}; use chrono::Duration; +use codecs::TextSerializerConfig; use futures::{stream, StreamExt}; use http::Uri; use pretty_assertions::assert_eq; @@ -18,10 +19,7 @@ use crate::sinks::aws_cloudwatch_logs::config::CloudwatchLogsClientBuilder; use crate::{ config::{log_schema, ProxyConfig, SinkConfig, SinkContext}, event::{Event, LogEvent, Value}, - sinks::util::{ - encoding::{EncodingConfig, StandardEncodings}, - BatchConfig, - }, + sinks::util::BatchConfig, template::Template, test_util::{ components::{run_and_assert_sink_compliance, AWS_SINK_TAGS}, @@ -46,7 +44,7 @@ async fn cloudwatch_insert_log_event() { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), region: RegionOrEndpoint::with_both("localstack", watchlogs_address().as_str()), - encoding: EncodingConfig::from(StandardEncodings::Text).into(), + encoding: TextSerializerConfig::new().into(), create_missing_group: None, create_missing_stream: None, compression: Default::default(), @@ -96,7 +94,7 @@ async fn cloudwatch_insert_log_events_sorted() { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), region: RegionOrEndpoint::with_both("localstack", watchlogs_address().as_str()), - encoding: EncodingConfig::from(StandardEncodings::Text).into(), + encoding: TextSerializerConfig::new().into(), create_missing_group: None, create_missing_stream: None, compression: Default::default(), @@ -165,7 +163,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), region: RegionOrEndpoint::with_both("localstack", watchlogs_address().as_str()), - encoding: EncodingConfig::from(StandardEncodings::Text).into(), + encoding: TextSerializerConfig::new().into(), create_missing_group: None, create_missing_stream: None, compression: Default::default(), @@ -238,7 +236,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(group_name.as_str()).unwrap(), region: RegionOrEndpoint::with_both("localstack", watchlogs_address().as_str()), - encoding: EncodingConfig::from(StandardEncodings::Text).into(), + encoding: TextSerializerConfig::new().into(), create_missing_group: None, create_missing_stream: None, compression: Default::default(), @@ -293,7 +291,7 @@ async fn cloudwatch_insert_log_event_batched() { stream_name: Template::try_from(stream_name.as_str()).unwrap(), group_name: Template::try_from(group_name.as_str()).unwrap(), region: RegionOrEndpoint::with_both("localstack", watchlogs_address().as_str()), - encoding: EncodingConfig::from(StandardEncodings::Text).into(), + encoding: TextSerializerConfig::new().into(), create_missing_group: None, create_missing_stream: None, compression: Default::default(), @@ -343,7 +341,7 @@ async fn cloudwatch_insert_log_event_partitioned() { group_name: Template::try_from(GROUP_NAME).unwrap(), stream_name: Template::try_from(format!("{}-{{{{key}}}}", stream_name)).unwrap(), region: RegionOrEndpoint::with_both("localstack", watchlogs_address().as_str()), - encoding: EncodingConfig::from(StandardEncodings::Text).into(), + encoding: 
TextSerializerConfig::new().into(), create_missing_group: None, create_missing_stream: None, compression: Default::default(), @@ -435,7 +433,7 @@ async fn cloudwatch_healthcheck() { stream_name: Template::try_from("test-stream").unwrap(), group_name: Template::try_from(GROUP_NAME).unwrap(), region: RegionOrEndpoint::with_both("localstack", watchlogs_address().as_str()), - encoding: EncodingConfig::from(StandardEncodings::Text).into(), + encoding: TextSerializerConfig::new().into(), create_missing_group: None, create_missing_stream: None, compression: Default::default(), diff --git a/src/sinks/aws_cloudwatch_logs/request_builder.rs b/src/sinks/aws_cloudwatch_logs/request_builder.rs index e7f3ba86c87a7..1a96a4eeb73e1 100644 --- a/src/sinks/aws_cloudwatch_logs/request_builder.rs +++ b/src/sinks/aws_cloudwatch_logs/request_builder.rs @@ -8,11 +8,11 @@ use vector_core::{ use super::TemplateRenderingError; use crate::{ - codecs::Encoder, + codecs::{Encoder, Transformer}, config::LogSchema, event::{Event, Value}, internal_events::{AwsCloudwatchLogsEncoderError, AwsCloudwatchLogsMessageSizeError}, - sinks::{aws_cloudwatch_logs::CloudwatchKey, util::encoding::Transformer}, + sinks::aws_cloudwatch_logs::CloudwatchKey, template::Template, }; diff --git a/src/sinks/aws_kinesis_firehose/config.rs b/src/sinks/aws_kinesis_firehose/config.rs index 557cd3c378b6f..33229ecd481fe 100644 --- a/src/sinks/aws_kinesis_firehose/config.rs +++ b/src/sinks/aws_kinesis_firehose/config.rs @@ -10,7 +10,7 @@ use tower::ServiceBuilder; use crate::{ aws::{create_client, is_retriable_error, AwsAuthentication, ClientBuilder, RegionOrEndpoint}, - codecs::Encoder, + codecs::{Encoder, EncodingConfig}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext, @@ -22,11 +22,8 @@ use crate::{ sink::KinesisSink, }, util::{ - encoding::{ - EncodingConfig, EncodingConfigAdapter, StandardEncodings, StandardEncodingsMigrator, - }, - retries::RetryLogic, - BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, + retries::RetryLogic, BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, + TowerRequestConfig, }, Healthcheck, VectorSink, }, @@ -53,8 +50,7 @@ pub struct KinesisFirehoseSinkConfig { pub stream_name: String, #[serde(flatten)] pub region: RegionOrEndpoint, - pub encoding: - EncodingConfigAdapter, StandardEncodingsMigrator>, + pub encoding: EncodingConfig, #[serde(default)] pub compression: Compression, #[serde(default)] @@ -150,7 +146,7 @@ impl SinkConfig for KinesisFirehoseSinkConfig { }); let transformer = self.encoding.transformer(); - let serializer = self.encoding.encoding()?; + let serializer = self.encoding.build()?; let encoder = Encoder::<()>::new(serializer); let request_builder = KinesisRequestBuilder { diff --git a/src/sinks/aws_kinesis_firehose/integration_tests.rs b/src/sinks/aws_kinesis_firehose/integration_tests.rs index 8b6301d6098a6..7afd1afe83531 100644 --- a/src/sinks/aws_kinesis_firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis_firehose/integration_tests.rs @@ -3,6 +3,7 @@ use aws_sdk_elasticsearch::Client as EsClient; use aws_sdk_firehose::model::ElasticsearchDestinationConfiguration; +use codecs::JsonSerializerConfig; use futures::TryFutureExt; use serde_json::{json, Value}; use tokio::time::{sleep, Duration}; @@ -17,10 +18,7 @@ use crate::{ config::{SinkConfig, SinkContext}, sinks::{ elasticsearch::{ElasticsearchAuth, ElasticsearchCommon, ElasticsearchConfig}, - util::{ - encoding::{EncodingConfig, 
StandardEncodings}, - BatchConfig, Compression, TowerRequestConfig, - }, + util::{BatchConfig, Compression, TowerRequestConfig}, }, test_util::{ components::{run_and_assert_sink_compliance, AWS_SINK_TAGS}, @@ -52,7 +50,7 @@ async fn firehose_put_records() { let config = KinesisFirehoseSinkConfig { stream_name: stream.clone(), region: region.clone(), - encoding: EncodingConfig::from(StandardEncodings::Json).into(), // required for ES destination w/ localstack + encoding: JsonSerializerConfig::new().into(), // required for ES destination w/ localstack compression: Compression::None, batch, request: TowerRequestConfig { diff --git a/src/sinks/aws_kinesis_firehose/request_builder.rs b/src/sinks/aws_kinesis_firehose/request_builder.rs index ef0c68386f9ed..d296165aefa22 100644 --- a/src/sinks/aws_kinesis_firehose/request_builder.rs +++ b/src/sinks/aws_kinesis_firehose/request_builder.rs @@ -6,11 +6,9 @@ use bytes::Bytes; use vector_core::ByteSizeOf; use crate::{ - codecs::Encoder, + codecs::{Encoder, Transformer}, event::{Event, EventFinalizers, Finalizable, LogEvent}, - sinks::util::{ - encoding::Transformer, request_builder::EncodeResult, Compression, RequestBuilder, - }, + sinks::util::{request_builder::EncodeResult, Compression, RequestBuilder}, }; pub struct KinesisRequestBuilder { diff --git a/src/sinks/aws_kinesis_firehose/tests.rs b/src/sinks/aws_kinesis_firehose/tests.rs index 98a64b568c8c0..8ef9d2408fbec 100644 --- a/src/sinks/aws_kinesis_firehose/tests.rs +++ b/src/sinks/aws_kinesis_firehose/tests.rs @@ -1,5 +1,7 @@ #![cfg(test)] +use codecs::JsonSerializerConfig; + use super::*; use crate::{ aws::RegionOrEndpoint, @@ -8,11 +10,7 @@ use crate::{ aws_kinesis_firehose::config::{ KinesisFirehoseDefaultBatchSettings, MAX_PAYLOAD_EVENTS, MAX_PAYLOAD_SIZE, }, - util::{ - batch::BatchError, - encoding::{EncodingConfig, StandardEncodings}, - BatchConfig, Compression, - }, + util::{batch::BatchError, BatchConfig, Compression}, }, }; @@ -30,7 +28,7 @@ async fn check_batch_size() { let config = KinesisFirehoseSinkConfig { stream_name: String::from("test"), region: RegionOrEndpoint::with_both("local", "http://localhost:4566"), - encoding: EncodingConfig::from(StandardEncodings::Json).into(), + encoding: JsonSerializerConfig::new().into(), compression: Compression::None, batch, request: Default::default(), @@ -58,7 +56,7 @@ async fn check_batch_events() { let config = KinesisFirehoseSinkConfig { stream_name: String::from("test"), region: RegionOrEndpoint::with_both("local", "http://localhost:4566"), - encoding: EncodingConfig::from(StandardEncodings::Json).into(), + encoding: JsonSerializerConfig::new().into(), compression: Compression::None, batch, request: Default::default(), diff --git a/src/sinks/aws_kinesis_streams/config.rs b/src/sinks/aws_kinesis_streams/config.rs index e267241731cec..1006c06227d15 100644 --- a/src/sinks/aws_kinesis_streams/config.rs +++ b/src/sinks/aws_kinesis_streams/config.rs @@ -9,7 +9,7 @@ use tower::ServiceBuilder; use super::service::KinesisResponse; use crate::{ aws::{create_client, is_retriable_error, AwsAuthentication, ClientBuilder, RegionOrEndpoint}, - codecs::Encoder, + codecs::{Encoder, EncodingConfig}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext, @@ -19,11 +19,8 @@ use crate::{ request_builder::KinesisRequestBuilder, service::KinesisService, sink::KinesisSink, }, util::{ - encoding::{ - EncodingConfig, EncodingConfigAdapter, StandardEncodings, StandardEncodingsMigrator, - }, - retries::RetryLogic, - 
BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, + retries::RetryLogic, BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, + TowerRequestConfig, }, Healthcheck, VectorSink, }, @@ -78,8 +75,7 @@ pub struct KinesisSinkConfig { pub partition_key_field: Option, #[serde(flatten)] pub region: RegionOrEndpoint, - pub encoding: - EncodingConfigAdapter, StandardEncodingsMigrator>, + pub encoding: EncodingConfig, #[serde(default)] pub compression: Compression, #[serde(default)] @@ -159,7 +155,7 @@ impl SinkConfig for KinesisSinkConfig { }); let transformer = self.encoding.transformer(); - let serializer = self.encoding.encoding()?; + let serializer = self.encoding.build()?; let encoder = Encoder::<()>::new(serializer); let request_builder = KinesisRequestBuilder { diff --git a/src/sinks/aws_kinesis_streams/integration_tests.rs b/src/sinks/aws_kinesis_streams/integration_tests.rs index c14b083bb1bc8..72af750c72aa2 100644 --- a/src/sinks/aws_kinesis_streams/integration_tests.rs +++ b/src/sinks/aws_kinesis_streams/integration_tests.rs @@ -3,6 +3,7 @@ use aws_sdk_kinesis::model::{Record, ShardIteratorType}; use aws_sdk_kinesis::types::DateTime; +use codecs::TextSerializerConfig; use tokio::time::{sleep, Duration}; use super::*; @@ -11,10 +12,7 @@ use crate::{ config::{ProxyConfig, SinkConfig, SinkContext}, sinks::{ aws_kinesis_streams::config::KinesisClientBuilder, - util::{ - encoding::{EncodingConfig, StandardEncodings}, - BatchConfig, Compression, - }, + util::{BatchConfig, Compression}, }, test_util::{ components::{run_and_assert_sink_compliance, AWS_SINK_TAGS}, @@ -39,7 +37,7 @@ async fn kinesis_put_records() { stream_name: stream.clone(), partition_key_field: None, region: RegionOrEndpoint::with_both("localstack", kinesis_address().as_str()), - encoding: EncodingConfig::from(StandardEncodings::Text).into(), + encoding: TextSerializerConfig::new().into(), compression: Compression::None, batch, request: Default::default(), diff --git a/src/sinks/aws_kinesis_streams/request_builder.rs b/src/sinks/aws_kinesis_streams/request_builder.rs index b831b3d06c1cd..21efc629ea725 100644 --- a/src/sinks/aws_kinesis_streams/request_builder.rs +++ b/src/sinks/aws_kinesis_streams/request_builder.rs @@ -6,11 +6,11 @@ use bytes::Bytes; use vector_core::ByteSizeOf; use crate::{ - codecs::Encoder, + codecs::{Encoder, Transformer}, event::{Event, EventFinalizers, Finalizable}, sinks::{ aws_kinesis_streams::sink::KinesisProcessedEvent, - util::{encoding::Transformer, request_builder::EncodeResult, Compression, RequestBuilder}, + util::{request_builder::EncodeResult, Compression, RequestBuilder}, }, }; diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 8fdd8d4f1ceeb..dbc1418356325 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,22 +1,21 @@ use std::convert::TryInto; use aws_sdk_s3::Client as S3Client; -use codecs::encoding::{Framer, Serializer}; -use codecs::{CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder}; +use codecs::encoding::{Framer, FramingConfig}; +use codecs::TextSerializerConfig; use serde::{Deserialize, Serialize}; use tower::ServiceBuilder; use vector_core::sink::VectorSink; -use super::sink::S3RequestOptions; -use crate::aws::{AwsAuthentication, RegionOrEndpoint}; -use crate::sinks::util::encoding::EncodingConfigWithFramingAdapter; use crate::{ - codecs::Encoder, + aws::{AwsAuthentication, RegionOrEndpoint}, + codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{ 
        AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
        SinkContext,
    },
    sinks::{
+        aws_s3::sink::S3RequestOptions,
        s3_common::{
            self,
            config::{S3Options, S3RetryLogic},
@@ -24,10 +23,8 @@ use crate::{
             sink::S3Sink,
         },
         util::{
-            encoding::{EncodingConfig, StandardEncodings, StandardEncodingsWithFramingMigrator},
-            partitioner::KeyPartitioner,
-            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
-            TowerRequestConfig,
+            partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
+            Compression, ServiceBuilderExt, TowerRequestConfig,
         },
         Healthcheck,
     },
@@ -51,10 +48,7 @@ pub struct S3SinkConfig {
     #[serde(flatten)]
     pub region: RegionOrEndpoint,
     #[serde(flatten)]
-    pub encoding: EncodingConfigWithFramingAdapter<
-        EncodingConfig<StandardEncodings>,
-        StandardEncodingsWithFramingMigrator,
-    >,
+    pub encoding: EncodingConfigWithFraming,
     #[serde(default = "Compression::gzip_default")]
     pub compression: Compression,
     #[serde(default)]
@@ -82,7 +76,7 @@ impl GenerateConfig for S3SinkConfig {
             filename_extension: None,
             options: S3Options::default(),
             region: RegionOrEndpoint::default(),
-            encoding: EncodingConfig::from(StandardEncodings::Text).into(),
+            encoding: (None::<FramingConfig>, TextSerializerConfig::new()).into(),
             compression: Compression::gzip_default(),
             batch: BatchConfig::default(),
             request: TowerRequestConfig::default(),
@@ -149,21 +143,7 @@ impl S3SinkConfig {
             .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
         let transformer = self.encoding.transformer();
-        let (framer, serializer) = self.encoding.encoding()?;
-        let framer = match (framer, &serializer) {
-            (Some(framer), _) => framer,
-            (None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(),
-            (None, Serializer::Avro(_) | Serializer::Native(_)) => {
-                LengthDelimitedEncoder::new().into()
-            }
-            (
-                None,
-                Serializer::Logfmt(_)
-                | Serializer::NativeJson(_)
-                | Serializer::RawMessage(_)
-                | Serializer::Text(_),
-            ) => NewlineDelimitedEncoder::new().into(),
-        };
+        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
         let encoder = Encoder::<Framer>::new(framer, serializer);
         let request_options = S3RequestOptions {
diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs
index 13cf5528a4a96..77bf7c3267f8a 100644
--- a/src/sinks/aws_s3/sink.rs
+++ b/src/sinks/aws_s3/sink.rs
@@ -7,14 +7,14 @@ use uuid::Uuid;
 use vector_core::{event::Finalizable, ByteSizeOf};
 use crate::{
-    codecs::Encoder,
+    codecs::{Encoder, Transformer},
     event::Event,
     sinks::{
         s3_common::{
             config::S3Options,
             service::{S3Metadata, S3Request},
         },
-        util::{encoding::Transformer, request_builder::EncodeResult, Compression, RequestBuilder},
+        util::{request_builder::EncodeResult, Compression, RequestBuilder},
     },
 };
diff --git a/src/sinks/aws_s3/tests.rs b/src/sinks/aws_s3/tests.rs
index 334920467870b..43155b756613a 100644
--- a/src/sinks/aws_s3/tests.rs
+++ b/src/sinks/aws_s3/tests.rs
@@ -15,6 +15,7 @@ mod integration_tests {
     use aws_sdk_s3::types::SdkError;
     use aws_sdk_s3::Client as S3Client;
     use bytes::Buf;
+    use codecs::{encoding::FramingConfig, TextSerializerConfig};
     use flate2::read::MultiGzDecoder;
     use futures::{stream, Stream};
     use pretty_assertions::assert_eq;
@@ -32,10 +33,7 @@ mod integration_tests {
         sinks::{
             aws_s3::S3SinkConfig,
             s3_common::config::S3Options,
-            util::{
-                encoding::{EncodingConfig, StandardEncodings},
-                BatchConfig, Compression, TowerRequestConfig,
-            },
+            util::{BatchConfig, Compression, TowerRequestConfig},
         },
         test_util::{
             components::{run_and_assert_sink_compliance, AWS_SINK_TAGS},
@@ -352,7 +350,7 @@ mod integration_tests {
         filename_extension: None,
         options: S3Options::default(),
         region: RegionOrEndpoint::with_both("minio", s3_address()),
-        encoding: EncodingConfig::from(StandardEncodings::Text).into(),
+        encoding: (None::<FramingConfig>, TextSerializerConfig::new()).into(),
         compression: Compression::None,
         batch,
         request: TowerRequestConfig::default(),
diff --git a/src/sinks/aws_sqs/config.rs b/src/sinks/aws_sqs/config.rs
index 234c15bd735d1..129066a3b5e7c 100644
--- a/src/sinks/aws_sqs/config.rs
+++ b/src/sinks/aws_sqs/config.rs
@@ -1,7 +1,6 @@
 use std::convert::TryFrom;
 use aws_sdk_sqs::Client as SqsClient;
-use codecs::{encoding::SerializerConfig, JsonSerializerConfig, TextSerializerConfig};
 use futures::FutureExt;
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
@@ -9,15 +8,13 @@ use snafu::{ResultExt, Snafu};
 use crate::{
     aws::create_client,
     aws::{AwsAuthentication, RegionOrEndpoint},
+    codecs::EncodingConfig,
     common::sqs::SqsClientBuilder,
     config::{
         AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
         SinkContext,
     },
-    sinks::util::{
-        encoding::{EncodingConfig, EncodingConfigAdapter, EncodingConfigMigrator},
-        TowerRequestConfig,
-    },
+    sinks::util::TowerRequestConfig,
     template::{Template, TemplateParseError},
     tls::TlsConfig,
 };
@@ -34,27 +31,13 @@ pub(super) enum BuildError {
     MessageDeduplicationIdTemplate { source: TemplateParseError },
 }
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct EncodingMigrator;
-
-impl EncodingConfigMigrator for EncodingMigrator {
-    type Codec = Encoding;
-
-    fn migrate(codec: &Self::Codec) -> SerializerConfig {
-        match codec {
-            Encoding::Text => TextSerializerConfig::new().into(),
-            Encoding::Json => JsonSerializerConfig::new().into(),
-        }
-    }
-}
-
 #[derive(Deserialize, Serialize, Debug, Clone)]
 #[serde(deny_unknown_fields)]
 pub struct SqsSinkConfig {
     pub queue_url: String,
     #[serde(flatten)]
     pub region: RegionOrEndpoint,
-    pub encoding: EncodingConfigAdapter<EncodingConfig<Encoding>, EncodingMigrator>,
+    pub encoding: EncodingConfig,
     pub message_group_id: Option<String>,
     pub message_deduplication_id: Option<String>,
     #[serde(default)]
diff --git a/src/sinks/aws_sqs/integration_tests.rs b/src/sinks/aws_sqs/integration_tests.rs
index 38e765284d156..7ed0248693bdd 100644
--- a/src/sinks/aws_sqs/integration_tests.rs
+++ b/src/sinks/aws_sqs/integration_tests.rs
@@ -4,16 +4,16 @@ use std::str::FromStr;
 use aws_sdk_sqs::model::QueueAttributeName;
 use aws_sdk_sqs::Client as SqsClient;
 use aws_sdk_sqs::{Endpoint, Region};
+use codecs::TextSerializerConfig;
 use http::Uri;
 use tokio::time::{sleep, Duration};
-use super::config::{Encoding, SqsSinkConfig};
+use super::config::SqsSinkConfig;
 use super::sink::SqsSink;
 use crate::aws::create_client;
 use crate::aws::{AwsAuthentication, RegionOrEndpoint};
 use crate::common::sqs::SqsClientBuilder;
 use crate::config::ProxyConfig;
-use crate::sinks::util::encoding::EncodingConfig;
 use crate::sinks::VectorSink;
 use crate::test_util::{
     components::{run_and_assert_sink_compliance, AWS_SINK_TAGS},
@@ -52,7 +52,7 @@ async fn sqs_send_message_batch() {
     let config = SqsSinkConfig {
         queue_url: queue_url.clone(),
         region: RegionOrEndpoint::with_both("local", sqs_address().as_str()),
-        encoding: EncodingConfig::from(Encoding::Text).into(),
+        encoding: TextSerializerConfig::new().into(),
         message_group_id: None,
         message_deduplication_id: None,
         request: Default::default(),
diff --git a/src/sinks/aws_sqs/request_builder.rs b/src/sinks/aws_sqs/request_builder.rs
index 0e6111fee9f84..9fb8ae38da443 100644
--- a/src/sinks/aws_sqs/request_builder.rs
+++ b/src/sinks/aws_sqs/request_builder.rs
@@ -2,10 +2,9 @@ use bytes::Bytes;
 use vector_core::ByteSizeOf;
 use super::config::SqsSinkConfig;
-use crate::codecs::Encoder;
+use crate::codecs::{Encoder, Transformer};
 use crate::event::{Event, EventFinalizers, Finalizable};
 use crate::internal_events::TemplateRenderingError;
-use crate::sinks::util::encoding::Transformer;
 use crate::sinks::util::request_builder::EncodeResult;
 use crate::sinks::util::{Compression, EncodedLength, RequestBuilder};
 use crate::template::Template;
@@ -29,7 +28,7 @@ pub(crate) struct SqsRequestBuilder {
 impl SqsRequestBuilder {
     pub fn new(config: SqsSinkConfig) -> crate::Result<Self> {
         let transformer = config.encoding.transformer();
-        let serializer = config.encoding.encoding()?;
+        let serializer = config.encoding.build()?;
         let encoder = Encoder::<()>::new(serializer);
         Ok(Self {
diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs
index ba73faad6b8c8..34687d559fe9a 100644
--- a/src/sinks/azure_blob/config.rs
+++ b/src/sinks/azure_blob/config.rs
@@ -1,29 +1,21 @@
 use std::{convert::TryInto, sync::Arc};
 use azure_storage_blobs::prelude::*;
-use codecs::{
-    encoding::{Framer, Serializer},
-    CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
-};
+use codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
 use serde::{Deserialize, Serialize};
 use tower::ServiceBuilder;
 use super::request_builder::AzureBlobRequestOptions;
 use crate::{
-    codecs::Encoder,
+    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
     config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
     sinks::{
         azure_common::{
             self, config::AzureBlobRetryLogic, service::AzureBlobService, sink::AzureBlobSink,
         },
         util::{
-            encoding::{
-                EncodingConfig, EncodingConfigWithFramingAdapter, StandardEncodings,
-                StandardEncodingsWithFramingMigrator,
-            },
-            partitioner::KeyPartitioner,
-            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
-            TowerRequestConfig,
+            partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
+            Compression, ServiceBuilderExt, TowerRequestConfig,
         },
         Healthcheck, VectorSink,
     },
@@ -40,10 +32,7 @@ pub struct AzureBlobSinkConfig {
     pub blob_time_format: Option<String>,
     pub blob_append_uuid: Option<bool>,
     #[serde(flatten)]
-    pub encoding: EncodingConfigWithFramingAdapter<
-        EncodingConfig<StandardEncodings>,
-        StandardEncodingsWithFramingMigrator,
-    >,
+    pub encoding: EncodingConfigWithFraming,
     #[serde(default = "Compression::gzip_default")]
     pub compression: Compression,
     #[serde(default)]
@@ -67,7 +56,7 @@ impl GenerateConfig for AzureBlobSinkConfig {
             blob_prefix: Some(String::from("blob")),
             blob_time_format: Some(String::from("%s")),
             blob_append_uuid: Some(true),
-            encoding: EncodingConfig::from(StandardEncodings::Ndjson).into(),
+            encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::new()).into(),
             compression: Compression::gzip_default(),
             batch: BatchConfig::default(),
             request: TowerRequestConfig::default(),
@@ -135,21 +124,7 @@ impl AzureBlobSinkConfig {
             .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
         let transformer = self.encoding.transformer();
-        let (framer, serializer) = self.encoding.clone().encoding()?;
-        let framer = match (framer, &serializer) {
-            (Some(framer), _) => framer,
-            (None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(),
-            (None, Serializer::Avro(_) | Serializer::Native(_)) => {
-                LengthDelimitedEncoder::new().into()
-            }
-            (
-                None,
-                Serializer::Logfmt(_)
-                | Serializer::NativeJson(_)
-                | Serializer::RawMessage(_)
-                | Serializer::Text(_),
-            ) => NewlineDelimitedEncoder::new().into(),
-        };
+        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
         let encoder = Encoder::<Framer>::new(framer, serializer);
         let request_options = AzureBlobRequestOptions {
diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs
index e653c260ad00d..4cb59ac51cb59 100644
--- a/src/sinks/azure_blob/integration_tests.rs
+++ b/src/sinks/azure_blob/integration_tests.rs
@@ -6,6 +6,10 @@ use std::{
 use azure_core::{prelude::Range, HttpError};
 use azure_storage_blobs::prelude::*;
 use bytes::{Buf, BytesMut};
+use codecs::{
+    encoding::FramingConfig, JsonSerializerConfig, NewlineDelimitedEncoderConfig,
+    TextSerializerConfig,
+};
 use flate2::read::GzDecoder;
 use futures::{stream, Stream, StreamExt};
 use http::StatusCode;
@@ -16,10 +20,7 @@
 use crate::{
     event::{Event, EventArray, LogEvent},
     sinks::{
         azure_common,
-        util::{
-            encoding::{EncodingConfig, StandardEncodings},
-            Compression, TowerRequestConfig,
-        },
+        util::{Compression, TowerRequestConfig},
         VectorSink,
     },
     test_util::{
@@ -94,7 +95,11 @@ async fn azure_blob_insert_json_into_blob() {
     let config = AzureBlobSinkConfig::new_emulator().await;
     let config = AzureBlobSinkConfig {
         blob_prefix: Some(blob_prefix.clone()),
-        encoding: EncodingConfig::from(StandardEncodings::Ndjson).into(),
+        encoding: (
+            Some(NewlineDelimitedEncoderConfig::new()),
+            JsonSerializerConfig::new(),
+        )
+        .into(),
         ..config
     };
     let sink = config.to_sink();
@@ -150,7 +155,11 @@ async fn azure_blob_insert_json_into_blob_gzip() {
     let config = AzureBlobSinkConfig::new_emulator().await;
     let config = AzureBlobSinkConfig {
         blob_prefix: Some(blob_prefix.clone()),
-        encoding: EncodingConfig::from(StandardEncodings::Ndjson).into(),
+        encoding: (
+            Some(NewlineDelimitedEncoderConfig::new()),
+            JsonSerializerConfig::new(),
+        )
+        .into(),
         compression: Compression::gzip_default(),
         ..config
     };
@@ -219,7 +228,7 @@ impl AzureBlobSinkConfig {
             blob_prefix: None,
             blob_time_format: None,
             blob_append_uuid: None,
-            encoding: EncodingConfig::from(StandardEncodings::Text).into(),
+            encoding: (None::<FramingConfig>, TextSerializerConfig::new()).into(),
             compression: Compression::None,
             batch: Default::default(),
             request: TowerRequestConfig::default(),
diff --git a/src/sinks/azure_blob/request_builder.rs b/src/sinks/azure_blob/request_builder.rs
index 2a337019820f5..76d90d560726a 100644
--- a/src/sinks/azure_blob/request_builder.rs
+++ b/src/sinks/azure_blob/request_builder.rs
@@ -5,11 +5,11 @@ use uuid::Uuid;
 use vector_core::ByteSizeOf;
 use crate::{
-    codecs::Encoder,
+    codecs::{Encoder, Transformer},
     event::{Event, Finalizable},
     sinks::{
         azure_common::config::{AzureBlobMetadata, AzureBlobRequest},
-        util::{encoding::Transformer, request_builder::EncodeResult, Compression, RequestBuilder},
+        util::{request_builder::EncodeResult, Compression, RequestBuilder},
     },
 };
diff --git a/src/sinks/azure_blob/test.rs b/src/sinks/azure_blob/test.rs
index 3922d20efb81d..40326ad98f84f 100644
--- a/src/sinks/azure_blob/test.rs
+++ b/src/sinks/azure_blob/test.rs
@@ -1,19 +1,19 @@
 use bytes::Bytes;
 use chrono::Utc;
-use codecs::{encoding::Framer, NewlineDelimitedEncoder, TextSerializer};
+use codecs::{
+    encoding::{Framer, FramingConfig},
+    NewlineDelimitedEncoder, TextSerializer, TextSerializerConfig,
+};
 use vector_core::partition::Partitioner;
 use super::config::AzureBlobSinkConfig;
 use
super::request_builder::AzureBlobRequestOptions; +use crate::codecs::EncodingConfigWithFraming; use crate::event::{Event, LogEvent}; -use crate::sinks::util::{ - encoding::{EncodingConfig, StandardEncodings}, - request_builder::RequestBuilder, - Compression, -}; +use crate::sinks::util::{request_builder::RequestBuilder, Compression}; use crate::{codecs::Encoder, sinks::util::request_builder::EncodeResult}; -fn default_config(e: StandardEncodings) -> AzureBlobSinkConfig { +fn default_config(encoding: EncodingConfigWithFraming) -> AzureBlobSinkConfig { AzureBlobSinkConfig { connection_string: Default::default(), storage_account: Default::default(), @@ -21,7 +21,7 @@ fn default_config(e: StandardEncodings) -> AzureBlobSinkConfig { blob_prefix: Default::default(), blob_time_format: Default::default(), blob_append_uuid: Default::default(), - encoding: EncodingConfig::from(e).into(), + encoding, compression: Compression::gzip_default(), batch: Default::default(), request: Default::default(), @@ -42,7 +42,7 @@ fn azure_blob_build_request_without_compression() { let sink_config = AzureBlobSinkConfig { blob_prefix: Some("blob".into()), container_name: container_name.clone(), - ..default_config(StandardEncodings::Text) + ..default_config((None::, TextSerializerConfig::new()).into()) }; let blob_time_format = String::from(""); let blob_append_uuid = false; @@ -83,7 +83,7 @@ fn azure_blob_build_request_with_compression() { let sink_config = AzureBlobSinkConfig { blob_prefix: Some("blob".into()), container_name: container_name.clone(), - ..default_config(StandardEncodings::Text) + ..default_config((None::, TextSerializerConfig::new()).into()) }; let blob_time_format = String::from(""); let blob_append_uuid = false; @@ -124,7 +124,7 @@ fn azure_blob_build_request_with_time_format() { let sink_config = AzureBlobSinkConfig { blob_prefix: Some("blob".into()), container_name: container_name.clone(), - ..default_config(StandardEncodings::Text) + ..default_config((None::, TextSerializerConfig::new()).into()) }; let blob_time_format = String::from("%F"); let blob_append_uuid = false; @@ -168,7 +168,7 @@ fn azure_blob_build_request_with_uuid() { let sink_config = AzureBlobSinkConfig { blob_prefix: Some("blob".into()), container_name: container_name.clone(), - ..default_config(StandardEncodings::Text) + ..default_config((None::, TextSerializerConfig::new()).into()) }; let blob_time_format = String::from(""); let blob_append_uuid = true; diff --git a/src/sinks/azure_monitor_logs.rs b/src/sinks/azure_monitor_logs.rs index 78591acfeb771..e19d127cf2a9b 100644 --- a/src/sinks/azure_monitor_logs.rs +++ b/src/sinks/azure_monitor_logs.rs @@ -13,12 +13,12 @@ use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use crate::{ + codecs::Transformer, config::{log_schema, AcknowledgementsConfig, Input, SinkConfig, SinkContext, SinkDescription}, event::{Event, Value}, http::HttpClient, sinks::{ util::{ - encoding::Transformer, http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, BatchConfig, BoxedRawValue, JsonArrayBuffer, RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs index 7fa7d901308dd..0bd4409beb54b 100644 --- a/src/sinks/clickhouse.rs +++ b/src/sinks/clickhouse.rs @@ -6,11 +6,11 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::{ + codecs::Transformer, config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext, SinkDescription}, event::Event, http::{Auth, HttpClient, HttpError, MaybeAuth}, 
sinks::util::{ - encoding::Transformer, http::{BatchedHttpSink, HttpEventEncoder, HttpRetryLogic, HttpSink}, retries::{RetryAction, RetryLogic}, BatchConfig, Buffer, Compression, RealtimeSizeBasedDefaultBatchSettings, @@ -309,8 +309,8 @@ mod integration_tests { use super::*; use crate::{ + codecs::TimestampFormat, config::{log_schema, SinkConfig, SinkContext}, - sinks::util::encoding::TimestampFormat, test_util::{ components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, random_string, trace_init, diff --git a/src/sinks/console/config.rs b/src/sinks/console/config.rs index eebd0b9c9af8a..351c01d68530d 100644 --- a/src/sinks/console/config.rs +++ b/src/sinks/console/config.rs @@ -1,22 +1,15 @@ use codecs::{ - encoding::{Framer, Serializer}, - LengthDelimitedEncoder, NewlineDelimitedEncoder, + encoding::{Framer, FramingConfig}, + JsonSerializerConfig, }; use futures::{future, FutureExt}; use serde::{Deserialize, Serialize}; use tokio::io; use crate::{ - codecs::Encoder, + codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - sinks::{ - console::sink::WriterSink, - util::encoding::{ - EncodingConfig, EncodingConfigWithFramingAdapter, StandardEncodings, - StandardEncodingsWithFramingMigrator, - }, - Healthcheck, VectorSink, - }, + sinks::{console::sink::WriterSink, Healthcheck, VectorSink}, }; #[derive(Debug, Derivative, Deserialize, Serialize)] @@ -34,10 +27,7 @@ pub struct ConsoleSinkConfig { #[serde(default)] pub target: Target, #[serde(flatten)] - pub encoding: EncodingConfigWithFramingAdapter< - EncodingConfig, - StandardEncodingsWithFramingMigrator, - >, + pub encoding: EncodingConfigWithFraming, #[serde( default, deserialize_with = "crate::serde::bool_or_struct", @@ -50,7 +40,7 @@ impl GenerateConfig for ConsoleSinkConfig { fn generate_config() -> toml::Value { toml::Value::try_from(Self { target: Target::Stdout, - encoding: EncodingConfig::from(StandardEncodings::Json).into(), + encoding: (None::, JsonSerializerConfig::new()).into(), acknowledgements: Default::default(), }) .unwrap() @@ -62,21 +52,7 @@ impl GenerateConfig for ConsoleSinkConfig { impl SinkConfig for ConsoleSinkConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.encoding()?; - let framer = match (framer, &serializer) { - (Some(framer), _) => framer, - ( - None, - Serializer::Text(_) - | Serializer::Json(_) - | Serializer::Logfmt(_) - | Serializer::NativeJson(_) - | Serializer::RawMessage(_), - ) => NewlineDelimitedEncoder::new().into(), - (None, Serializer::Avro(_) | Serializer::Native(_)) => { - LengthDelimitedEncoder::new().into() - } - }; + let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); let sink: VectorSink = match self.target { diff --git a/src/sinks/console/sink.rs b/src/sinks/console/sink.rs index 39d884f3aea80..a00801b559051 100644 --- a/src/sinks/console/sink.rs +++ b/src/sinks/console/sink.rs @@ -10,9 +10,9 @@ use vector_core::{ }; use crate::{ - codecs::Encoder, + codecs::{Encoder, Transformer}, event::{Event, EventStatus, Finalizable}, - sinks::util::{encoding::Transformer, StreamSink}, + sinks::util::StreamSink, }; pub struct WriterSink { @@ -68,56 +68,29 @@ where #[cfg(test)] mod test { - use chrono::{offset::TimeZone, Utc}; - use codecs::{BytesEncoder, NewlineDelimitedEncoder}; + use codecs::{JsonSerializer, 
NewlineDelimitedEncoder}; use futures::future::ready; use futures_util::stream; - use pretty_assertions::assert_eq; use vector_core::sink::VectorSink; use super::*; use crate::{ - event::{ - metric::{Metric, MetricKind, MetricValue, StatisticKind}, - Event, LogEvent, Value, - }, - sinks::util::encoding::{ - EncodingConfig, EncodingConfigWithFramingAdapter, StandardEncodings, - StandardEncodingsWithFramingMigrator, - }, + event::{Event, LogEvent}, test_util::components::{run_and_assert_sink_compliance, SINK_TAGS}, }; - fn encode_event( - event: Event, - encoding: EncodingConfigWithFramingAdapter< - EncodingConfig, - StandardEncodingsWithFramingMigrator, - >, - ) -> Result { - let (framer, serializer) = encoding.encoding().unwrap(); - let framer = framer.unwrap_or_else(|| BytesEncoder::new().into()); - let mut encoder = Encoder::::new(framer, serializer); - let mut bytes = BytesMut::new(); - encoder.encode(event, &mut bytes)?; - Ok(String::from_utf8_lossy(&bytes).to_string()) - } - #[tokio::test] async fn component_spec_compliance() { let event = Event::Log(LogEvent::from("foo")); - let encoding: EncodingConfigWithFramingAdapter< - EncodingConfig, - StandardEncodingsWithFramingMigrator, - > = EncodingConfig::from(StandardEncodings::Json).into(); - let transformer = encoding.transformer(); - let (_, serializer) = encoding.encoding().unwrap(); - let encoder = Encoder::::new(NewlineDelimitedEncoder::new().into(), serializer); + let encoder = Encoder::::new( + NewlineDelimitedEncoder::new().into(), + JsonSerializer::new().into(), + ); let sink = WriterSink { output: Vec::new(), - transformer, + transformer: Default::default(), encoder, }; @@ -128,100 +101,4 @@ mod test { ) .await; } - - #[test] - fn encodes_raw_logs() { - let event = Event::Log(LogEvent::from("foo")); - assert_eq!( - "foo", - encode_event(event, EncodingConfig::from(StandardEncodings::Text).into()).unwrap() - ); - } - - #[test] - fn encodes_log_events() { - let mut log = LogEvent::default(); - log.insert("x", Value::from("23")); - log.insert("z", Value::from(25)); - log.insert("a", Value::from("0")); - - let encoded = encode_event( - log.into(), - EncodingConfig::from(StandardEncodings::Json).into(), - ); - let expected = r#"{"a":"0","x":"23","z":25}"#; - assert_eq!(encoded.unwrap(), expected); - } - - #[test] - fn encodes_counter() { - let event = Event::Metric( - Metric::new( - "foos", - MetricKind::Incremental, - MetricValue::Counter { value: 100.0 }, - ) - .with_namespace(Some("vector")) - .with_tags(Some( - vec![ - ("key2".to_owned(), "value2".to_owned()), - ("key1".to_owned(), "value1".to_owned()), - ("Key3".to_owned(), "Value3".to_owned()), - ] - .into_iter() - .collect(), - )) - .with_timestamp(Some(Utc.ymd(2018, 11, 14).and_hms_nano(8, 9, 10, 11))), - ); - assert_eq!( - r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#, - encode_event(event, EncodingConfig::from(StandardEncodings::Json).into()).unwrap() - ); - } - - #[test] - fn encodes_set() { - let event = Event::Metric(Metric::new( - "users", - MetricKind::Incremental, - MetricValue::Set { - values: vec!["bob".into()].into_iter().collect(), - }, - )); - assert_eq!( - r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#, - encode_event(event, EncodingConfig::from(StandardEncodings::Json).into()).unwrap() - ); - } - - #[test] - fn encodes_histogram_without_timestamp() { - let event = Event::Metric(Metric::new( - 
"glork", - MetricKind::Incremental, - MetricValue::Distribution { - samples: vector_core::samples![10.0 => 1], - statistic: StatisticKind::Histogram, - }, - )); - assert_eq!( - r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#, - encode_event(event, EncodingConfig::from(StandardEncodings::Json).into()).unwrap() - ); - } - - #[test] - fn encodes_metric_text() { - let event = Event::Metric(Metric::new( - "users", - MetricKind::Incremental, - MetricValue::Set { - values: vec!["bob".into()].into_iter().collect(), - }, - )); - assert_eq!( - "users{} + bob", - encode_event(event, EncodingConfig::from(StandardEncodings::Text).into()).unwrap() - ); - } } diff --git a/src/sinks/datadog/events/request_builder.rs b/src/sinks/datadog/events/request_builder.rs index 43d333463560c..e4052df88140d 100644 --- a/src/sinks/datadog/events/request_builder.rs +++ b/src/sinks/datadog/events/request_builder.rs @@ -1,16 +1,14 @@ use std::{io, sync::Arc}; use bytes::Bytes; +use codecs::JsonSerializer; use lookup::lookup_v2::OwnedSegment; use vector_core::ByteSizeOf; use crate::{ - event::{EventFinalizers, Finalizable, LogEvent}, - sinks::util::{ - encoding::{EncodingConfigFixed, StandardJsonEncoding, TimestampFormat}, - request_builder::EncodeResult, - Compression, ElementCount, RequestBuilder, - }, + codecs::{Encoder, TimestampFormat, Transformer}, + event::{Event, EventFinalizers, Finalizable}, + sinks::util::{request_builder::EncodeResult, Compression, ElementCount, RequestBuilder}, }; #[derive(Clone)] @@ -45,9 +43,14 @@ pub struct Metadata { pub event_byte_size: usize, } -#[derive(Default)] pub struct DatadogEventsRequestBuilder { - encoder: EncodingConfigFixed, + encoder: (Transformer, Encoder<()>), +} + +impl Default for DatadogEventsRequestBuilder { + fn default() -> Self { + Self::new() + } } impl DatadogEventsRequestBuilder { @@ -56,10 +59,10 @@ impl DatadogEventsRequestBuilder { } } -impl RequestBuilder for DatadogEventsRequestBuilder { +impl RequestBuilder for DatadogEventsRequestBuilder { type Metadata = Metadata; - type Events = LogEvent; - type Encoder = EncodingConfigFixed; + type Events = Event; + type Encoder = (Transformer, Encoder<()>); type Payload = Bytes; type Request = DatadogEventsRequest; type Error = io::Error; @@ -72,13 +75,14 @@ impl RequestBuilder for DatadogEventsRequestBuilder { &self.encoder } - fn split_input(&self, mut log: LogEvent) -> (Self::Metadata, Self::Events) { + fn split_input(&self, event: Event) -> (Self::Metadata, Self::Events) { + let mut log = event.into_log(); let metadata = Metadata { finalizers: log.take_finalizers(), api_key: log.metadata_mut().datadog_api_key(), event_byte_size: log.size_of(), }; - (metadata, log) + (metadata, Event::from(log)) } fn build_request( @@ -93,31 +97,33 @@ impl RequestBuilder for DatadogEventsRequestBuilder { } } -fn encoder() -> EncodingConfigFixed { - EncodingConfigFixed { - // DataDog Event API allows only some fields, and refuses - // to accept event if it contains any other field. - only_fields: Some( - [ - "aggregation_key", - "alert_type", - "date_happened", - "device_name", - "host", - "priority", - "related_event_id", - "source_type_name", - "tags", - "text", - "title", - ] - .iter() - .map(|field| vec![OwnedSegment::Field((*field).into())].into()) - .collect(), - ), - // DataDog Event API requires unix timestamp. 
- timestamp_format: Some(TimestampFormat::Unix), - codec: StandardJsonEncoding, - ..EncodingConfigFixed::default() - } +fn encoder() -> (Transformer, Encoder<()>) { + // DataDog Event API allows only some fields, and refuses + // to accept event if it contains any other field. + let only_fields = Some( + [ + "aggregation_key", + "alert_type", + "date_happened", + "device_name", + "host", + "priority", + "related_event_id", + "source_type_name", + "tags", + "text", + "title", + ] + .iter() + .map(|field| vec![OwnedSegment::Field((*field).into())].into()) + .collect(), + ); + // DataDog Event API requires unix timestamp. + let timestamp_format = Some(TimestampFormat::Unix); + + ( + Transformer::new(only_fields, None, timestamp_format) + .expect("transformer configuration must be valid"), + Encoder::<()>::new(JsonSerializer::new().into()), + ) } diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs index 9ad2a6705d6de..4ee87c2fc7204 100644 --- a/src/sinks/datadog/events/sink.rs +++ b/src/sinks/datadog/events/sink.rs @@ -7,7 +7,7 @@ use vector_core::stream::DriverResponse; use crate::{ config::log_schema, - event::{Event, LogEvent}, + event::Event, internal_events::ParserMissingFieldError, sinks::{ datadog::events::request_builder::{DatadogEventsRequest, DatadogEventsRequestBuilder}, @@ -30,10 +30,6 @@ where let concurrency_limit = NonZeroUsize::new(50); let driver = input - .map(|event| { - // Panic: This sink only accepts Logs, so this should never panic - event.into_log() - }) .filter_map(ensure_required_fields) .request_builder(concurrency_limit, DatadogEventsRequestBuilder::new()) .filter_map(|request| async move { @@ -50,7 +46,9 @@ where } } -async fn ensure_required_fields(mut log: LogEvent) -> Option { +async fn ensure_required_fields(event: Event) -> Option { + let mut log = event.into_log(); + if !log.contains("title") { emit!(ParserMissingFieldError { field: "title" }); return None; @@ -86,7 +84,8 @@ async fn ensure_required_fields(mut log: LogEvent) -> Option { log.insert("source_type_name", name); } } - Some(log) + + Some(Event::from(log)) } #[async_trait] diff --git a/src/sinks/datadog/logs/config.rs b/src/sinks/datadog/logs/config.rs index a0c22fd641be9..51a5b135e5662 100644 --- a/src/sinks/datadog/logs/config.rs +++ b/src/sinks/datadog/logs/config.rs @@ -9,14 +9,15 @@ use vector_core::config::proxy::ProxyConfig; use super::{service::LogApiRetry, sink::LogSinkBuilder}; use crate::{ + codecs::Transformer, config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, http::HttpClient, schema, sinks::{ datadog::{get_api_validate_endpoint, healthcheck, logs::service::LogApiService, Region}, util::{ - encoding::Transformer, service::ServiceBuilderExt, BatchConfig, Compression, - SinkBatchSettings, TowerRequestConfig, + service::ServiceBuilderExt, BatchConfig, Compression, SinkBatchSettings, + TowerRequestConfig, }, Healthcheck, VectorSink, }, diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 89f7dbd67281a..af9f3902419b4 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -23,11 +23,10 @@ use vector_core::{ use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest}; use crate::{ - codecs::Encoder, + codecs::{Encoder, Transformer}, sinks::util::{ - encoding::{Encoder as _, Transformer}, - request_builder::EncodeResult, - Compression, Compressor, RequestBuilder, SinkBuilderExt, + encoding::Encoder as _, request_builder::EncodeResult, Compression, Compressor, + 
RequestBuilder, SinkBuilderExt, }, }; #[derive(Default)] diff --git a/src/sinks/datadog_archives.rs b/src/sinks/datadog_archives.rs index 06efbe6220b07..af5fd087952b3 100644 --- a/src/sinks/datadog_archives.rs +++ b/src/sinks/datadog_archives.rs @@ -33,7 +33,7 @@ use vector_core::{ use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, - codecs::Encoder, + codecs::{Encoder, Transformer}, config::{GenerateConfig, Input, SinkConfig, SinkContext}, gcp::{GcpAuthConfig, GcpAuthenticator}, http::HttpClient, @@ -60,7 +60,6 @@ use crate::{ sink::S3Sink, }, util::{ - encoding::Transformer, metadata::{RequestMetadata, RequestMetadataBuilder}, partitioner::KeyPartitioner, request_builder::EncodeResult, diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index f473ef551661a..f291111c39c66 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -10,11 +10,11 @@ use tower::ServiceBuilder; use crate::{ aws::RegionOrEndpoint, + codecs::Transformer, config::{log_schema, AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext}, event::{EventRef, LogEvent, Value}, http::HttpClient, internal_events::TemplateRenderingError, - sinks::util::encoding::Transformer, sinks::{ elasticsearch::{ retry::ElasticsearchRetryLogic, diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 92655365ba40b..5f9a9453caa95 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -3,10 +3,11 @@ use std::{io, io::Write}; use vector_core::{event::Event, ByteSizeOf}; use crate::{ + codecs::Transformer, event::{EventFinalizers, Finalizable, LogEvent}, sinks::{ elasticsearch::BulkAction, - util::encoding::{as_tracked_write, Encoder, Transformer, VisitLogMut}, + util::encoding::{as_tracked_write, Encoder}, }, }; @@ -109,15 +110,6 @@ fn write_bulk_action( ) } -impl VisitLogMut for ProcessedEvent { - fn visit_logs_mut(&mut self, func: F) - where - F: Fn(&mut LogEvent), - { - func(&mut self.log); - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/sinks/elasticsearch/sink.rs b/src/sinks/elasticsearch/sink.rs index 6c6132dfef360..e6ea408921ce3 100644 --- a/src/sinks/elasticsearch/sink.rs +++ b/src/sinks/elasticsearch/sink.rs @@ -9,13 +9,14 @@ use vector_core::{ }; use crate::{ + codecs::Transformer, event::{Event, LogEvent, Value}, sinks::{ elasticsearch::{ encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder, service::ElasticsearchRequest, BulkAction, ElasticsearchCommonMode, }, - util::{encoding::Transformer, SinkBuilderExt, StreamSink}, + util::{SinkBuilderExt, StreamSink}, }, transforms::metric_to_log::MetricToLog, }; diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index 6302c2f9927eb..0e56707fbd2bd 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -1,14 +1,12 @@ use std::{collections::BTreeMap, convert::TryFrom}; -use super::BulkAction; -use crate::sinks::elasticsearch::BulkConfig; -use crate::sinks::util::encoding::Transformer; use crate::{ + codecs::Transformer, event::{LogEvent, Metric, MetricKind, MetricValue, Value}, sinks::{ elasticsearch::{ - sink::process_log, DataStreamConfig, ElasticsearchCommon, ElasticsearchConfig, - ElasticsearchMode, + sink::process_log, BulkAction, BulkConfig, DataStreamConfig, ElasticsearchCommon, + ElasticsearchConfig, ElasticsearchMode, }, util::encoding::Encoder, }, diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index b42f5ae39f0ef..0366f5bc18a84 
100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -4,8 +4,7 @@ use async_compression::tokio::write::GzipEncoder; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use codecs::{ - encoding::{Framer, FramingConfig, SerializerConfig}, - JsonSerializerConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, + encoding::{Framer, FramingConfig}, TextSerializerConfig, }; use futures::{ @@ -22,7 +21,7 @@ use tokio_util::codec::Encoder as _; use vector_core::{internal_event::EventsSent, ByteSizeOf}; use crate::{ - codecs::Encoder, + codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -30,13 +29,7 @@ use crate::{ event::{Event, EventStatus, Finalizable}, expiring_hash_map::ExpiringHashMap, internal_events::{FileBytesSent, FileIoError, FileOpen, TemplateRenderingError}, - sinks::util::{ - encoding::{ - EncodingConfig, EncodingConfigWithFramingAdapter, EncodingConfigWithFramingMigrator, - Transformer, - }, - StreamSink, - }, + sinks::util::StreamSink, template::Template, }; mod bytes_path; @@ -44,30 +37,13 @@ use std::convert::TryFrom; use bytes_path::BytesPath; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EncodingMigrator; - -impl EncodingConfigWithFramingMigrator for EncodingMigrator { - type Codec = Encoding; - - fn migrate(codec: &Self::Codec) -> (Option, SerializerConfig) { - match codec { - Encoding::Text => (None, TextSerializerConfig::new().into()), - Encoding::Ndjson => ( - Some(NewlineDelimitedEncoderConfig::new().into()), - JsonSerializerConfig::new().into(), - ), - } - } -} - #[derive(Deserialize, Serialize, Debug)] #[serde(deny_unknown_fields)] pub struct FileSinkConfig { pub path: Template, pub idle_timeout_secs: Option, #[serde(flatten)] - pub encoding: EncodingConfigWithFramingAdapter, EncodingMigrator>, + pub encoding: EncodingConfigWithFraming, #[serde( default, skip_serializing_if = "crate::serde::skip_serializing_if_default" @@ -90,7 +66,7 @@ impl GenerateConfig for FileSinkConfig { toml::Value::try_from(Self { path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(), idle_timeout_secs: None, - encoding: EncodingConfig::from(Encoding::Text).into(), + encoding: (None::, TextSerializerConfig::new()).into(), compression: Default::default(), acknowledgements: Default::default(), }) @@ -98,13 +74,6 @@ impl GenerateConfig for FileSinkConfig { } } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Ndjson, -} - #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Copy)] #[serde(rename_all = "snake_case")] pub enum Compression { @@ -200,8 +169,7 @@ pub struct FileSink { impl FileSink { pub fn new(config: &FileSinkConfig) -> crate::Result { let transformer = config.encoding.transformer(); - let (framer, serializer) = config.encoding.encoding()?; - let framer = framer.unwrap_or_else(|| NewlineDelimitedEncoder::new().into()); + let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); Ok(Self { @@ -443,7 +411,7 @@ mod tests { let config = FileSinkConfig { path: template.clone().try_into().unwrap(), idle_timeout_secs: None, - encoding: EncodingConfig::from(Encoding::Text).into(), + encoding: (None::, TextSerializerConfig::new()).into(), compression: Compression::None, acknowledgements: Default::default(), }; @@ -479,7 +447,7 @@ mod tests { let config = 
FileSinkConfig { path: template.clone().try_into().unwrap(), idle_timeout_secs: None, - encoding: EncodingConfig::from(Encoding::Text).into(), + encoding: (None::, TextSerializerConfig::new()).into(), compression: Compression::Gzip, acknowledgements: Default::default(), }; @@ -520,7 +488,7 @@ mod tests { let config = FileSinkConfig { path: template.try_into().unwrap(), idle_timeout_secs: None, - encoding: EncodingConfig::from(Encoding::Text).into(), + encoding: (None::, TextSerializerConfig::new()).into(), compression: Compression::None, acknowledgements: Default::default(), }; @@ -605,7 +573,7 @@ mod tests { let config = FileSinkConfig { path: template.clone().try_into().unwrap(), idle_timeout_secs: Some(1), - encoding: EncodingConfig::from(Encoding::Text).into(), + encoding: (None::, TextSerializerConfig::new()).into(), compression: Compression::None, acknowledgements: Default::default(), }; diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index f74a7f664263c..dd8dec4c05cde 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -2,10 +2,7 @@ use std::{collections::HashMap, convert::TryFrom, io}; use bytes::Bytes; use chrono::Utc; -use codecs::{ - encoding::{Framer, Serializer}, - CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, -}; +use codecs::encoding::Framer; use http::header::{HeaderName, HeaderValue}; use indoc::indoc; use serde::{Deserialize, Serialize}; @@ -16,7 +13,7 @@ use uuid::Uuid; use vector_core::event::{EventFinalizers, Finalizable}; use crate::{ - codecs::Encoder, + codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -35,10 +32,6 @@ use crate::{ }, util::{ batch::BatchConfig, - encoding::{ - EncodingConfig, EncodingConfigWithFramingAdapter, StandardEncodings, - StandardEncodingsWithFramingMigrator, Transformer, - }, metadata::{RequestMetadata, RequestMetadataBuilder}, partitioner::KeyPartitioner, request_builder::EncodeResult, @@ -72,10 +65,7 @@ pub struct GcsSinkConfig { filename_append_uuid: Option, filename_extension: Option, #[serde(flatten)] - encoding: EncodingConfigWithFramingAdapter< - EncodingConfig, - StandardEncodingsWithFramingMigrator, - >, + encoding: EncodingConfigWithFraming, #[serde(default)] compression: Compression, #[serde(default)] @@ -94,7 +84,7 @@ pub struct GcsSinkConfig { } #[cfg(test)] -fn default_config(e: StandardEncodings) -> GcsSinkConfig { +fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig { GcsSinkConfig { bucket: Default::default(), acl: Default::default(), @@ -104,7 +94,7 @@ fn default_config(e: StandardEncodings) -> GcsSinkConfig { filename_time_format: Default::default(), filename_append_uuid: Default::default(), filename_extension: Default::default(), - encoding: EncodingConfig::from(e).into(), + encoding, compression: Compression::gzip_default(), batch: Default::default(), request: Default::default(), @@ -123,7 +113,8 @@ impl GenerateConfig for GcsSinkConfig { toml::from_str(indoc! 
{r#" bucket = "my-bucket" credentials_path = "/path/to/credentials.json" - encoding.codec = "ndjson" + framing.method = "newline_delimited" + encoding.codec = "json" "#}) .unwrap() } @@ -279,21 +270,7 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { impl RequestSettings { fn new(config: &GcsSinkConfig) -> crate::Result { let transformer = config.encoding.transformer(); - let (framer, serializer) = config.encoding.encoding()?; - let framer = match (framer, &serializer) { - (Some(framer), _) => framer, - (None, Serializer::Json(_)) => CharacterDelimitedEncoder::new(b',').into(), - (None, Serializer::Avro(_) | Serializer::Native(_)) => { - LengthDelimitedEncoder::new().into() - } - ( - None, - Serializer::Logfmt(_) - | Serializer::NativeJson(_) - | Serializer::RawMessage(_) - | Serializer::Text(_), - ) => NewlineDelimitedEncoder::new().into(), - }; + let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); let acl = config .acl @@ -349,6 +326,8 @@ fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, #[cfg(test)] mod tests { + use codecs::encoding::FramingConfig; + use codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig}; use futures_util::{future::ready, stream}; use vector_core::partition::Partitioner; @@ -375,7 +354,7 @@ mod tests { let client = HttpClient::new(tls, context.proxy()).expect("should not fail to create HTTP client"); - let config = default_config(StandardEncodings::Json); + let config = default_config((None::, JsonSerializerConfig::new()).into()); let sink = config .build_sink(client, mock_endpoint.to_string(), GcpAuthenticator::None) .expect("failed to build sink"); @@ -394,7 +373,7 @@ mod tests { let sink_config = GcsSinkConfig { key_prefix: Some("key: {{ key }}".into()), - ..default_config(StandardEncodings::Text) + ..default_config((None::, TextSerializerConfig::new()).into()) }; let key = sink_config .key_partitioner() @@ -416,7 +395,13 @@ mod tests { filename_extension: extension.map(Into::into), filename_append_uuid: Some(uuid), compression, - ..default_config(StandardEncodings::Ndjson) + ..default_config( + ( + Some(NewlineDelimitedEncoderConfig::new()), + JsonSerializerConfig::new(), + ) + .into(), + ) }; let log = LogEvent::default().into(); let key = sink_config diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index cba64a77ef374..44426e028da12 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -9,7 +9,7 @@ use snafu::{ResultExt, Snafu}; use tokio_util::codec::Encoder as _; use crate::{ - codecs::Encoder, + codecs::{Encoder, EncodingConfig, Transformer}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -20,10 +20,6 @@ use crate::{ sinks::{ gcs_common::config::healthcheck_response, util::{ - encoding::{ - EncodingConfig, EncodingConfigAdapter, StandardEncodings, - StandardEncodingsMigrator, Transformer, - }, http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig, }, @@ -63,7 +59,7 @@ pub struct PubsubConfig { pub batch: BatchConfig, #[serde(default)] pub request: TowerRequestConfig, - encoding: EncodingConfigAdapter, StandardEncodingsMigrator>, + encoding: EncodingConfig, #[serde(default)] pub tls: Option, @@ -154,7 +150,7 @@ impl PubsubSink { ); let transformer = config.encoding.transformer(); - let serializer = config.encoding.encoding()?; + let serializer 
= config.encoding.build()?; let encoder = Encoder::<()>::new(serializer); Ok(Self { @@ -252,6 +248,7 @@ mod tests { #[cfg(all(test, feature = "gcp-pubsub-integration-tests"))] mod integration_tests { + use codecs::JsonSerializerConfig; use reqwest::{Client, Method, Response}; use serde_json::{json, Value}; use vector_core::event::{BatchNotifier, BatchStatus}; @@ -276,7 +273,7 @@ mod integration_tests { }, batch: Default::default(), request: Default::default(), - encoding: EncodingConfig::from(StandardEncodings::Json).into(), + encoding: JsonSerializerConfig::new().into(), tls: Default::default(), acknowledgements: Default::default(), } diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs index f8ca1b4664f79..9f3af816c1bc1 100644 --- a/src/sinks/gcp/stackdriver_logs.rs +++ b/src/sinks/gcp/stackdriver_logs.rs @@ -9,6 +9,7 @@ use serde_json::{json, map}; use snafu::Snafu; use crate::{ + codecs::Transformer, config::{log_schema, AcknowledgementsConfig, Input, SinkConfig, SinkContext, SinkDescription}, event::{Event, Value}, gcp::{GcpAuthConfig, GcpAuthenticator, Scope}, @@ -16,7 +17,6 @@ use crate::{ sinks::{ gcs_common::config::healthcheck_response, util::{ - encoding::Transformer, http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, BatchConfig, BoxedRawValue, JsonArrayBuffer, RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, diff --git a/src/sinks/honeycomb.rs b/src/sinks/honeycomb.rs index 0ed61aaaf17b2..5c714a3dcd85c 100644 --- a/src/sinks/honeycomb.rs +++ b/src/sinks/honeycomb.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use crate::{ + codecs::Transformer, config::{ log_schema, AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -12,7 +13,6 @@ use crate::{ event::{Event, Value}, http::HttpClient, sinks::util::{ - encoding::Transformer, http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig, }, diff --git a/src/sinks/http.rs b/src/sinks/http.rs index c84610db9bfc8..81bd06fbaedb4 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -1,11 +1,7 @@ use std::io::Write; use bytes::{BufMut, Bytes, BytesMut}; -use codecs::encoding::{ - CharacterDelimitedEncoder, CharacterDelimitedEncoderConfig, Framer, FramingConfig, - JsonSerializerConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, Serializer, - SerializerConfig, TextSerializerConfig, -}; +use codecs::encoding::{CharacterDelimitedEncoder, Framer, Serializer}; use flate2::write::{GzEncoder, ZlibEncoder}; use futures::{future, FutureExt, SinkExt}; use http::{ @@ -19,7 +15,7 @@ use snafu::{ResultExt, Snafu}; use tokio_util::codec::Encoder as _; use crate::{ - codecs::Encoder, + codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -28,10 +24,6 @@ use crate::{ http::{Auth, HttpClient, MaybeAuth}, sinks::util::{ self, - encoding::{ - EncodingConfig, EncodingConfigWithFramingAdapter, EncodingConfigWithFramingMigrator, - Transformer, - }, http::{BatchedHttpSink, HttpEventEncoder, RequestConfig}, BatchConfig, Buffer, Compression, RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, UriSerde, @@ -53,27 +45,6 @@ enum BuildError { }, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Migrator; - -impl EncodingConfigWithFramingMigrator for Migrator { - type Codec = Encoding; - - fn migrate(codec: 
&Self::Codec) -> (Option, SerializerConfig) { - match codec { - Encoding::Text => (None, TextSerializerConfig::new().into()), - Encoding::Ndjson => ( - Some(NewlineDelimitedEncoderConfig::new().into()), - JsonSerializerConfig::new().into(), - ), - Encoding::Json => ( - Some(CharacterDelimitedEncoderConfig::new(b',').into()), - JsonSerializerConfig::new().into(), - ), - } - } -} - #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(deny_unknown_fields)] pub struct HttpSinkConfig { @@ -85,7 +56,7 @@ pub struct HttpSinkConfig { #[serde(default)] pub compression: Compression, #[serde(flatten)] - pub encoding: EncodingConfigWithFramingAdapter, Migrator>, + pub encoding: EncodingConfigWithFraming, #[serde(default)] pub batch: BatchConfig, #[serde(default)] @@ -114,14 +85,6 @@ pub enum HttpMethod { Patch, } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Ndjson, - Json, -} - inventory::submit! { SinkDescription::new::("http") } @@ -155,16 +118,8 @@ struct HttpSink { } #[cfg(test)] -fn default_sink(encoding: Encoding) -> HttpSink { - let encoding = EncodingConfigWithFramingAdapter::, Migrator>::legacy( - encoding.into(), - ) - .encoding() - .unwrap(); - let framing = encoding - .0 - .unwrap_or_else(|| NewlineDelimitedEncoder::new().into()); - let serializer = encoding.1; +fn default_sink(encoding: EncodingConfigWithFraming) -> HttpSink { + let (framing, serializer) = encoding.build(SinkType::MessageBased).unwrap(); let encoder = Encoder::::new(framing, serializer); HttpSink { @@ -199,12 +154,8 @@ impl SinkConfig for HttpSinkConfig { request.add_old_option(self.headers.clone()); validate_headers(&request.headers, &self.auth)?; - let encoding = self.encoding.encoding()?; - let framing = encoding - .0 - .unwrap_or_else(|| NewlineDelimitedEncoder::new().into()); - let serializer = encoding.1; - let encoder = Encoder::::new(framing, serializer); + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + let encoder = Encoder::::new(framer, serializer); let sink = HttpSink { uri: self.uri.with_default_parts(), @@ -404,6 +355,10 @@ mod tests { }; use bytes::{Buf, Bytes}; + use codecs::{ + encoding::FramingConfig, JsonSerializerConfig, NewlineDelimitedEncoderConfig, + TextSerializerConfig, + }; use flate2::read::MultiGzDecoder; use futures::{channel::mpsc, stream, StreamExt}; use headers::{Authorization, HeaderMapExt}; @@ -432,7 +387,7 @@ mod tests { fn http_encode_event_text() { let event = Event::Log(LogEvent::from("hello world")); - let sink = default_sink(Encoding::Text); + let sink = default_sink((None::, TextSerializerConfig::new()).into()); let mut encoder = sink.build_encoder(); let bytes = encoder.encode_event(event).unwrap(); @@ -443,7 +398,13 @@ mod tests { fn http_encode_event_ndjson() { let event = Event::Log(LogEvent::from("hello world")); - let sink = default_sink(Encoding::Ndjson); + let sink = default_sink( + ( + Some(NewlineDelimitedEncoderConfig::new()), + JsonSerializerConfig::new(), + ) + .into(), + ); let mut encoder = sink.build_encoder(); let bytes = encoder.encode_event(event).unwrap(); @@ -464,7 +425,7 @@ mod tests { fn http_validates_normal_headers() { let config = r#" uri = "http://$IN_ADDR/frames" - encoding = "text" + encoding.codec = "text" [request.headers] Auth = "token:thing_and-stuff" X-Custom-Nonsense = "_%_{}_-_&_._`_|_~_!_#_&_$_" @@ -478,7 +439,7 @@ mod tests { fn http_catches_bad_header_names() { let config = r#" uri = "http://$IN_ADDR/frames" - encoding = "text" 
+ encoding.codec = "text" [request.headers] "\u0001" = "bad" "#; @@ -498,7 +459,7 @@ mod tests { async fn http_headers_auth_conflict() { let config = r#" uri = "http://$IN_ADDR/" - encoding = "text" + encoding.codec = "text" [request.headers] Authorization = "Basic base64encodedstring" [auth] @@ -691,7 +652,7 @@ mod tests { let config = r#" uri = "http://$IN_ADDR/frames" compression = "gzip" - encoding = "json" + encoding.codec = "json" [auth] strategy = "basic" @@ -782,7 +743,8 @@ mod tests { r#" uri = "http://{addr}/frames" compression = "gzip" - encoding = "ndjson" + framing.method = "newline_delimited" + encoding.codec = "json" {extras} "#, addr = in_addr, diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index 8c5f6ca6b8fd7..f932ef3c455e8 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -1,7 +1,9 @@ +use codecs::JsonSerializerConfig; use serde::{Deserialize, Serialize}; use super::host_key; use crate::{ + codecs::EncodingConfig, config::{ AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, @@ -12,12 +14,9 @@ use crate::{ acknowledgements::HecClientAcknowledgementsConfig, timestamp_key, EndpointTarget, SplunkHecDefaultBatchSettings, }, - logs::config::{HecEncoding, HecEncodingMigrator, HecLogsSinkConfig}, - }, - util::{ - encoding::{EncodingConfig, EncodingConfigAdapter}, - BatchConfig, Compression, TowerRequestConfig, + logs::config::HecLogsSinkConfig, }, + util::{BatchConfig, Compression, TowerRequestConfig}, Healthcheck, VectorSink, }, template::Template, @@ -34,7 +33,7 @@ pub struct HumioLogsConfig { #[serde(alias = "host")] pub(super) endpoint: Option, pub(super) source: Option