diff --git a/src/bytes.rs b/src/bytes.rs
index 0404a72db..de762f2c9 100644
--- a/src/bytes.rs
+++ b/src/bytes.rs
@@ -1,19 +1,16 @@
+use core::any::TypeId;
 use core::iter::FromIterator;
 use core::ops::{Deref, RangeBounds};
 use core::{cmp, fmt, hash, mem, ptr, slice, usize};

-use alloc::{
-    alloc::{dealloc, Layout},
-    borrow::Borrow,
-    boxed::Box,
-    string::String,
-    vec::Vec,
-};
+use alloc::{borrow::Borrow, boxed::Box, string::String, vec::Vec};

 use crate::buf::IntoIter;
+use crate::impls::*;
 #[allow(unused)]
 use crate::loom::sync::atomic::AtomicMut;
-use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
+use crate::shared_buf::{BufferParts, SharedBuf};
 use crate::Buf;

 /// A cheaply cloneable and sliceable chunk of contiguous memory.
@@ -105,15 +102,18 @@ pub struct Bytes {
     vtable: &'static Vtable,
 }

-pub(crate) struct Vtable {
+struct Vtable {
+    type_id: fn() -> TypeId,
     /// fn(data, ptr, len)
-    pub clone: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Bytes,
+    clone: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BufferParts,
+    /// Called during `Bytes::try_resize` and `Bytes::truncate`
+    try_resize: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
     /// fn(data, ptr, len)
     ///
-    /// takes `Bytes` to value
-    pub to_vec: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Vec<u8>,
+    /// Consumes `Bytes` and returns a `Vec<u8>`
+    into_vec: unsafe fn(&mut AtomicPtr<()>, *const u8, usize) -> Vec<u8>,
     /// fn(data, ptr, len)
-    pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
+    drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
 }

 impl Bytes {
@@ -159,7 +159,15 @@ impl Bytes {
     /// ```
     #[inline]
     #[cfg(not(all(loom, test)))]
-    pub const fn from_static(bytes: &'static [u8]) -> Self {
+    pub const fn from_static(bytes: &'static [u8]) -> Bytes {
+        const STATIC_VTABLE: Vtable = Vtable {
+            type_id: TypeId::of::<static_buf::StaticImpl>,
+            clone: <static_buf::StaticImpl as SharedBuf>::clone,
+            try_resize: <static_buf::StaticImpl as SharedBuf>::try_resize,
+            into_vec: <static_buf::StaticImpl as SharedBuf>::into_vec,
+            drop: <static_buf::StaticImpl as SharedBuf>::drop,
+        };
+
         Bytes {
             ptr: bytes.as_ptr(),
             len: bytes.len(),
@@ -169,7 +177,15 @@ impl Bytes {
     }

     #[cfg(all(loom, test))]
-    pub fn from_static(bytes: &'static [u8]) -> Self {
+    pub fn from_static(bytes: &'static [u8]) -> Bytes {
+        const STATIC_VTABLE: Vtable = Vtable {
+            type_id: TypeId::of::<static_buf::StaticImpl>,
+            clone: <static_buf::StaticImpl as SharedBuf>::clone,
+            try_resize: <static_buf::StaticImpl as SharedBuf>::try_resize,
+            into_vec: <static_buf::StaticImpl as SharedBuf>::into_vec,
+            drop: <static_buf::StaticImpl as SharedBuf>::drop,
+        };
+
         Bytes {
             ptr: bytes.as_ptr(),
             len: bytes.len(),
@@ -178,6 +194,28 @@ impl Bytes {
         }
     }

+    /// Creates a new `Bytes` instance using an impl of [`SharedBuf`] as the internal buffer.
+    ///
+    /// This takes an impl of `SharedBuf` and wraps it in a `Bytes` instance.
+    /// This can be reversed with the [`into_shared_buf`] method.
+    #[inline]
+    pub fn from_shared_buf<T: SharedBuf>(buf_impl: T) -> Bytes {
+        let (data, ptr, len) = SharedBuf::into_parts(buf_impl);
+
+        Bytes {
+            ptr,
+            len,
+            data,
+            vtable: &Vtable {
+                type_id: TypeId::of::<T>,
+                clone: T::clone,
+                try_resize: T::try_resize,
+                into_vec: T::into_vec,
+                drop: T::drop,
+            },
+        }
+    }
+
     /// Returns the number of bytes contained in this `Bytes`.
     ///
     /// # Examples
@@ -455,16 +493,10 @@ impl Bytes {
     #[inline]
     pub fn truncate(&mut self, len: usize) {
         if len < self.len {
-            // The Vec "promotable" vtables do not store the capacity,
-            // so we cannot truncate while using this repr. We *have* to
-            // promote using `split_off` so the capacity can be stored.
-            if self.vtable as *const Vtable == &PROMOTABLE_EVEN_VTABLE
-                || self.vtable as *const Vtable == &PROMOTABLE_ODD_VTABLE
-            {
-                drop(self.split_off(len));
-            } else {
-                self.len = len;
+            unsafe {
+                (self.vtable.try_resize)(&mut self.data, self.ptr, self.len);
             }
+            self.len = len;
         }
     }

@@ -484,18 +516,24 @@ impl Bytes {
         self.truncate(0);
     }

+    /// Downcast this `Bytes` into its underlying implementation.
+    ///
+    /// The target type, `T`, must match the type that was originally used
+    /// to construct this `Bytes` instance. A runtime check is used
+    /// to validate this.
+    ///
+    /// On success, the `T` value is returned.
+    ///
+    /// On failure, `self` is returned unchanged as an `Err`.
     #[inline]
-    pub(crate) unsafe fn with_vtable(
-        ptr: *const u8,
-        len: usize,
-        data: AtomicPtr<()>,
-        vtable: &'static Vtable,
-    ) -> Bytes {
-        Bytes {
-            ptr,
-            len,
-            data,
-            vtable,
+    pub fn into_shared_buf<T: SharedBuf>(self) -> Result<T, Bytes> {
+        if TypeId::of::<T>() == (self.vtable.type_id)() {
+            Ok(unsafe {
+                let this = &mut *mem::ManuallyDrop::new(self);
+                T::from_parts(&mut this.data, this.ptr, this.len)
+            })
+        } else {
+            Err(self)
         }
     }

@@ -529,7 +567,13 @@ impl Drop for Bytes {

 impl Clone for Bytes {
     #[inline]
     fn clone(&self) -> Bytes {
-        unsafe { (self.vtable.clone)(&self.data, self.ptr, self.len) }
+        let (data, ptr, len) = unsafe { (self.vtable.clone)(&self.data, self.ptr, self.len) };
+        Bytes {
+            ptr,
+            len,
+            data,
+            vtable: self.vtable,
+        }
     }
 }

@@ -817,26 +861,14 @@ impl From<Vec<u8>> for Bytes {
             return Bytes::from(vec.into_boxed_slice());
         }

-        let shared = Box::new(Shared {
+        let shared = Box::new(crate::impls::shared::Shared {
             buf: ptr,
             cap,
             ref_cnt: AtomicUsize::new(1),
         });
         mem::forget(vec);
-
-        let shared = Box::into_raw(shared);
-        // The pointer should be aligned, so this assert should
-        // always succeed.
-        debug_assert!(
-            0 == (shared as usize & KIND_MASK),
-            "internal: Box should have an aligned pointer",
-        );
-        Bytes {
-            ptr,
-            len,
-            data: AtomicPtr::new(shared as _),
-            vtable: &SHARED_VTABLE,
-        }
+        let imp = crate::impls::shared::SharedImpl::new(Box::into_raw(shared), ptr, len);
+        Bytes::from_shared_buf(imp)
     }
 }

@@ -849,24 +881,14 @@ impl From<Box<[u8]>> for Bytes {
             return Bytes::new();
         }

-        let len = slice.len();
-        let ptr = Box::into_raw(slice) as *mut u8;
-
-        if ptr as usize & 0x1 == 0 {
-            let data = ptr_map(ptr, |addr| addr | KIND_VEC);
-            Bytes {
-                ptr,
-                len,
-                data: AtomicPtr::new(data.cast()),
-                vtable: &PROMOTABLE_EVEN_VTABLE,
-            }
+        if slice.as_ptr() as usize & 0x1 == 0 {
+            Bytes::from_shared_buf(promotable::PromotableEvenImpl(
+                promotable::Promotable::Owned(slice),
+            ))
         } else {
-            Bytes {
-                ptr,
-                len,
-                data: AtomicPtr::new(ptr.cast()),
-                vtable: &PROMOTABLE_ODD_VTABLE,
-            }
+            Bytes::from_shared_buf(promotable::PromotableOddImpl(
+                promotable::Promotable::Owned(slice),
+            ))
         }
     }
 }
@@ -879,8 +901,8 @@ impl From<String> for Bytes {

 impl From<Bytes> for Vec<u8> {
     fn from(bytes: Bytes) -> Vec<u8> {
-        let bytes = mem::ManuallyDrop::new(bytes);
-        unsafe { (bytes.vtable.to_vec)(&bytes.data, bytes.ptr, bytes.len) }
+        let bytes = &mut *mem::ManuallyDrop::new(bytes);
+        unsafe { (bytes.vtable.into_vec)(&mut bytes.data, bytes.ptr, bytes.len) }
     }
 }

@@ -889,365 +911,15 @@ impl From<Bytes> for Vec<u8> {
 impl fmt::Debug for Vtable {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("Vtable")
+            .field("type_id", &self.type_id)
             .field("clone", &(self.clone as *const ()))
+            .field("try_resize", &(self.try_resize as *const ()))
+            .field("into_vec", &(self.into_vec as *const ()))
             .field("drop", &(self.drop as *const ()))
             .finish()
     }
 }

-// ===== impl StaticVtable =====
-
-const
STATIC_VTABLE: Vtable = Vtable { - clone: static_clone, - to_vec: static_to_vec, - drop: static_drop, -}; - -unsafe fn static_clone(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes { - let slice = slice::from_raw_parts(ptr, len); - Bytes::from_static(slice) -} - -unsafe fn static_to_vec(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { - let slice = slice::from_raw_parts(ptr, len); - slice.to_vec() -} - -unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) { - // nothing to drop for &'static [u8] -} - -// ===== impl PromotableVtable ===== - -static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable { - clone: promotable_even_clone, - to_vec: promotable_even_to_vec, - drop: promotable_even_drop, -}; - -static PROMOTABLE_ODD_VTABLE: Vtable = Vtable { - clone: promotable_odd_clone, - to_vec: promotable_odd_to_vec, - drop: promotable_odd_drop, -}; - -unsafe fn promotable_even_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes { - let shared = data.load(Ordering::Acquire); - let kind = shared as usize & KIND_MASK; - - if kind == KIND_ARC { - shallow_clone_arc(shared.cast(), ptr, len) - } else { - debug_assert_eq!(kind, KIND_VEC); - let buf = ptr_map(shared.cast(), |addr| addr & !KIND_MASK); - shallow_clone_vec(data, shared, buf, ptr, len) - } -} - -unsafe fn promotable_to_vec( - data: &AtomicPtr<()>, - ptr: *const u8, - len: usize, - f: fn(*mut ()) -> *mut u8, -) -> Vec { - let shared = data.load(Ordering::Acquire); - let kind = shared as usize & KIND_MASK; - - if kind == KIND_ARC { - shared_to_vec_impl(shared.cast(), ptr, len) - } else { - // If Bytes holds a Vec, then the offset must be 0. - debug_assert_eq!(kind, KIND_VEC); - - let buf = f(shared); - - let cap = (ptr as usize - buf as usize) + len; - - // Copy back buffer - ptr::copy(ptr, buf, len); - - Vec::from_raw_parts(buf, len, cap) - } -} - -unsafe fn promotable_even_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { - promotable_to_vec(data, ptr, len, |shared| { - ptr_map(shared.cast(), |addr| addr & !KIND_MASK) - }) -} - -unsafe fn promotable_even_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) { - data.with_mut(|shared| { - let shared = *shared; - let kind = shared as usize & KIND_MASK; - - if kind == KIND_ARC { - release_shared(shared.cast()); - } else { - debug_assert_eq!(kind, KIND_VEC); - let buf = ptr_map(shared.cast(), |addr| addr & !KIND_MASK); - free_boxed_slice(buf, ptr, len); - } - }); -} - -unsafe fn promotable_odd_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes { - let shared = data.load(Ordering::Acquire); - let kind = shared as usize & KIND_MASK; - - if kind == KIND_ARC { - shallow_clone_arc(shared as _, ptr, len) - } else { - debug_assert_eq!(kind, KIND_VEC); - shallow_clone_vec(data, shared, shared.cast(), ptr, len) - } -} - -unsafe fn promotable_odd_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { - promotable_to_vec(data, ptr, len, |shared| shared.cast()) -} - -unsafe fn promotable_odd_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) { - data.with_mut(|shared| { - let shared = *shared; - let kind = shared as usize & KIND_MASK; - - if kind == KIND_ARC { - release_shared(shared.cast()); - } else { - debug_assert_eq!(kind, KIND_VEC); - - free_boxed_slice(shared.cast(), ptr, len); - } - }); -} - -unsafe fn free_boxed_slice(buf: *mut u8, offset: *const u8, len: usize) { - let cap = (offset as usize - buf as usize) + len; - dealloc(buf, Layout::from_size_align(cap, 1).unwrap()) -} - -// ===== impl SharedVtable ===== - 
-struct Shared { - // Holds arguments to dealloc upon Drop, but otherwise doesn't use them - buf: *mut u8, - cap: usize, - ref_cnt: AtomicUsize, -} - -impl Drop for Shared { - fn drop(&mut self) { - unsafe { dealloc(self.buf, Layout::from_size_align(self.cap, 1).unwrap()) } - } -} - -// Assert that the alignment of `Shared` is divisible by 2. -// This is a necessary invariant since we depend on allocating `Shared` a -// shared object to implicitly carry the `KIND_ARC` flag in its pointer. -// This flag is set when the LSB is 0. -const _: [(); 0 - mem::align_of::() % 2] = []; // Assert that the alignment of `Shared` is divisible by 2. - -static SHARED_VTABLE: Vtable = Vtable { - clone: shared_clone, - to_vec: shared_to_vec, - drop: shared_drop, -}; - -const KIND_ARC: usize = 0b0; -const KIND_VEC: usize = 0b1; -const KIND_MASK: usize = 0b1; - -unsafe fn shared_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes { - let shared = data.load(Ordering::Relaxed); - shallow_clone_arc(shared as _, ptr, len) -} - -unsafe fn shared_to_vec_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> Vec { - // Check that the ref_cnt is 1 (unique). - // - // If it is unique, then it is set to 0 with AcqRel fence for the same - // reason in release_shared. - // - // Otherwise, we take the other branch and call release_shared. - if (*shared) - .ref_cnt - .compare_exchange(1, 0, Ordering::AcqRel, Ordering::Relaxed) - .is_ok() - { - let buf = (*shared).buf; - let cap = (*shared).cap; - - // Deallocate Shared - drop(Box::from_raw(shared as *mut mem::ManuallyDrop)); - - // Copy back buffer - ptr::copy(ptr, buf, len); - - Vec::from_raw_parts(buf, len, cap) - } else { - let v = slice::from_raw_parts(ptr, len).to_vec(); - release_shared(shared); - v - } -} - -unsafe fn shared_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { - shared_to_vec_impl(data.load(Ordering::Relaxed).cast(), ptr, len) -} - -unsafe fn shared_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) { - data.with_mut(|shared| { - release_shared(shared.cast()); - }); -} - -unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) -> Bytes { - let old_size = (*shared).ref_cnt.fetch_add(1, Ordering::Relaxed); - - if old_size > usize::MAX >> 1 { - crate::abort(); - } - - Bytes { - ptr, - len, - data: AtomicPtr::new(shared as _), - vtable: &SHARED_VTABLE, - } -} - -#[cold] -unsafe fn shallow_clone_vec( - atom: &AtomicPtr<()>, - ptr: *const (), - buf: *mut u8, - offset: *const u8, - len: usize, -) -> Bytes { - // If the buffer is still tracked in a `Vec`. It is time to - // promote the vec to an `Arc`. This could potentially be called - // concurrently, so some care must be taken. - - // First, allocate a new `Shared` instance containing the - // `Vec` fields. It's important to note that `ptr`, `len`, - // and `cap` cannot be mutated without having `&mut self`. - // This means that these fields will not be concurrently - // updated and since the buffer hasn't been promoted to an - // `Arc`, those three fields still are the components of the - // vector. - let shared = Box::new(Shared { - buf, - cap: (offset as usize - buf as usize) + len, - // Initialize refcount to 2. One for this reference, and one - // for the new clone that will be returned from - // `shallow_clone`. - ref_cnt: AtomicUsize::new(2), - }); - - let shared = Box::into_raw(shared); - - // The pointer should be aligned, so this assert should - // always succeed. 
- debug_assert!( - 0 == (shared as usize & KIND_MASK), - "internal: Box should have an aligned pointer", - ); - - // Try compare & swapping the pointer into the `arc` field. - // `Release` is used synchronize with other threads that - // will load the `arc` field. - // - // If the `compare_exchange` fails, then the thread lost the - // race to promote the buffer to shared. The `Acquire` - // ordering will synchronize with the `compare_exchange` - // that happened in the other thread and the `Shared` - // pointed to by `actual` will be visible. - match atom.compare_exchange(ptr as _, shared as _, Ordering::AcqRel, Ordering::Acquire) { - Ok(actual) => { - debug_assert!(actual as usize == ptr as usize); - // The upgrade was successful, the new handle can be - // returned. - Bytes { - ptr: offset, - len, - data: AtomicPtr::new(shared as _), - vtable: &SHARED_VTABLE, - } - } - Err(actual) => { - // The upgrade failed, a concurrent clone happened. Release - // the allocation that was made in this thread, it will not - // be needed. - let shared = Box::from_raw(shared); - mem::forget(*shared); - - // Buffer already promoted to shared storage, so increment ref - // count. - shallow_clone_arc(actual as _, offset, len) - } - } -} - -unsafe fn release_shared(ptr: *mut Shared) { - // `Shared` storage... follow the drop steps from Arc. - if (*ptr).ref_cnt.fetch_sub(1, Ordering::Release) != 1 { - return; - } - - // This fence is needed to prevent reordering of use of the data and - // deletion of the data. Because it is marked `Release`, the decreasing - // of the reference count synchronizes with this `Acquire` fence. This - // means that use of the data happens before decreasing the reference - // count, which happens before this fence, which happens before the - // deletion of the data. - // - // As explained in the [Boost documentation][1], - // - // > It is important to enforce any possible access to the object in one - // > thread (through an existing reference) to *happen before* deleting - // > the object in a different thread. This is achieved by a "release" - // > operation after dropping a reference (any access to the object - // > through this reference must obviously happened before), and an - // > "acquire" operation before deleting the object. - // - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - // - // Thread sanitizer does not support atomic fences. Use an atomic load - // instead. - (*ptr).ref_cnt.load(Ordering::Acquire); - - // Drop the data - drop(Box::from_raw(ptr)); -} - -// Ideally we would always use this version of `ptr_map` since it is strict -// provenance compatible, but it results in worse codegen. We will however still -// use it on miri because it gives better diagnostics for people who test bytes -// code with miri. -// -// See https://github.com/tokio-rs/bytes/pull/545 for more info. 
-#[cfg(miri)] -fn ptr_map(ptr: *mut u8, f: F) -> *mut u8 -where - F: FnOnce(usize) -> usize, -{ - let old_addr = ptr as usize; - let new_addr = f(old_addr); - let diff = new_addr.wrapping_sub(old_addr); - ptr.wrapping_add(diff) -} - -#[cfg(not(miri))] -fn ptr_map(ptr: *mut u8, f: F) -> *mut u8 -where - F: FnOnce(usize) -> usize, -{ - let old_addr = ptr as usize; - let new_addr = f(old_addr); - new_addr as *mut u8 -} - // compile-fails /// ```compile_fail diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index 450b93279..b1470d0b0 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -13,10 +13,10 @@ use alloc::{ }; use crate::buf::{IntoIter, UninitSlice}; -use crate::bytes::Vtable; #[allow(unused)] use crate::loom::sync::atomic::AtomicMut; use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use crate::shared_buf::{BufferParts, SharedBuf}; use crate::{Buf, BufMut, Bytes}; /// A unique reference to a contiguous slice of memory. @@ -253,9 +253,9 @@ impl BytesMut { let ptr = self.ptr.as_ptr(); let len = self.len; - let data = AtomicPtr::new(self.data.cast()); + let shared = self.data; mem::forget(self); - unsafe { Bytes::with_vtable(ptr, len, data, &SHARED_VTABLE) } + Bytes::from_shared_buf(SharedImpl { shared, ptr, len }) } } @@ -1703,46 +1703,59 @@ unsafe fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize) // ===== impl SharedVtable ===== -static SHARED_VTABLE: Vtable = Vtable { - clone: shared_v_clone, - to_vec: shared_v_to_vec, - drop: shared_v_drop, -}; +struct SharedImpl { + shared: *mut Shared, + ptr: *const u8, + len: usize, +} -unsafe fn shared_v_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes { - let shared = data.load(Ordering::Relaxed) as *mut Shared; - increment_shared(shared); +unsafe impl SharedBuf for SharedImpl { + fn into_parts(this: Self) -> (AtomicPtr<()>, *const u8, usize) { + (AtomicPtr::new(this.shared.cast()), this.ptr, this.len) + } - let data = AtomicPtr::new(shared as *mut ()); - Bytes::with_vtable(ptr, len, data, &SHARED_VTABLE) -} + unsafe fn from_parts(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Self { + SharedImpl { + shared: (data.with_mut(|p| *p)).cast(), + ptr, + len, + } + } + + unsafe fn clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BufferParts { + let shared = data.load(Ordering::Relaxed) as *mut Shared; + increment_shared(shared); -unsafe fn shared_v_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { - let shared: *mut Shared = data.load(Ordering::Relaxed).cast(); + (AtomicPtr::new(shared.cast()), ptr, len) + } - if (*shared).is_unique() { - let shared = &mut *shared; + unsafe fn into_vec(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { + let shared: *mut Shared = (data.with_mut(|p| *p)).cast(); - // Drop shared - let mut vec = mem::replace(&mut shared.vec, Vec::new()); - release_shared(shared); + if (*shared).is_unique() { + let shared = &mut *shared; - // Copy back buffer - ptr::copy(ptr, vec.as_mut_ptr(), len); - vec.set_len(len); + // Drop shared + let mut vec = mem::replace(&mut shared.vec, Vec::new()); + release_shared(shared); - vec - } else { - let v = slice::from_raw_parts(ptr, len).to_vec(); - release_shared(shared); - v + // Copy back buffer + ptr::copy(ptr, vec.as_mut_ptr(), len); + vec.set_len(len); + + vec + } else { + let v = slice::from_raw_parts(ptr, len).to_vec(); + release_shared(shared); + v + } } -} -unsafe fn shared_v_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) { - data.with_mut(|shared| { - 
release_shared(*shared as *mut Shared); - }); + unsafe fn drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) { + data.with_mut(|shared| { + release_shared(*shared as *mut Shared); + }); + } } // compile-fails diff --git a/src/impls/mod.rs b/src/impls/mod.rs new file mode 100644 index 000000000..7e10acd14 --- /dev/null +++ b/src/impls/mod.rs @@ -0,0 +1,3 @@ +pub mod promotable; +pub mod shared; +pub mod static_buf; diff --git a/src/impls/promotable.rs b/src/impls/promotable.rs new file mode 100644 index 000000000..79f220ce3 --- /dev/null +++ b/src/impls/promotable.rs @@ -0,0 +1,298 @@ +use crate::shared_buf::{BufferParts, SharedBuf}; +use alloc::{ + alloc::{dealloc, Layout}, + boxed::Box, + vec::Vec, +}; +use core::{mem, ptr, usize}; + +use super::shared::{self, SharedImpl}; +#[allow(unused)] +use crate::loom::sync::atomic::AtomicMut; +use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +const KIND_ARC: usize = 0b0; +const KIND_VEC: usize = 0b1; +const KIND_MASK: usize = 0b1; + +// ===== impl PromotableVtable ===== + +pub(crate) struct PromotableEvenImpl(pub Promotable); + +pub(crate) struct PromotableOddImpl(pub Promotable); + +pub(crate) enum Promotable { + Owned(Box<[u8]>), + Shared(SharedImpl), +} + +unsafe impl SharedBuf for PromotableEvenImpl { + fn into_parts(this: Self) -> (AtomicPtr<()>, *const u8, usize) { + let slice = match this.0 { + Promotable::Owned(slice) => slice, + Promotable::Shared(shared) => return SharedImpl::into_parts(shared), + }; + + let len = slice.len(); + let ptr = Box::into_raw(slice) as *mut u8; + assert!(ptr as usize & 0x1 == 0); + + let data = ptr_map(ptr, |addr| addr | KIND_VEC); + + (AtomicPtr::new(data.cast()), ptr, len) + } + + unsafe fn from_parts(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Self { + PromotableEvenImpl(promotable_from_bytes_parts(data, ptr, len, |shared| { + ptr_map(shared.cast(), |addr| addr & !KIND_MASK) + })) + } + + unsafe fn clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BufferParts { + let shared = data.load(Ordering::Acquire); + let kind = shared as usize & KIND_MASK; + + if kind == KIND_ARC { + shared::shallow_clone_arc(shared.cast(), ptr, len) + } else { + debug_assert_eq!(kind, KIND_VEC); + let buf = ptr_map(shared.cast(), |addr| addr & !KIND_MASK); + shallow_clone_vec(data, shared, buf, ptr, len) + } + } + + unsafe fn try_resize(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) { + // The Vec "promotable" vtables do not store the capacity, + // so we cannot truncate while using this repr. We *have* to + // promote using `clone` so the capacity can be stored. 
+ drop(PromotableEvenImpl::clone(&*data, ptr, len)); + } + + unsafe fn into_vec(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { + promotable_into_vec(data, ptr, len, |shared| { + ptr_map(shared.cast(), |addr| addr & !KIND_MASK) + }) + } + + unsafe fn drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) { + data.with_mut(|shared| { + let shared = *shared; + let kind = shared as usize & KIND_MASK; + + if kind == KIND_ARC { + shared::release_shared(shared.cast()); + } else { + debug_assert_eq!(kind, KIND_VEC); + let buf = ptr_map(shared.cast(), |addr| addr & !KIND_MASK); + free_boxed_slice(buf, ptr, len); + } + }); + } +} + +unsafe fn promotable_from_bytes_parts( + data: &mut AtomicPtr<()>, + ptr: *const u8, + len: usize, + f: fn(*mut ()) -> *mut u8, +) -> Promotable { + let shared = data.with_mut(|p| *p); + let kind = shared as usize & KIND_MASK; + + if kind == KIND_ARC { + Promotable::Shared(SharedImpl::from_parts(data, ptr, len)) + } else { + debug_assert_eq!(kind, KIND_VEC); + + let buf = f(shared); + + let cap = (ptr as usize - buf as usize) + len; + + let vec = Vec::from_raw_parts(buf, cap, cap); + + Promotable::Owned(vec.into_boxed_slice()) + } +} + +unsafe fn promotable_into_vec( + data: &mut AtomicPtr<()>, + ptr: *const u8, + len: usize, + f: fn(*mut ()) -> *mut u8, +) -> Vec { + let shared = data.with_mut(|p| *p); + let kind = shared as usize & KIND_MASK; + + if kind == KIND_ARC { + shared::shared_into_vec_impl(shared.cast(), ptr, len) + } else { + // If Bytes holds a Vec, then the offset must be 0. + debug_assert_eq!(kind, KIND_VEC); + + let buf = f(shared); + + let cap = (ptr as usize - buf as usize) + len; + + // Copy back buffer + ptr::copy(ptr, buf, len); + + Vec::from_raw_parts(buf, len, cap) + } +} + +unsafe impl SharedBuf for PromotableOddImpl { + fn into_parts(this: Self) -> BufferParts { + let slice = match this.0 { + Promotable::Owned(slice) => slice, + Promotable::Shared(shared) => return SharedImpl::into_parts(shared), + }; + + let len = slice.len(); + let ptr = Box::into_raw(slice) as *mut u8; + assert!(ptr as usize & 0x1 == 1); + + (AtomicPtr::new(ptr.cast()), ptr, len) + } + + unsafe fn from_parts(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Self { + PromotableOddImpl(promotable_from_bytes_parts(data, ptr, len, |shared| { + shared.cast() + })) + } + + unsafe fn clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BufferParts { + let shared = data.load(Ordering::Acquire); + let kind = shared as usize & KIND_MASK; + + if kind == KIND_ARC { + shared::shallow_clone_arc(shared as _, ptr, len) + } else { + debug_assert_eq!(kind, KIND_VEC); + shallow_clone_vec(data, shared, shared.cast(), ptr, len) + } + } + + unsafe fn try_resize(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) { + // The Vec "promotable" vtables do not store the capacity, + // so we cannot truncate while using this repr. We *have* to + // promote using `clone` so the capacity can be stored. 
+ drop(PromotableOddImpl::clone(&*data, ptr, len)); + } + + unsafe fn into_vec(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { + promotable_into_vec(data, ptr, len, |shared| shared.cast()) + } + + unsafe fn drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) { + data.with_mut(|shared| { + let shared = *shared; + let kind = shared as usize & KIND_MASK; + + if kind == KIND_ARC { + shared::release_shared(shared.cast()); + } else { + debug_assert_eq!(kind, KIND_VEC); + + free_boxed_slice(shared.cast(), ptr, len); + } + }); + } +} + +unsafe fn free_boxed_slice(buf: *mut u8, offset: *const u8, len: usize) { + let cap = (offset as usize - buf as usize) + len; + dealloc(buf, Layout::from_size_align(cap, 1).unwrap()) +} + +// Ideally we would always use this version of `ptr_map` since it is strict +// provenance compatible, but it results in worse codegen. We will however still +// use it on miri because it gives better diagnostics for people who test bytes +// code with miri. +// +// See https://github.com/tokio-rs/bytes/pull/545 for more info. +#[cfg(miri)] +fn ptr_map(ptr: *mut u8, f: F) -> *mut u8 +where + F: FnOnce(usize) -> usize, +{ + let old_addr = ptr as usize; + let new_addr = f(old_addr); + let diff = new_addr.wrapping_sub(old_addr); + ptr.wrapping_add(diff) +} + +#[cfg(not(miri))] +fn ptr_map(ptr: *mut u8, f: F) -> *mut u8 +where + F: FnOnce(usize) -> usize, +{ + let old_addr = ptr as usize; + let new_addr = f(old_addr); + new_addr as *mut u8 +} + +#[cold] +unsafe fn shallow_clone_vec( + atom: &AtomicPtr<()>, + ptr: *const (), + buf: *mut u8, + offset: *const u8, + len: usize, +) -> BufferParts { + // If the buffer is still tracked in a `Vec`. It is time to + // promote the vec to an `Arc`. This could potentially be called + // concurrently, so some care must be taken. + + // First, allocate a new `Shared` instance containing the + // `Vec` fields. It's important to note that `ptr`, `len`, + // and `cap` cannot be mutated without having `&mut self`. + // This means that these fields will not be concurrently + // updated and since the buffer hasn't been promoted to an + // `Arc`, those three fields still are the components of the + // vector. + let shared = Box::new(shared::Shared { + buf, + cap: (offset as usize - buf as usize) + len, + // Initialize refcount to 2. One for this reference, and one + // for the new clone that will be returned from + // `shallow_clone`. + ref_cnt: AtomicUsize::new(2), + }); + + let shared = Box::into_raw(shared); + + // The pointer should be aligned, so this assert should + // always succeed. + debug_assert!( + 0 == (shared as usize & KIND_MASK), + "internal: Box should have an aligned pointer", + ); + + // Try compare & swapping the pointer into the `arc` field. + // `Release` is used synchronize with other threads that + // will load the `arc` field. + // + // If the `compare_exchange` fails, then the thread lost the + // race to promote the buffer to shared. The `Acquire` + // ordering will synchronize with the `compare_exchange` + // that happened in the other thread and the `Shared` + // pointed to by `actual` will be visible. + match atom.compare_exchange(ptr as _, shared as _, Ordering::AcqRel, Ordering::Acquire) { + Ok(actual) => { + debug_assert!(actual as usize == ptr as usize); + // The upgrade was successful, the new handle can be + // returned. + (AtomicPtr::new(shared.cast()), offset, len) + } + Err(actual) => { + // The upgrade failed, a concurrent clone happened. 
Release + // the allocation that was made in this thread, it will not + // be needed. + let shared = Box::from_raw(shared); + mem::forget(*shared); + + // Buffer already promoted to shared storage, so increment ref + // count. + shared::shallow_clone_arc(actual as _, offset, len) + } + } +} diff --git a/src/impls/shared.rs b/src/impls/shared.rs new file mode 100644 index 000000000..4656743b0 --- /dev/null +++ b/src/impls/shared.rs @@ -0,0 +1,156 @@ +#[allow(unused)] +use crate::loom::sync::atomic::AtomicMut; +use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use crate::shared_buf::{BufferParts, SharedBuf}; +use alloc::{ + alloc::{dealloc, Layout}, + boxed::Box, + vec::Vec, +}; +use core::{mem, ptr, slice, usize}; + +// ===== impl SharedVtable ===== + +pub(crate) struct Shared { + // Holds arguments to dealloc upon Drop, but otherwise doesn't use them + pub(crate) buf: *mut u8, + pub(crate) cap: usize, + pub(crate) ref_cnt: AtomicUsize, +} + +impl Drop for Shared { + fn drop(&mut self) { + unsafe { dealloc(self.buf, Layout::from_size_align(self.cap, 1).unwrap()) } + } +} + +// Assert that the alignment of `Shared` is divisible by 2. +// This is a necessary invariant since we depend on allocating `Shared` a +// shared object to implicitly carry the `KIND_ARC` flag in its pointer. +// This flag is set when the LSB is 0. +const _: [(); 0 - mem::align_of::() % 2] = []; // Assert that the alignment of `Shared` is divisible by 2. + +pub(crate) struct SharedImpl { + shared: *mut Shared, + offset: *const u8, + len: usize, +} + +impl SharedImpl { + pub(crate) fn new(shared: *mut Shared, ptr: *const u8, len: usize) -> Self { + SharedImpl { + shared, + offset: ptr, + len, + } + } +} + +unsafe impl SharedBuf for SharedImpl { + fn into_parts(this: Self) -> (AtomicPtr<()>, *const u8, usize) { + (AtomicPtr::new(this.shared.cast()), this.offset, this.len) + } + + unsafe fn from_parts(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Self { + SharedImpl { + shared: (data.with_mut(|p| *p)).cast(), + offset: ptr, + len, + } + } + + unsafe fn clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BufferParts { + let shared = data.load(Ordering::Relaxed); + shallow_clone_arc(shared as _, ptr, len) + } + + unsafe fn into_vec(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { + shared_into_vec_impl((data.with_mut(|p| *p)).cast(), ptr, len) + } + + unsafe fn drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) { + data.with_mut(|shared| { + release_shared(shared.cast()); + }); + } +} + +pub(crate) unsafe fn shared_into_vec_impl( + shared: *mut Shared, + ptr: *const u8, + len: usize, +) -> Vec { + // Check that the ref_cnt is 1 (unique). + // + // If it is unique, then it is set to 0 with AcqRel fence for the same + // reason in release_shared. + // + // Otherwise, we take the other branch and call release_shared. 
+ if (*shared) + .ref_cnt + .compare_exchange(1, 0, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + let buf = (*shared).buf; + let cap = (*shared).cap; + + // Deallocate Shared + drop(Box::from_raw(shared as *mut mem::ManuallyDrop)); + + // Copy back buffer + ptr::copy(ptr, buf, len); + + Vec::from_raw_parts(buf, len, cap) + } else { + let v = slice::from_raw_parts(ptr, len).to_vec(); + release_shared(shared); + v + } +} + +pub(crate) unsafe fn shallow_clone_arc( + shared: *mut Shared, + ptr: *const u8, + len: usize, +) -> BufferParts { + let old_size = (*shared).ref_cnt.fetch_add(1, Ordering::Relaxed); + + if old_size > usize::MAX >> 1 { + crate::abort(); + } + + let shared = AtomicPtr::new(shared.cast()); + (shared, ptr, len) +} + +pub(crate) unsafe fn release_shared(ptr: *mut Shared) { + // `Shared` storage... follow the drop steps from Arc. + if (*ptr).ref_cnt.fetch_sub(1, Ordering::Release) != 1 { + return; + } + + // This fence is needed to prevent reordering of use of the data and + // deletion of the data. Because it is marked `Release`, the decreasing + // of the reference count synchronizes with this `Acquire` fence. This + // means that use of the data happens before decreasing the reference + // count, which happens before this fence, which happens before the + // deletion of the data. + // + // As explained in the [Boost documentation][1], + // + // > It is important to enforce any possible access to the object in one + // > thread (through an existing reference) to *happen before* deleting + // > the object in a different thread. This is achieved by a "release" + // > operation after dropping a reference (any access to the object + // > through this reference must obviously happened before), and an + // > "acquire" operation before deleting the object. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + // + // Thread sanitizer does not support atomic fences. Use an atomic load + // instead. 
+ (*ptr).ref_cnt.load(Ordering::Acquire); + + // Drop the data + drop(Box::from_raw(ptr)); +} diff --git a/src/impls/static_buf.rs b/src/impls/static_buf.rs new file mode 100644 index 000000000..8558935ab --- /dev/null +++ b/src/impls/static_buf.rs @@ -0,0 +1,37 @@ +#[allow(unused)] +use crate::loom::sync::atomic::AtomicPtr; +use crate::shared_buf::{BufferParts, SharedBuf}; +use alloc::vec::Vec; +use core::{ptr, slice, usize}; +// ===== impl StaticVtable ===== + +pub(crate) struct StaticImpl(&'static [u8]); + +unsafe impl SharedBuf for StaticImpl { + fn into_parts(this: Self) -> BufferParts { + ( + AtomicPtr::new(ptr::null_mut()), + this.0.as_ptr(), + this.0.len(), + ) + } + + unsafe fn from_parts(_data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Self { + StaticImpl(slice::from_raw_parts(ptr, len)) + } + + unsafe fn clone(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BufferParts { + let slice = slice::from_raw_parts(ptr, len); + + (AtomicPtr::new(ptr::null_mut()), slice.as_ptr(), slice.len()) + } + + unsafe fn into_vec(_: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { + let slice = slice::from_raw_parts(ptr, len); + slice.to_vec() + } + + unsafe fn drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) { + // nothing to drop for &'static [u8] + } +} diff --git a/src/lib.rs b/src/lib.rs index af436b316..18a453404 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,9 +87,12 @@ pub use crate::buf::{Buf, BufMut}; mod bytes; mod bytes_mut; mod fmt; +mod impls; mod loom; +mod shared_buf; pub use crate::bytes::Bytes; pub use crate::bytes_mut::BytesMut; +pub use crate::shared_buf::{BufferParts, SharedBuf}; // Optional Serde support #[cfg(feature = "serde")] diff --git a/src/shared_buf.rs b/src/shared_buf.rs new file mode 100644 index 000000000..22e4d1086 --- /dev/null +++ b/src/shared_buf.rs @@ -0,0 +1,88 @@ +/// Refcounted Immutable Buffer +#[allow(unused)] +use crate::loom::sync::atomic::AtomicMut; +use crate::loom::sync::atomic::AtomicPtr; +use alloc::vec::Vec; + +/// A type alias for the tuple of: +/// 0. The data pointer referencing the container type used by the Bytes Instance +/// 1. The pointer offset into the buffer. +/// 2. The size of the buffer pointed to by [`ptr`] +pub type BufferParts = (AtomicPtr<()>, *const u8, usize); + +/// A trait that describes the inner shared buffer for [`Bytes`] types. +/// +/// The methods of the trait are all associated functions which are used as function +/// pointers in inner VTable implementation of the various modes of a [`Bytes`] instance. +/// +/// An implementor of this trait must be cheaply clonable, and feature a singular buffer +/// which can be safely sliced in any fashion between the bounds of it's pointer and its `len`. +/// +/// The remaining trait functions all take 3 parameters, which represent the state of the [`Bytes`] +/// instance that invoked the function. +/// The `data` param of each trait function equal the `AtomicPtr<()>` returned by into_parts. +/// The `ptr` param is the offset pointer into Self's buffer currently utilized in the calling [`Bytes`] instance. +/// The `len` param is the length of the slice from `ptr` currently utilized in the calling [`Bytes`] instance. +/// +/// For implementors that leverage refcounting, typically some sort of Wrapper struct +/// will need to act as a proxy between the [`Bytes`] instance and the inner type which does the +/// reference counting and manages its Buffer. This is similar to the implementation of [`Arc`]. 
+/// +/// # Example +/// +/// [Here is an example implementation](https://github.com/tokio-rs/bytes/blob/master/tests/extern_buf_bytes.rs#L58) +/// +/// # Safety +/// +/// This trait deals exclusively with raw pointers. These functions will cause UB if: +/// * The data pointer is NULL and the implemented functions expect a valid pointer. +/// * [`ptr`] is NULL or outside of the bounds of an allocated buffer. +/// * The len exceeds the capacity of the buffer pointed to by [`ptr`] and/or [`data`] +/// * The drop function deallocates the buffer in a different manner than it was allocated. +/// +/// * [`Arc`]: std::sync::Arc +pub unsafe trait SharedBuf: 'static { + /// Decompose `Self` into parts used by `Bytes`. + fn into_parts(this: Self) -> BufferParts; + + /// Creates itself directly from the raw bytes parts decomposed with `into_bytes_parts` + /// + /// # Safety + /// + /// The implementation of this function must ensure that data and ptr and len are valid + unsafe fn from_parts(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Self; + + /// (possibly) increases the reference count then + /// returns the parts necessary to construct a new Bytes instance. + /// + /// # Safety + /// + /// The implementation of this function must ensure that data and ptr and len are valid + unsafe fn clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BufferParts; + + /// Called before the `Bytes::truncate` is processed. + /// Useful if the implementation needs some preparation step for it. + /// + /// # Safety + /// + /// The implementation of this function must ensure that data and ptr and len are valid + unsafe fn try_resize(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) { + let (_, _, _) = (data, ptr, len); + } + + /// Consumes underlying resources and return `Vec`, usually with allocation + /// + /// # Safety + /// + /// The implementation of this function must ensure that data and ptr and len are valid + unsafe fn into_vec(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec; + + /// Release underlying resources. + /// Decrement a refcount. 
+ /// If refcount == 0 then drop or otherwise deallocate any resources allocated by T + /// + /// # Safety + /// + /// The implementation of this function must ensure that data and ptr and len are valid + unsafe fn drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize); +} diff --git a/tests/extern_buf_bytes.rs b/tests/extern_buf_bytes.rs new file mode 100644 index 000000000..311040103 --- /dev/null +++ b/tests/extern_buf_bytes.rs @@ -0,0 +1,362 @@ +#![warn(rust_2018_idioms)] + +use bytes::{BufferParts, Bytes, BytesMut, SharedBuf}; + +use std::alloc::{alloc, dealloc, Layout}; +use std::ptr::{self, NonNull}; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use std::usize; + +struct ExternBuf { + ptr: NonNull, + cap: usize, + ref_count: AtomicUsize, +} + +impl ExternBuf { + // We're pretending that this is some sort of exotic allocation/recycling scheme + pub fn from_size(sz: usize) -> Self { + let layout = Layout::array::(sz).unwrap(); + let ptr = NonNull::new(unsafe { alloc(layout) }).unwrap(); + ExternBuf { + ptr, + cap: sz, + ref_count: AtomicUsize::new(1), + } + } + + pub fn into_shared(self) -> ExternBufWrapper { + let b = Box::new(self); + let inner = Box::into_raw(b); + ExternBufWrapper { inner } + } +} + +impl From<&[u8]> for ExternBuf { + fn from(buf: &[u8]) -> Self { + let sz = buf.len(); + let newbuf = ExternBuf::from_size(sz); + unsafe { ptr::copy_nonoverlapping(buf.as_ptr(), newbuf.ptr.as_ptr(), sz) }; + newbuf + } +} + +impl Drop for ExternBuf { + fn drop(&mut self) { + let layout = Layout::array::(self.cap).unwrap(); + unsafe { + dealloc(self.ptr.as_ptr(), layout); + } + } +} + +struct ExternBufWrapper { + inner: *mut ExternBuf, +} + +unsafe impl SharedBuf for ExternBufWrapper { + fn into_parts(this: Self) -> BufferParts { + unsafe { + ( + AtomicPtr::new(this.inner.cast()), + (*this.inner).ptr.as_ptr(), + (*this.inner).cap, + ) + } + } + + unsafe fn from_parts(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) -> Self { + let inner = data.load(Ordering::Acquire).cast(); + ExternBufWrapper { inner } + } + + unsafe fn clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BufferParts { + let inner: *mut ExternBuf = data.load(Ordering::Acquire).cast(); + let old_size = (*inner).ref_count.fetch_add(1, Ordering::Release); + if old_size > usize::MAX >> 1 { + panic!("wat"); + } + (AtomicPtr::new(inner.cast()), ptr, len) + } + + unsafe fn into_vec(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { + let inner: *mut ExternBuf = (*data.get_mut()).cast(); + if (*inner) + .ref_count + .compare_exchange(1, 0, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + let buf = (*inner).ptr; + let cap = (*inner).cap; + + drop(Box::from_raw( + inner as *mut std::mem::ManuallyDrop, + )); + + // Copy back buffer + ptr::copy(ptr, buf.as_ptr(), len); + + Vec::from_raw_parts(buf.as_ptr(), len, cap) + } else { + let v = std::slice::from_raw_parts(ptr, len).to_vec(); + Self::drop(data, ptr, len); + v + } + } + + unsafe fn drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) { + let inner: *mut ExternBuf = (data.get_mut()).cast(); + if (*inner).ref_count.fetch_sub(1, Ordering::Release) != 1 { + return; + } + (*inner).ref_count.load(Ordering::Acquire); + drop(Box::from_raw(inner)); + } +} + +fn is_sync() {} +fn is_send() {} + +#[test] +fn test_bounds() { + is_sync::(); + is_sync::(); + is_send::(); + is_send::(); +} + +#[test] +fn test_layout() { + use std::mem; + + assert_eq!( + mem::size_of::(), + mem::size_of::() * 4, + "Bytes size should be 4 words", + ); 
+ assert_eq!( + mem::size_of::(), + mem::size_of::() * 4, + "BytesMut should be 4 words", + ); + + assert_eq!( + mem::size_of::(), + mem::size_of::>(), + "Bytes should be same size as Option", + ); + + assert_eq!( + mem::size_of::(), + mem::size_of::>(), + "BytesMut should be same size as Option", + ); +} + +#[test] +fn roundtrip() { + let eb = ExternBuf::from(&b"abcdefgh"[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + let ebw = a.into_shared_buf::().unwrap(); + let a = Bytes::from_shared_buf(ebw); + let ebw2 = a.into_shared_buf::().unwrap(); + let a2 = Bytes::from_shared_buf(ebw2); + assert_eq!(a2, b"abcdefgh"[..]); +} + +#[test] +fn to_vec() { + let eb = ExternBuf::from(&b"abcdefgh"[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + let v = Vec::from(a); + assert_eq!(v, b"abcdefgh"[..]); +} + +#[test] +fn refer_madness() { + let eb = ExternBuf::from(&b"abcdefgh"[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + let b = a.slice(..); + let c = b.slice(..); + let d = c.slice(..5); + let e = d.slice(1..3); + drop(d); + assert_eq!(e, b"bc"[..]); +} + +#[test] +fn from_slice() { + let eb1 = ExternBuf::from(&b"abcdefgh"[..]); + let a1 = Bytes::from_shared_buf(eb1.into_shared()); + assert_eq!(a1, b"abcdefgh"[..]); + assert_eq!(a1, &b"abcdefgh"[..]); + assert_eq!(a1, Vec::from(&b"abcdefgh"[..])); + assert_eq!(b"abcdefgh"[..], a1); + assert_eq!(&b"abcdefgh"[..], a1); + assert_eq!(Vec::from(&b"abcdefgh"[..]), a1); + + let eb2 = ExternBuf::from(&b"abcdefgh"[..]); + let a2 = Bytes::from_shared_buf(eb2.into_shared()); + assert_eq!(a2, b"abcdefgh"[..]); + assert_eq!(a2, &b"abcdefgh"[..]); + assert_eq!(a2, Vec::from(&b"abcdefgh"[..])); + assert_eq!(b"abcdefgh"[..], a2); + assert_eq!(&b"abcdefgh"[..], a2); + assert_eq!(Vec::from(&b"abcdefgh"[..]), a2); +} + +#[test] +fn len() { + let eb = ExternBuf::from(&b"abcdefg"[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + assert_eq!(a.len(), 7); + + let eb = ExternBuf::from(&b""[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + assert!(a.is_empty()); +} + +#[test] +fn index() { + let eb = ExternBuf::from(&b"hello world"[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + assert_eq!(a[0..5], *b"hello"); +} + +#[test] +fn slice() { + let eb = ExternBuf::from(&b"hello world"[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + + let b = a.slice(3..5); + assert_eq!(b, b"lo"[..]); + + let b = a.slice(0..0); + assert_eq!(b, b""[..]); + + let b = a.slice(3..3); + assert_eq!(b, b""[..]); + + let b = a.slice(a.len()..a.len()); + assert_eq!(b, b""[..]); + + let b = a.slice(..5); + assert_eq!(b, b"hello"[..]); + + let b = a.slice(3..); + assert_eq!(b, b"lo world"[..]); +} + +#[test] +#[should_panic] +fn slice_oob_1() { + let eb = ExternBuf::from(&b"hello world"[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + a.slice(5..44); +} + +#[test] +#[should_panic] +fn slice_oob_2() { + let eb = ExternBuf::from(&b"hello world"[..]); + let a = Bytes::from_shared_buf(eb.into_shared()); + a.slice(44..49); +} + +#[test] +fn split_off() { + let eb = ExternBuf::from(&b"helloworld"[..]); + let mut hello = Bytes::from_shared_buf(eb.into_shared()); + let world = hello.split_off(5); + + assert_eq!(hello, &b"hello"[..]); + assert_eq!(world, &b"world"[..]); +} + +#[test] +#[should_panic] +fn split_off_oob() { + let eb = ExternBuf::from(&b"helloworld"[..]); + let mut hello = Bytes::from_shared_buf(eb.into_shared()); + let _ = hello.split_off(44); +} + +#[test] +fn split_off_to_loop() { + let s = 
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + for i in 0..(s.len() + 1) { + { + let eb = ExternBuf::from(&s[..]); + let mut bytes = Bytes::from_shared_buf(eb.into_shared()); + let off = bytes.split_off(i); + assert_eq!(i, bytes.len()); + let mut sum = Vec::new(); + sum.extend(bytes.iter()); + sum.extend(off.iter()); + assert_eq!(&s[..], &sum[..]); + } + { + let eb = ExternBuf::from(&s[..]); + let mut bytes = Bytes::from_shared_buf(eb.into_shared()); + let off = bytes.split_to(i); + assert_eq!(i, off.len()); + let mut sum = Vec::new(); + sum.extend(off.iter()); + sum.extend(bytes.iter()); + assert_eq!(&s[..], &sum[..]); + } + } +} + +#[test] +fn truncate() { + let s = &b"helloworld"[..]; + let eb = ExternBuf::from(&s[..]); + let mut hello = Bytes::from_shared_buf(eb.into_shared()); + hello.truncate(15); + assert_eq!(hello, s); + hello.truncate(10); + assert_eq!(hello, s); + hello.truncate(5); + assert_eq!(hello, "hello"); +} + +#[test] +// Only run these tests on little endian systems. CI uses qemu for testing +// big endian... and qemu doesn't really support threading all that well. +#[cfg(any(miri, target_endian = "little"))] +fn stress() { + // Tests promoting a buffer from a vec -> shared in a concurrent situation + use std::sync::{Arc, Barrier}; + use std::thread; + + const THREADS: usize = 8; + const ITERS: usize = if cfg!(miri) { 100 } else { 1_000 }; + + for i in 0..ITERS { + let data = [i as u8; 256]; + let eb = ExternBuf::from(&data[..]); + let buf = Arc::new(Bytes::from_shared_buf(eb.into_shared())); + + let barrier = Arc::new(Barrier::new(THREADS)); + let mut joins = Vec::with_capacity(THREADS); + + for _ in 0..THREADS { + let c = barrier.clone(); + let buf = buf.clone(); + + joins.push(thread::spawn(move || { + c.wait(); + let buf: Bytes = (*buf).clone(); + drop(buf); + })); + } + + for th in joins { + th.join().unwrap(); + } + + assert_eq!(*buf, data[..]); + } +}