Skip to content

Commit

Permalink
Merge branch 'main' into mgattozzi/distributables
Browse files Browse the repository at this point in the history
  • Loading branch information
mgattozzi authored Mar 11, 2024
2 parents 9a47df9 + bf93197 commit d290280
Show file tree
Hide file tree
Showing 13 changed files with 1,294 additions and 718 deletions.
35 changes: 18 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 24 additions & 4 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use influxdb3_server::{
use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::SegmentDuration;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_time::SystemProvider;
use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
Expand Down Expand Up @@ -133,6 +135,16 @@ pub struct Config {
/// bearer token to be set for requests
#[clap(long = "bearer-token", env = "INFLUXDB3_BEARER_TOKEN", action)]
pub bearer_token: Option<String>,

/// Duration of wal segments that are persisted to object storage. Valid values: 1m, 5m, 10m,
/// 15m, 30m, 1h, 2h, 4h.
#[clap(
long = "segment-duration",
env = "INFLUXDB3_SEGMENT_DURATION",
default_value = "1h",
action
)]
pub segment_duration: SegmentDuration,
}

#[cfg(any(
Expand Down Expand Up @@ -257,8 +269,17 @@ pub async fn command(config: Config) -> Result<()> {
.wal_directory
.map(|dir| WalImpl::new(dir).map(Arc::new))
.transpose()?;
// TODO: the next segment ID should be loaded from the persister
let write_buffer = Arc::new(WriteBufferImpl::new(Arc::clone(&persister), wal).await?);

let time_provider = Arc::new(SystemProvider::new());
let write_buffer = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
wal,
Arc::clone(&time_provider),
config.segment_duration,
)
.await?,
);
let query_executor = Arc::new(QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::clone(&write_buffer),
Expand All @@ -268,12 +289,11 @@ pub async fn command(config: Config) -> Result<()> {
10,
));

let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));

let builder = ServerBuilder::new(common_state)
.max_request_size(config.max_http_request_size)
.write_buffer(write_buffer)
.query_executor(query_executor)
.time_provider(time_provider)
.persister(persister);

let server = if let Some(token) = config.bearer_token.map(hex::decode).transpose()? {
Expand Down
48 changes: 37 additions & 11 deletions influxdb3_server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ use authz::Authorizer;
use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server};

#[derive(Debug)]
pub struct ServerBuilder<W, Q, P> {
pub struct ServerBuilder<W, Q, P, T> {
common_state: CommonServerState,
time_provider: T,
max_request_size: usize,
write_buffer: W,
query_executor: Q,
persister: P,
authorizer: Arc<dyn Authorizer>,
}

impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister> {
impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister, NoTimeProvider> {
pub fn new(common_state: CommonServerState) -> Self {
Self {
common_state,
time_provider: NoTimeProvider,
max_request_size: usize::MAX,
write_buffer: NoWriteBuf,
query_executor: NoQueryExec,
Expand All @@ -27,7 +29,7 @@ impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister> {
}
}

impl<W, Q, P> ServerBuilder<W, Q, P> {
impl<W, Q, P, T> ServerBuilder<W, Q, P, T> {
pub fn max_request_size(mut self, max_request_size: usize) -> Self {
self.max_request_size = max_request_size;
self
Expand All @@ -51,11 +53,16 @@ pub struct WithQueryExec<Q>(Arc<Q>);
pub struct NoPersister;
#[derive(Debug)]
pub struct WithPersister<P>(Arc<P>);
#[derive(Debug)]
pub struct NoTimeProvider;
#[derive(Debug)]
pub struct WithTimeProvider<T>(Arc<T>);

impl<Q, P> ServerBuilder<NoWriteBuf, Q, P> {
pub fn write_buffer<W>(self, wb: Arc<W>) -> ServerBuilder<WithWriteBuf<W>, Q, P> {
impl<Q, P, T> ServerBuilder<NoWriteBuf, Q, P, T> {
pub fn write_buffer<W>(self, wb: Arc<W>) -> ServerBuilder<WithWriteBuf<W>, Q, P, T> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
max_request_size: self.max_request_size,
write_buffer: WithWriteBuf(wb),
query_executor: self.query_executor,
Expand All @@ -65,10 +72,11 @@ impl<Q, P> ServerBuilder<NoWriteBuf, Q, P> {
}
}

impl<W, P> ServerBuilder<W, NoQueryExec, P> {
pub fn query_executor<Q>(self, qe: Arc<Q>) -> ServerBuilder<W, WithQueryExec<Q>, P> {
impl<W, P, T> ServerBuilder<W, NoQueryExec, P, T> {
pub fn query_executor<Q>(self, qe: Arc<Q>) -> ServerBuilder<W, WithQueryExec<Q>, P, T> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
max_request_size: self.max_request_size,
write_buffer: self.write_buffer,
query_executor: WithQueryExec(qe),
Expand All @@ -78,10 +86,11 @@ impl<W, P> ServerBuilder<W, NoQueryExec, P> {
}
}

impl<W, Q> ServerBuilder<W, Q, NoPersister> {
pub fn persister<P>(self, p: Arc<P>) -> ServerBuilder<W, Q, WithPersister<P>> {
impl<W, Q, T> ServerBuilder<W, Q, NoPersister, T> {
pub fn persister<P>(self, p: Arc<P>) -> ServerBuilder<W, Q, WithPersister<P>, T> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
max_request_size: self.max_request_size,
write_buffer: self.write_buffer,
query_executor: self.query_executor,
Expand All @@ -91,12 +100,29 @@ impl<W, Q> ServerBuilder<W, Q, NoPersister> {
}
}

impl<W, Q, P> ServerBuilder<WithWriteBuf<W>, WithQueryExec<Q>, WithPersister<P>> {
pub fn build(self) -> Server<W, Q, P> {
impl<W, Q, P> ServerBuilder<W, Q, P, NoTimeProvider> {
pub fn time_provider<T>(self, tp: Arc<T>) -> ServerBuilder<W, Q, P, WithTimeProvider<T>> {
ServerBuilder {
common_state: self.common_state,
time_provider: WithTimeProvider(tp),
max_request_size: self.max_request_size,
write_buffer: self.write_buffer,
query_executor: self.query_executor,
persister: self.persister,
authorizer: self.authorizer,
}
}
}

impl<W, Q, P, T>
ServerBuilder<WithWriteBuf<W>, WithQueryExec<Q>, WithPersister<P>, WithTimeProvider<T>>
{
pub fn build(self) -> Server<W, Q, P, T> {
let persister = Arc::clone(&self.persister.0);
let authorizer = Arc::clone(&self.authorizer);
let http = Arc::new(HttpApi::new(
self.common_state.clone(),
Arc::clone(&self.time_provider.0),
Arc::clone(&self.write_buffer.0),
Arc::clone(&self.query_executor.0),
self.max_request_size,
Expand Down
19 changes: 11 additions & 8 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use influxdb3_write::BufferedWriteRequest;
use influxdb3_write::Precision;
use influxdb3_write::WriteBuffer;
use iox_query_influxql_rewrite as rewrite;
use iox_time::{SystemProvider, TimeProvider};
use iox_time::TimeProvider;
use observability_deps::tracing::{debug, error, info};
use serde::de::DeserializeOwned;
use serde::Deserialize;
Expand Down Expand Up @@ -273,24 +273,27 @@ impl Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug)]
pub(crate) struct HttpApi<W, Q> {
pub(crate) struct HttpApi<W, Q, T> {
common_state: CommonServerState,
write_buffer: Arc<W>,
time_provider: Arc<T>,
pub(crate) query_executor: Arc<Q>,
max_request_bytes: usize,
authorizer: Arc<dyn Authorizer>,
}

impl<W, Q> HttpApi<W, Q> {
impl<W, Q, T> HttpApi<W, Q, T> {
pub(crate) fn new(
common_state: CommonServerState,
time_provider: Arc<T>,
write_buffer: Arc<W>,
query_executor: Arc<Q>,
max_request_bytes: usize,
authorizer: Arc<dyn Authorizer>,
) -> Self {
Self {
common_state,
time_provider,
write_buffer,
query_executor,
max_request_bytes,
Expand All @@ -299,10 +302,11 @@ impl<W, Q> HttpApi<W, Q> {
}
}

impl<W, Q> HttpApi<W, Q>
impl<W, Q, T> HttpApi<W, Q, T>
where
W: WriteBuffer,
Q: QueryExecutor,
T: TimeProvider,
Error: From<<Q as QueryExecutor>::Error>,
{
async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
Expand All @@ -316,8 +320,7 @@ where

let database = NamespaceName::new(params.db)?;

// TODO: use the time provider
let default_time = SystemProvider::new().now().timestamp_nanos();
let default_time = self.time_provider.now();

let result = self
.write_buffer
Expand Down Expand Up @@ -704,8 +707,8 @@ pub(crate) struct WriteParams {
pub(crate) precision: Precision,
}

pub(crate) async fn route_request<W, Q>(
http_server: Arc<HttpApi<W, Q>>,
pub(crate) async fn route_request<W: WriteBuffer, Q: QueryExecutor, T: TimeProvider>(
http_server: Arc<HttpApi<W, Q, T>>,
mut req: Request<Body>,
) -> Result<Response<Body>, Infallible>
where
Expand Down
Loading

0 comments on commit d290280

Please sign in to comment.