elemaudio_rs/
audio_ring_buffer.rs1use crate::{Error, Result};
2use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4
5#[derive(Debug, Clone)]
7pub struct AudioRingBuffer {
8 inner: Arc<Inner>,
9}
10
/// Shared state behind an `AudioRingBuffer` handle.
///
/// All mutable parts are atomics, so the state can be updated through a
/// shared reference.
#[derive(Debug)]
struct Inner {
    /// Number of samples stored per frame.
    channels: usize,
    /// Number of frame slots in `samples`.
    capacity_frames: usize,
    /// Current sample rate, stored as the bit pattern of an `f64`.
    sample_rate: AtomicU64,
    /// Running count of frames written; the slot index is this value modulo
    /// `capacity_frames`.
    write_pos: AtomicUsize,
    /// Running count of frames read; the slot index is this value modulo
    /// `capacity_frames`.
    read_pos: AtomicUsize,
    /// Interleaved storage: channel `c` of frame slot `s` lives at index
    /// `s * channels + c`, and each element holds the bit pattern of an `f32`.
    samples: Box<[AtomicU32]>,
}
20
21impl AudioRingBuffer {
22 pub fn new(channels: usize, capacity_frames: usize, sample_rate: f64) -> Result<Self> {
24 if channels == 0 {
25 return Err(Error::InvalidArgument("channels must be greater than zero"));
26 }
27
28 if capacity_frames == 0 {
29 return Err(Error::InvalidArgument(
30 "capacity_frames must be greater than zero",
31 ));
32 }
33
34 if !sample_rate.is_finite() || sample_rate <= 0.0 {
35 return Err(Error::InvalidArgument(
36 "sample_rate must be a positive finite number",
37 ));
38 }
39
40 let sample_count = channels
41 .checked_mul(capacity_frames)
42 .ok_or(Error::InvalidArgument("ring buffer capacity is too large"))?;
43
44 let samples = (0..sample_count)
45 .map(|_| AtomicU32::new(0))
46 .collect::<Vec<_>>()
47 .into_boxed_slice();
48
49 Ok(Self {
50 inner: Arc::new(Inner {
51 channels,
52 capacity_frames,
53 sample_rate: AtomicU64::new(sample_rate.to_bits()),
54 write_pos: AtomicUsize::new(0),
55 read_pos: AtomicUsize::new(0),
56 samples,
57 }),
58 })
59 }
60
61 pub fn channels(&self) -> usize {
63 self.inner.channels
64 }
65
66 pub fn capacity_frames(&self) -> usize {
68 self.inner.capacity_frames
69 }
70
71 pub fn sample_rate(&self) -> f64 {
73 f64::from_bits(self.inner.sample_rate.load(Ordering::Relaxed))
74 }
75
76 pub fn reset_sample_rate(&self, sample_rate: f64) -> Result<()> {
78 if !sample_rate.is_finite() || sample_rate <= 0.0 {
79 return Err(Error::InvalidArgument(
80 "sample_rate must be a positive finite number",
81 ));
82 }
83
84 self.clear();
85 self.inner
86 .sample_rate
87 .store(sample_rate.to_bits(), Ordering::Relaxed);
88 Ok(())
89 }
90
91 pub fn available_frames(&self) -> usize {
93 let write = self.inner.write_pos.load(Ordering::Acquire);
94 let read = self.inner.read_pos.load(Ordering::Acquire);
95 write.saturating_sub(read).min(self.inner.capacity_frames)
96 }
97
98 pub fn free_frames(&self) -> usize {
100 self.inner
101 .capacity_frames
102 .saturating_sub(self.available_frames())
103 }
104
105 pub fn clear(&self) {
107 self.inner.write_pos.store(0, Ordering::Release);
108 self.inner.read_pos.store(0, Ordering::Release);
109 for sample in self.inner.samples.iter() {
110 sample.store(0, Ordering::Relaxed);
111 }
112 }
113
114 pub fn push_planar_f64(&self, inputs: &[&[f64]], frames: usize) -> usize {
116 if inputs.len() != self.channels() || frames == 0 {
117 return 0;
118 }
119
120 if inputs.iter().any(|channel| channel.len() < frames) {
121 return 0;
122 }
123
124 let frames = frames.min(self.free_frames());
125 if frames == 0 {
126 return 0;
127 }
128
129 let channels = self.channels();
130 let mut write = self.inner.write_pos.load(Ordering::Relaxed);
131
132 for frame in 0..frames {
133 let slot = write % self.inner.capacity_frames;
134 let base = slot * channels;
135
136 for (channel, channel_input) in inputs.iter().enumerate().take(channels) {
137 self.inner.samples[base + channel]
138 .store((channel_input[frame] as f32).to_bits(), Ordering::Relaxed);
139 }
140
141 write = write.wrapping_add(1);
142 }
143
144 self.inner.write_pos.store(write, Ordering::Release);
145 frames
146 }
147
148 pub fn pop_to_hardware<T>(
150 &self,
151 output: &mut [T],
152 hardware_channels: usize,
153 convert: impl Fn(f32) -> T,
154 ) -> usize
155 where
156 T: Copy + Default,
157 {
158 if output.is_empty() || hardware_channels == 0 {
159 return 0;
160 }
161
162 let frames = output.len() / hardware_channels;
163 let frames = frames.min(self.available_frames());
164 let active_channels = self.channels().min(hardware_channels);
165 let channels = self.channels();
166 let mut read = self.inner.read_pos.load(Ordering::Relaxed);
167
168 for frame in 0..frames {
169 let slot = read % self.inner.capacity_frames;
170 let base = slot * channels;
171 let out_base = frame * hardware_channels;
172
173 for channel in 0..active_channels {
174 output[out_base + channel] = convert(f32::from_bits(
175 self.inner.samples[base + channel].load(Ordering::Relaxed),
176 ));
177 }
178
179 for channel in active_channels..hardware_channels {
180 output[out_base + channel] = T::default();
181 }
182
183 read = read.wrapping_add(1);
184 }
185
186 for sample in &mut output[frames * hardware_channels..] {
187 *sample = T::default();
188 }
189
190 self.inner.read_pos.store(read, Ordering::Release);
191 frames
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::AudioRingBuffer;

    /// Planar input pushed into the ring comes back interleaved, in order.
    #[test]
    fn push_and_pop() {
        let left = [1.0_f64, 3.0];
        let right = [2.0_f64, 4.0];
        let ring = AudioRingBuffer::new(2, 8, 48_000.0).unwrap();

        let written = ring.push_planar_f64(&[&left[..], &right[..]], 2);
        assert_eq!(written, 2);

        let mut interleaved = [0.0_f32; 4];
        let read = ring.pop_to_hardware(&mut interleaved, 2, |sample| sample);
        assert_eq!(read, 2);
        assert_eq!(interleaved, [1.0, 2.0, 3.0, 4.0]);
    }

    /// Popping from an empty ring reads nothing and overwrites the output
    /// with zeros.
    #[test]
    fn underrun_zero_fills() {
        let ring = AudioRingBuffer::new(2, 8, 48_000.0).unwrap();

        // Start from a non-zero pattern so the zero fill is observable.
        let mut interleaved = [9.0_f32; 4];
        let read = ring.pop_to_hardware(&mut interleaved, 2, |sample| sample);

        assert_eq!(read, 0);
        assert_eq!(interleaved, [0.0, 0.0, 0.0, 0.0]);
    }
}