chore(sinks, codecs): Remove legacy EncodingConfiguration (#13518)
* Remove legacy `EncodingConfiguration`
* Upgrade usage of `encoding` in docs and examples
* Refactor inferring `Framer` based on sink type
* Fix cue formatting
* Remove obsolete `encode_log` bench
* Fix invalid usage of `encoding` in HTTP source
* Remove dead code
* Revert changes to highlight to be compatible with version in that context
* Add breaking changes to upgrade guide
* Fix `decoding` config for HTTP source in soaks
* Remove unnecessary namespace
* Improve wording/formatting in upgrade guide
* Remove TLD from links

Signed-off-by: Pablo Sichert <[email protected]>

Co-authored-by: Jesse Szwedko <[email protected]>
pablosichert and jszwedko authored Jul 14, 2022
1 parent 7cc3a80 commit 9a9340a
Showing 143 changed files with 2,219 additions and 4,308 deletions.
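
The bulk of the diff applies one mechanical migration to configs, docs, and examples: the legacy string form of `encoding` on sinks is replaced by the structured `encoding.codec` option, with newline delimiting expressed separately through `framing.method`. A minimal before/after sketch, using the `aws_s3` example config touched below (only a subset of fields shown, for illustration):

```toml
# Before: legacy string-based encoding (codec and framing bundled into one value)
[sinks.s3_archives]
inputs = ["apache_parser"]
type = "aws_s3"
bucket = "my-log-archives"
encoding = "ndjson"  # newline-delimited JSON
```

```toml
# After: structured codec with explicit framing
[sinks.s3_archives]
inputs = ["apache_parser"]
type = "aws_s3"
bucket = "my-log-archives"
framing.method = "newline_delimited"  # newline-delimited...
encoding.codec = "json"               # ...JSON
```

Console-style sinks that previously used `encoding = "json"` or `encoding = "text"` simply become `encoding.codec = "json"` / `encoding.codec = "text"`, with no framing entry required.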
11 changes: 0 additions & 11 deletions benches/codecs/encoder.rs
@@ -48,17 +48,6 @@ fn encoder(c: &mut Criterion) {
"key3" => "value3"
}));

group.throughput(Throughput::Bytes(input.size_of() as u64));
group.bench_with_input("vector::sinks::util::encode_log", &(), |b, ()| {
b.iter_batched(
|| vector::sinks::util::Encoding::Json.into(),
|encoding| {
vector::sinks::util::encode_log(input.clone(), &encoding).unwrap();
},
BatchSize::SmallInput,
)
});

group.throughput(Throughput::Bytes(input.size_of() as u64));
group.bench_with_input("JsonLogVecSerializer::encode", &(), |b, ()| {
b.iter_batched(
6 changes: 2 additions & 4 deletions benches/files.rs
@@ -1,6 +1,7 @@
use std::{convert::TryInto, path::PathBuf};

use bytes::Bytes;
use codecs::{encoding::FramingConfig, TextSerializerConfig};
use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput};
use futures::{stream, SinkExt, StreamExt};
use tempfile::tempdir;
@@ -54,10 +55,7 @@ fn benchmark_files_no_partitions(c: &mut Criterion) {
sinks::file::FileSinkConfig {
path: output.try_into().unwrap(),
idle_timeout_secs: None,
encoding: sinks::util::encoding::EncodingConfig::from(
sinks::file::Encoding::Text,
)
.into(),
encoding: (None::<FramingConfig>, TextSerializerConfig::new()).into(),
compression: sinks::file::Compression::None,
acknowledgements: Default::default(),
},
8 changes: 4 additions & 4 deletions benches/http.rs
@@ -1,5 +1,6 @@
use std::net::SocketAddr;

use codecs::{encoding::FramingConfig, TextSerializerConfig};
use criterion::{criterion_group, BatchSize, BenchmarkId, Criterion, SamplingMode, Throughput};
use futures::TryFutureExt;
use hyper::{
@@ -9,7 +10,7 @@ use hyper::{
use tokio::runtime::Runtime;
use vector::{
config, sinks,
sinks::util::{encoding::EncodingConfigWithFramingAdapter, BatchConfig, Compression},
sinks::util::{BatchConfig, Compression},
sources,
test_util::{next_addr, random_lines, runtime, send_lines, start_topology, wait_for_tcp},
Error,
@@ -53,9 +54,8 @@ fn benchmark_http(c: &mut Criterion) {
auth: Default::default(),
headers: Default::default(),
batch,
encoding: EncodingConfigWithFramingAdapter::legacy(
sinks::http::Encoding::Text.into(),
),
encoding: (None::<FramingConfig>, TextSerializerConfig::new())
.into(),
request: Default::default(),
tls: Default::default(),
acknowledgements: Default::default(),
17 changes: 9 additions & 8 deletions config/examples/docs_example.toml
@@ -32,12 +32,13 @@ index = "vector-%Y-%m-%d" # daily indices

# Send structured data to a cost-effective long-term storage
[sinks.s3_archives]
inputs = ["apache_parser"] # don't sample for S3
type = "aws_s3"
region = "us-east-1"
bucket = "my-log-archives"
key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
compression = "gzip" # compress final objects
encoding = "ndjson" # new line delimited JSON
inputs = ["apache_parser"] # don't sample for S3
type = "aws_s3"
region = "us-east-1"
bucket = "my-log-archives"
key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
compression = "gzip" # compress final objects
framing.method = "newline_delimited" # new line delimited...
encoding.codec = "json" # ...JSON
[sinks.s3_archives.batch]
max_bytes = 10000000 # 10mb uncompressed
max_bytes = 10000000 # 10mb uncompressed
6 changes: 3 additions & 3 deletions config/examples/environment_variables.toml
@@ -28,6 +28,6 @@ data_dir = "/var/lib/vector"
# Print the data to STDOUT for inspection
# Docs: https://vector.dev/docs/reference/sinks/console
[sinks.out]
inputs = ["add_host"]
type = "console"
encoding = "json"
inputs = ["add_host"]
type = "console"
encoding.codec = "json"
15 changes: 8 additions & 7 deletions config/examples/es_s3_hybrid.toml
@@ -26,11 +26,12 @@ data_dir = "/var/lib/vector"

# Send structured data to S3, a durable long-term storage
[sinks.s3_archives]
inputs = ["apache_logs"] # don't sample
type = "aws_s3"
region = "us-east-1"
bucket = "my_log_archives"
encoding = "ndjson"
compression = "gzip"
inputs = ["apache_logs"] # don't sample
type = "aws_s3"
region = "us-east-1"
bucket = "my_log_archives"
framing.method = "newline_delimited"
encoding.codec = "json"
compression = "gzip"
[sinks.s3_archives.batch]
max_size = 10000000 # 10mb uncompressed
max_size = 10000000 # 10mb uncompressed
4 changes: 2 additions & 2 deletions config/examples/file_to_cloudwatch_metrics.toml
@@ -34,12 +34,12 @@ tags = {method = "{{method}}", status = "{{status}}"}
[sinks.console_metrics]
inputs = ["log_to_metric"]
type = "console"
encoding = "json"
encoding.codec = "json"

[sinks.console_logs]
inputs = ["remap"]
type = "console"
encoding = "json"
encoding.codec = "json"

[sinks.cloudwatch]
inputs = ["log_to_metric"]
4 changes: 2 additions & 2 deletions config/examples/file_to_prometheus.toml
@@ -51,12 +51,12 @@ name = "bytes_out_histogram"
[sinks.console_metrics]
inputs = ["log_to_metric"]
type = "console"
encoding = "json"
encoding.codec = "json"

[sinks.console_logs]
inputs = ["remap"]
type = "console"
encoding = "text"
encoding.codec = "text"

[sinks.prometheus]
inputs = ["log_to_metric"]
17 changes: 9 additions & 8 deletions config/examples/namespacing/sinks/s3_archives.toml
@@ -1,10 +1,11 @@
# Send structured data to a cost-effective long-term storage
inputs = ["apache_parser"] # don't sample for S3
type = "aws_s3"
region = "us-east-1"
bucket = "my-log-archives"
key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
compression = "gzip" # compress final objects
encoding = "ndjson" # new line delimited JSON
inputs = ["apache_parser"] # don't sample for S3
type = "aws_s3"
region = "us-east-1"
bucket = "my-log-archives"
key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
compression = "gzip" # compress final objects
framing.method = "newline_delimited" # new line delimited...
encoding.codec = "json" # ...JSON
[batch]
max_bytes = 10000000 # 10mb uncompressed
max_bytes = 10000000 # 10mb uncompressed
2 changes: 1 addition & 1 deletion config/examples/prometheus_to_console.toml
@@ -14,4 +14,4 @@ scrape_interval_secs = 2
[sinks.console]
inputs = ["prometheus"]
type = "console"
encoding = "json"
encoding.codec = "json"
2 changes: 1 addition & 1 deletion config/examples/stdio.toml
@@ -11,4 +11,4 @@
[sinks.out]
inputs = ["in"]
type = "console"
encoding = "text"
encoding.codec = "text"
6 changes: 3 additions & 3 deletions config/examples/wrapped_json.toml
@@ -34,6 +34,6 @@ data_dir = "/var/lib/vector"
# Print the data to STDOUT for inspection
# Docs: https://vector.dev/docs/reference/sinks/console
[sinks.out]
inputs = ["parse_json"]
type = "console"
encoding = "json"
inputs = ["parse_json"]
type = "console"
encoding.codec = "json"
2 changes: 1 addition & 1 deletion docs/ARCHITECTURE.md
@@ -167,7 +167,7 @@ type = "stdin"
[sinks.bar]
type = "console"
inputs = ["foo"]
encoding = "json"
encoding.codec = "json"
```

After the component construction phase, we'll be left with the tasks for each
86 changes: 82 additions & 4 deletions lib/codecs/src/encoding/format/json.rs
@@ -68,22 +68,100 @@ impl Encoder<Event> for JsonSerializer {
#[cfg(test)]
mod tests {
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use vector_common::btreemap;
use vector_core::event::{LogEvent, Value};
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};

use super::*;

#[test]
fn serialize_json() {
fn serialize_json_log() {
let event = Event::Log(LogEvent::from(btreemap! {
"foo" => Value::from("bar")
"x" => Value::from("23"),
"z" => Value::from(25),
"a" => Value::from("0"),
}));
let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), r#"{"foo":"bar"}"#);
assert_eq!(bytes.freeze(), r#"{"a":"0","x":"23","z":25}"#);
}

#[test]
fn serialize_json_metric_counter() {
let event = Event::Metric(
Metric::new(
"foos",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_namespace(Some("vector"))
.with_tags(Some(
vec![
("key2".to_owned(), "value2".to_owned()),
("key1".to_owned(), "value1".to_owned()),
("Key3".to_owned(), "Value3".to_owned()),
]
.into_iter()
.collect(),
))
.with_timestamp(Some(Utc.ymd(2018, 11, 14).and_hms_nano(8, 9, 10, 11))),
);

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#
);
}

#[test]
fn serialize_json_metric_set() {
let event = Event::Metric(Metric::new(
"users",
MetricKind::Incremental,
MetricValue::Set {
values: vec!["bob".into()].into_iter().collect(),
},
));

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#
);
}

#[test]
fn serialize_json_metric_histogram_without_timestamp() {
let event = Event::Metric(Metric::new(
"glork",
MetricKind::Incremental,
MetricValue::Distribution {
samples: vector_core::samples![10.0 => 1],
statistic: StatisticKind::Histogram,
},
));

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#
);
}

#[test]
2 changes: 1 addition & 1 deletion lib/k8s-e2e-tests/tests/vector-agent.rs
@@ -43,7 +43,7 @@ const CUSTOM_RESOURCE_VECTOR_CONFIG: &str = indoc! {r#"
[sinks.stdout]
type = "console"
inputs = ["kubernetes_logs"]
encoding = "json"
encoding.codec = "json"
"#};

/// This test validates that vector picks up logs at the simplest case
2 changes: 1 addition & 1 deletion rfcs/2020-03-06-1999-api-extensions-for-lua-transform.md
@@ -475,7 +475,7 @@ Here `event` is an encoded event to be produced by the transform, and `lane` is
> [sinks.example_console]
> type = "console"
> inputs = ["example_transform.example_lane"] # would output the event from `example_lane`
> encoding = "text"
> encoding.codec = "text"
> ```
>
> Other components connected to the same transform, but with different lanes names or without lane names at all would not receive any event.
2 changes: 1 addition & 1 deletion rfcs/2020-04-15-2341-wasm-plugins.md
@@ -333,7 +333,7 @@ module = "target/wasm32-wasi/release/echo.wasm"
healthcheck = true
inputs = ["demo"]
type = "console"
encoding = "json"
encoding.codec = "json"
buffer.type = "memory"
buffer.max_events = 500
buffer.when_full = "block"
2 changes: 1 addition & 1 deletion rfcs/2021-07-20-8288-csv-enrichment.md
@@ -37,7 +37,7 @@ To represent the CSV file we have a new top level configuration option.
```toml
[enrichment_tables.csv_file]
type = "file"
encoding = "csv"
encoding.codec = "csv"
path = "\path_to_csv"
delimiter = ","
```
2 changes: 1 addition & 1 deletion skaffold/manifests/config.yaml
@@ -8,4 +8,4 @@ data:
type = "console"
inputs = ["kubernetes_logs", "internal_metrics"]
target = "stdout"
encoding = "json"
encoding.codec = "json"
2 changes: 1 addition & 1 deletion soaks/disabled-tests/fluent_remap_aws_firehose/vector.toml
@@ -41,7 +41,7 @@ stream_name = "soak_fluent_remap_firehose"
endpoint = "http://localhost:8080"
healthcheck.enabled = true
compression = "none"
encoding = "json"
encoding.codec = "json"
region = "us-east-2"
auth.access_key_id = "totallyanaccesskeyid"
auth.secret_access_key = "alsoasecretaccesskey"
@@ -12,7 +12,8 @@ type = "internal_metrics"
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "ndjson"
framing.method = "newline_delimited"
decoding.codec = "json"

##
## Transforms
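
Sources migrate analogously, but the option is renamed from `encoding` to `decoding`, as in the soak config above: the newline framing that `"ndjson"` implied is now spelled out, and the old `"text"` value maps to the `bytes` codec (see the blackhole soaks that follow). A minimal sketch of the HTTP source migration:

```toml
# Before: legacy source-side `encoding`
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "ndjson"
```

```toml
# After: explicit framing plus a decoding codec
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
framing.method = "newline_delimited"
decoding.codec = "json"
```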
2 changes: 1 addition & 1 deletion soaks/tests/http_pipelines_blackhole/vector.toml
@@ -10,7 +10,7 @@ type = "internal_metrics"
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "text"
decoding.codec = "bytes"

##
## Transforms
2 changes: 1 addition & 1 deletion soaks/tests/http_pipelines_blackhole_acks/vector.toml
@@ -10,7 +10,7 @@ type = "internal_metrics"
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "text"
decoding.codec = "bytes"
acknowledgements = true

##
2 changes: 1 addition & 1 deletion soaks/tests/http_pipelines_no_grok_blackhole/vector.toml
@@ -10,7 +10,7 @@ type = "internal_metrics"
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "text"
decoding.codec = "bytes"

##
## Transforms
