Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 42 additions & 39 deletions encodings/fastlanes/src/bitpacking/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,26 @@ fn filter_primitive_without_patches<U: UnsignedPType + BitPacking>(
array: &BitPackedArray,
selection: &Arc<MaskValues>,
) -> VortexResult<(Buffer<U>, Validity)> {
let values = filter_with_indices(array, selection.indices());
let selection_buffer = selection.bit_buffer();

let values = filter_with_indices(
array,
selection_buffer.set_indices(),
selection_buffer.true_count(),
);
let validity = array.validity()?.filter(&Mask::Values(selection.clone()))?;

Ok((values.freeze(), validity))
}

fn filter_with_indices<T: NativePType + BitPacking>(
fn filter_with_indices<T: NativePType + BitPacking, I: Iterator<Item = usize>>(
array: &BitPackedArray,
indices: &[usize],
indices: I,
indices_len: usize,
) -> BufferMut<T> {
let offset = array.offset() as usize;
let bit_width = array.bit_width() as usize;
let mut values = BufferMut::with_capacity(indices.len());
let mut values = BufferMut::with_capacity(indices_len);

// Re-usable scratch space that holds the unpacked values of a single 1024-element chunk.
let mut unpacked = [const { MaybeUninit::<T>::uninit() }; 1024];
Expand All @@ -118,43 +125,39 @@ fn filter_with_indices<T: NativePType + BitPacking>(
// Group the indices by the FastLanes chunk they belong to.
let chunk_size = 128 * bit_width / size_of::<T>();

chunked_indices(
indices.iter().copied(),
offset,
|chunk_idx, indices_within_chunk| {
let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];

if indices_within_chunk.len() == 1024 {
// Unpack the entire chunk.
unsafe {
let values_len = values.len();
values.set_len(values_len + 1024);
BitPacking::unchecked_unpack(
bit_width,
packed,
&mut values.as_mut_slice()[values_len..],
);
}
} else if indices_within_chunk.len() > UNPACK_CHUNK_THRESHOLD {
// Unpack into a temporary chunk and then copy the values.
unsafe {
let dst: &mut [MaybeUninit<T>] = &mut unpacked;
let dst: &mut [T] = std::mem::transmute(dst);
BitPacking::unchecked_unpack(bit_width, packed, dst);
}
values.extend_trusted(
indices_within_chunk
.iter()
.map(|&idx| unsafe { unpacked.get_unchecked(idx).assume_init() }),
chunked_indices(indices, offset, |chunk_idx, indices_within_chunk| {
let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];

if indices_within_chunk.len() == 1024 {
// Unpack the entire chunk.
unsafe {
let values_len = values.len();
values.set_len(values_len + 1024);
BitPacking::unchecked_unpack(
bit_width,
packed,
&mut values.as_mut_slice()[values_len..],
);
} else {
// Otherwise, unpack each element individually.
values.extend_trusted(indices_within_chunk.iter().map(|&idx| unsafe {
BitPacking::unchecked_unpack_single(bit_width, packed, idx)
}));
}
},
);
} else if indices_within_chunk.len() > UNPACK_CHUNK_THRESHOLD {
// Unpack into a temporary chunk and then copy the values.
unsafe {
let dst: &mut [MaybeUninit<T>] = &mut unpacked;
let dst: &mut [T] = std::mem::transmute(dst);
BitPacking::unchecked_unpack(bit_width, packed, dst);
}
values.extend_trusted(
indices_within_chunk
.iter()
.map(|&idx| unsafe { unpacked.get_unchecked(idx).assume_init() }),
);
} else {
// Otherwise, unpack each element individually.
values.extend_trusted(indices_within_chunk.iter().map(|&idx| unsafe {
BitPacking::unchecked_unpack_single(bit_width, packed, idx)
}));
}
});

values
}
Expand Down
2 changes: 1 addition & 1 deletion encodings/runend/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl FilterKernel for RunEndVTable {
if runs_ratio < FILTER_TAKE_THRESHOLD || mask_values.true_count() < 25 {
Ok(Some(take_indices_unchecked(
array,
mask_values.indices(),
mask_values.bit_buffer().set_indices(),
&Validity::NonNullable,
)?))
} else {
Expand Down
12 changes: 8 additions & 4 deletions encodings/runend/src/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,19 @@ impl TakeExecute for RunEndVTable {
.collect::<VortexResult<Vec<_>>>()?
});

take_indices_unchecked(array, &checked_indices, primitive_indices.validity()).map(Some)
take_indices_unchecked(
array,
checked_indices.into_iter(),
primitive_indices.validity(),
)
.map(Some)
}
}

/// Perform a take operation on a RunEndArray by binary searching for each of the indices.
pub fn take_indices_unchecked<T: AsPrimitive<usize>>(
pub fn take_indices_unchecked<T: AsPrimitive<usize>, I: Iterator<Item = T>>(
array: &RunEndArray,
indices: &[T],
indices: I,
validity: &Validity,
) -> VortexResult<ArrayRef> {
let ends = array.ends().to_primitive();
Expand All @@ -66,7 +71,6 @@ pub fn take_indices_unchecked<T: AsPrimitive<usize>>(
let physical_indices = match_each_integer_ptype!(ends.ptype(), |I| {
let end_slices = ends.as_slice::<I>();
let physical_indices_vec: Vec<u64> = indices
.iter()
.map(|idx| idx.as_() + array.offset())
.map(|idx| {
match <I as NumCast>::from(idx) {
Expand Down
2 changes: 1 addition & 1 deletion encodings/sequence/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn filter_impl<T: NativePType>(mul: T, base: T, mask: &Mask, validity: Validity)
.values()
.vortex_expect("FilterKernel precondition: mask is Mask::Values");
let mut buffer = BufferMut::<T>::with_capacity(mask_values.true_count());
buffer.extend(mask_values.indices().iter().map(|&idx| {
buffer.extend(mask_values.bit_buffer().set_indices().map(|idx| {
let i = T::from_usize(idx).vortex_expect("all valid indices fit");
base + i * mul
}));
Expand Down
34 changes: 15 additions & 19 deletions encodings/sparse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_mask::AllOr;
use vortex_mask::Mask;
use vortex_scalar::Scalar;
use vortex_scalar::ScalarValue;
Expand Down Expand Up @@ -310,23 +309,16 @@ impl SparseArray {
} else if mask.false_count() as f64 > (0.9 * mask.len() as f64) {
// Array is dominated by NULL but has non-NULL values
let non_null_values = filter(array, &mask)?;
let non_null_indices = match mask.indices() {
AllOr::All => {
// We already know that the mask is 90%+ false
unreachable!("Mask is mostly null")
}
AllOr::None => {
// we know there are some non-NULL values
unreachable!("Mask is mostly null but not all null")
}
AllOr::Some(values) => {
let buffer: Buffer<u32> = values
.iter()
.map(|&v| v.try_into().vortex_expect("indices must fit in u32"))
.collect();

buffer.into_array()
}
let non_null_indices = if let Some(mask_values) = mask.values() {
let buffer: Buffer<u32> = mask_values
.bit_buffer()
.set_indices()
.map(|v| v.try_into().vortex_expect("indices must fit in u32"))
.collect();

buffer.into_array()
} else {
unreachable!()
};

return Ok(SparseArray::try_new(
Expand Down Expand Up @@ -369,7 +361,11 @@ impl SparseArray {
// All values are equal to the top value
return Ok(fill_array);
}
Mask::Values(values) => values.indices().iter().map(|v| *v as u64).collect(),
Mask::Values(values) => values
.bit_buffer()
.set_indices()
.map(|v| v as u64)
.collect(),
};

SparseArray::try_new(indices.into_array(), non_top_values, array.len(), fill)
Expand Down
139 changes: 69 additions & 70 deletions encodings/zstd/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_mask::AllOr;
use vortex_scalar::Scalar;
use vortex_session::VortexSession;

Expand Down Expand Up @@ -251,27 +250,25 @@ fn collect_valid_primitive(parray: &PrimitiveArray) -> VortexResult<PrimitiveArr

fn collect_valid_vbv(vbv: &VarBinViewArray) -> VortexResult<(ByteBuffer, Vec<usize>)> {
let mask = vbv.validity_mask()?;
let buffer_and_value_byte_indices = match mask.bit_buffer() {
AllOr::None => (Buffer::empty(), Vec::new()),
_ => {
let mut buffer = BufferMut::with_capacity(
usize::try_from(vbv.nbytes()).vortex_expect("must fit into buffer")
+ mask.true_count() * size_of::<ViewLen>(),
);
let mut value_byte_indices = Vec::new();
vbv.with_iterator(|iterator| {
// by flattening, we should omit nulls
for value in iterator.flatten() {
value_byte_indices.push(buffer.len());
// here's where we write the string lengths
buffer
.extend_trusted(ViewLen::try_from(value.len())?.to_le_bytes().into_iter());
buffer.extend_from_slice(value);
}
Ok::<_, VortexError>(())
})?;
(buffer.freeze(), value_byte_indices)
}
let buffer_and_value_byte_indices = if mask.all_false() {
(Buffer::empty(), Vec::new())
} else {
let mut buffer = BufferMut::with_capacity(
usize::try_from(vbv.nbytes()).vortex_expect("must fit into buffer")
+ mask.true_count() * size_of::<ViewLen>(),
);
let mut value_byte_indices = Vec::new();
vbv.with_iterator(|iterator| {
// by flattening, we should omit nulls
for value in iterator.flatten() {
value_byte_indices.push(buffer.len());
// here's where we write the string lengths
buffer.extend_trusted(ViewLen::try_from(value.len())?.to_le_bytes().into_iter());
buffer.extend_from_slice(value);
}
Ok::<_, VortexError>(())
})?;
(buffer.freeze(), value_byte_indices)
};
Ok(buffer_and_value_byte_indices)
}
Expand Down Expand Up @@ -719,57 +716,59 @@ impl ZstdArray {
Ok(primitive.into_array())
}
DType::Binary(_) | DType::Utf8(_) => {
match slice_validity.to_mask(slice_n_rows).indices() {
AllOr::All => {
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
// views into those strings by passing through the buffer.
let valid_views = reconstruct_views(&decompressed).slice(
slice_value_idx_start - n_skipped_values
..slice_value_idx_stop - n_skipped_values,
);

// SAFETY: we properly construct the views inside `reconstruct_views`
Ok(unsafe {
VarBinViewArray::new_unchecked(
valid_views,
Arc::from([decompressed]),
self.dtype.clone(),
slice_validity,
)
}
.into_array())
let mask = slice_validity.to_mask(slice_n_rows);
if mask.all_true() {
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
// views into those strings by passing through the buffer.
let valid_views = reconstruct_views(&decompressed).slice(
slice_value_idx_start - n_skipped_values
..slice_value_idx_stop - n_skipped_values,
);

// SAFETY: we properly construct the views inside `reconstruct_views`
Ok(unsafe {
VarBinViewArray::new_unchecked(
valid_views,
Arc::from([decompressed]),
self.dtype.clone(),
slice_validity,
)
}
AllOr::None => Ok(ConstantArray::new(
Scalar::null(self.dtype.clone()),
slice_n_rows,
.into_array())
} else if mask.all_false() {
Ok(
ConstantArray::new(Scalar::null(self.dtype.clone()), slice_n_rows)
.into_array(),
)
.into_array()),
AllOr::Some(valid_indices) => {
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
// views into those strings by passing through the buffer.
let valid_views = reconstruct_views(&decompressed).slice(
slice_value_idx_start - n_skipped_values
..slice_value_idx_stop - n_skipped_values,
);

let mut views = BufferMut::<BinaryView>::zeroed(slice_n_rows);
for (view, index) in valid_views.into_iter().zip_eq(valid_indices) {
views[*index] = view
}

// SAFETY: we properly construct the views inside `reconstruct_views`
Ok(unsafe {
VarBinViewArray::new_unchecked(
views.freeze(),
Arc::from([decompressed]),
self.dtype.clone(),
slice_validity,
)
}
.into_array())
} else {
let mask_values = mask.values().unwrap();
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
// views into those strings by passing through the buffer.
let valid_views = reconstruct_views(&decompressed).slice(
slice_value_idx_start - n_skipped_values
..slice_value_idx_stop - n_skipped_values,
);

let mut views = BufferMut::<BinaryView>::zeroed(slice_n_rows);
for (view, index) in valid_views
.into_iter()
.zip_eq(mask_values.bit_buffer().set_indices())
{
views[index] = view
}

// SAFETY: we properly construct the views inside `reconstruct_views`
Ok(unsafe {
VarBinViewArray::new_unchecked(
views.freeze(),
Arc::from([decompressed]),
self.dtype.clone(),
slice_validity,
)
}
.into_array())
}
}
_ => vortex_panic!("Unsupported dtype for Zstd array: {}", self.dtype),
Expand Down
Loading
Loading