rustriff_lib/services/
amp_config_service.rs1use crate::domain::dto::amp_config_dto::AmpConfigDto;
2use crate::infrastructure::persistence::amp_config_persistence_trait::AmpConfigPersistence;
3use crate::services::audio_service::AudioService;
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use tracing::error;
7
8pub struct AmpConfigPersistenceService {
14 repository: Arc<dyn AmpConfigPersistence>,
15 pending_snapshot: Arc<(Mutex<Option<AmpConfigDto>>, Condvar)>,
16}
17
18impl AmpConfigPersistenceService {
19 pub fn new(repository: Box<dyn AmpConfigPersistence>) -> Self {
21 let repository: Arc<dyn AmpConfigPersistence> = Arc::from(repository);
22 let pending_snapshot = Arc::new((Mutex::new(None), Condvar::new()));
23 let worker_pending_snapshot = Arc::clone(&pending_snapshot);
24 let worker_repository = Arc::clone(&repository);
25
26 thread::spawn(move || loop {
29 let latest_snapshot = {
30 let (lock, cv) = &*worker_pending_snapshot;
31 let mut pending = lock
32 .lock()
33 .expect("pending snapshot lock should be available");
34 while pending.is_none() {
35 pending = cv
36 .wait(pending)
37 .expect("pending snapshot lock should be available after wait");
38 }
39 pending
40 .take()
41 .expect("snapshot should be available when worker wakes")
42 };
43
44 if let Err(err) = worker_repository.save(&latest_snapshot) {
45 error!("Failed to persist amp config snapshot in background worker: {err}");
46 }
47 });
48
49 Self {
50 repository,
51 pending_snapshot,
52 }
53 }
54
55 pub fn load_amp_config(&self) -> Result<Option<AmpConfigDto>, String> {
57 self.repository.load()
58 }
59
60 pub fn persist_from_audio_service(&self, audio_service: &AudioService) -> Result<(), String> {
66 let snapshot = AmpConfigDto::from_service(audio_service);
67 self.persist_snapshot(snapshot)
68 }
69
70 pub fn persist_snapshot(&self, snapshot: AmpConfigDto) -> Result<(), String> {
72 let (lock, cv) = &*self.pending_snapshot;
73 let mut pending = lock
74 .lock()
75 .map_err(|_| "Amp config persistence lock is unavailable".to_string())?;
76 *pending = Some(snapshot);
77 cv.notify_one();
78 Ok(())
79 }
80}
81
82#[cfg(test)]
83mod tests {
84 use super::*;
85 use crate::infrastructure::audio_handler::MockAudioHandlerTrait;
86 use std::sync::{Arc, Condvar, Mutex};
87 use std::time::Duration;
88
89 struct SpyRepositoryState {
90 saved_configs: Mutex<Vec<AmpConfigDto>>,
91 saved_configs_cv: Condvar,
92 load_result: Mutex<Result<Option<AmpConfigDto>, String>>,
93 save_result: Mutex<Result<(), String>>,
94 save_started_count: Mutex<usize>,
95 save_started_cv: Condvar,
96 block_saves: Mutex<bool>,
97 block_saves_cv: Condvar,
98 }
99
100 impl SpyRepositoryState {
101 fn new() -> Self {
102 Self {
103 saved_configs: Mutex::new(Vec::new()),
104 saved_configs_cv: Condvar::new(),
105 load_result: Mutex::new(Ok(None)),
106 save_result: Mutex::new(Ok(())),
107 save_started_count: Mutex::new(0),
108 save_started_cv: Condvar::new(),
109 block_saves: Mutex::new(false),
110 block_saves_cv: Condvar::new(),
111 }
112 }
113
114 fn wait_for_saved_count(
115 &self,
116 minimum_count: usize,
117 timeout: Duration,
118 ) -> Vec<AmpConfigDto> {
119 let mut saved = self
120 .saved_configs
121 .lock()
122 .expect("saved_configs should be lockable");
123
124 let wait_result = self
125 .saved_configs_cv
126 .wait_timeout_while(saved, timeout, |configs| configs.len() < minimum_count)
127 .expect("saved_configs should remain lockable while waiting");
128 saved = wait_result.0;
129 saved.clone()
130 }
131
132 fn wait_for_save_started_count(&self, minimum_count: usize, timeout: Duration) {
133 let started = self
134 .save_started_count
135 .lock()
136 .expect("save_started_count should be lockable");
137
138 let _ = self
139 .save_started_cv
140 .wait_timeout_while(started, timeout, |count| *count < minimum_count)
141 .expect("save_started_count should remain lockable while waiting");
142 }
143
144 fn set_block_saves(&self, should_block: bool) {
145 let mut block_saves = self
146 .block_saves
147 .lock()
148 .expect("block_saves should be lockable");
149 *block_saves = should_block;
150 if !should_block {
151 self.block_saves_cv.notify_all();
152 }
153 }
154 }
155
156 struct SpyRepository {
157 state: Arc<SpyRepositoryState>,
158 }
159
160 impl AmpConfigPersistence for SpyRepository {
161 fn load(&self) -> Result<Option<AmpConfigDto>, String> {
162 self.state
163 .load_result
164 .lock()
165 .expect("load_result should be lockable")
166 .clone()
167 }
168
169 fn save(&self, config: &AmpConfigDto) -> Result<(), String> {
170 {
171 let mut started_count = self
172 .state
173 .save_started_count
174 .lock()
175 .expect("save_started_count should be lockable");
176 *started_count += 1;
177 self.state.save_started_cv.notify_all();
178 }
179
180 let mut block_saves = self
181 .state
182 .block_saves
183 .lock()
184 .expect("block_saves should be lockable");
185 while *block_saves {
186 block_saves = self
187 .state
188 .block_saves_cv
189 .wait(block_saves)
190 .expect("block_saves should remain lockable while waiting");
191 }
192 drop(block_saves);
193
194 self.state
195 .saved_configs
196 .lock()
197 .expect("saved_configs should be lockable")
198 .push(config.clone());
199 self.state.saved_configs_cv.notify_all();
200
201 self.state
202 .save_result
203 .lock()
204 .expect("save_result should be lockable")
205 .clone()
206 }
207 }
208
209 #[test]
210 fn load_amp_config_returns_repository_value() {
211 let state = Arc::new(SpyRepositoryState::new());
212 let expected = AmpConfigDto {
213 master_volume: 0.72,
214 is_active: false,
215 channels: Vec::new(),
216 current_channel: 2,
217 };
218 *state
219 .load_result
220 .lock()
221 .expect("load_result should be lockable") = Ok(Some(expected.clone()));
222
223 let service = AmpConfigPersistenceService::new(Box::new(SpyRepository {
224 state: Arc::clone(&state),
225 }));
226 let loaded = service.load_amp_config().expect("load should succeed");
227
228 assert!(loaded.is_some());
229 assert_eq!(
230 loaded.expect("value should be present").current_channel,
231 expected.current_channel
232 );
233 }
234
235 #[test]
236 fn load_amp_config_propagates_repository_error() {
237 let state = Arc::new(SpyRepositoryState::new());
238 *state
239 .load_result
240 .lock()
241 .expect("load_result should be lockable") = Err("load failed".to_string());
242
243 let service = AmpConfigPersistenceService::new(Box::new(SpyRepository {
244 state: Arc::clone(&state),
245 }));
246 let err = service.load_amp_config().expect_err("load should fail");
247
248 assert_eq!(err, "load failed");
249 }
250
251 #[test]
252 fn persist_from_audio_service_saves_snapshot() {
253 let state = Arc::new(SpyRepositoryState::new());
254 let service = AmpConfigPersistenceService::new(Box::new(SpyRepository {
255 state: Arc::clone(&state),
256 }));
257
258 let mock = MockAudioHandlerTrait::new();
259 let audio_service = AudioService::new_with_handler(Arc::new(mock));
260
261 service
262 .persist_from_audio_service(&audio_service)
263 .expect("persist should succeed");
264
265 let saved = state.wait_for_saved_count(1, Duration::from_secs(1));
266 assert_eq!(saved.len(), 1);
267 assert_eq!(saved[0].current_channel, 0);
268 assert!(!saved[0].is_active);
269 }
270
271 #[test]
272 fn persist_from_audio_service_enqueues_even_when_background_save_fails() {
273 let state = Arc::new(SpyRepositoryState::new());
274 *state
275 .save_result
276 .lock()
277 .expect("save_result should be lockable") = Err("save failed".to_string());
278
279 let service = AmpConfigPersistenceService::new(Box::new(SpyRepository {
280 state: Arc::clone(&state),
281 }));
282 let mock = MockAudioHandlerTrait::new();
283 let audio_service = AudioService::new_with_handler(Arc::new(mock));
284
285 service
286 .persist_from_audio_service(&audio_service)
287 .expect("enqueue should succeed");
288
289 let saved = state.wait_for_saved_count(1, Duration::from_secs(1));
290 assert_eq!(saved.len(), 1);
291 }
292
293 #[test]
294 fn persist_snapshot_keeps_only_newest_pending_snapshot() {
295 let state = Arc::new(SpyRepositoryState::new());
296 state.set_block_saves(true);
297
298 let service = AmpConfigPersistenceService::new(Box::new(SpyRepository {
299 state: Arc::clone(&state),
300 }));
301
302 let snapshot = |current_channel: u32| AmpConfigDto {
303 master_volume: 0.5,
304 is_active: false,
305 channels: Vec::new(),
306 current_channel,
307 };
308
309 service
310 .persist_snapshot(snapshot(1))
311 .expect("first snapshot enqueue should succeed");
312
313 state.wait_for_save_started_count(1, Duration::from_secs(1));
314
315 service
316 .persist_snapshot(snapshot(2))
317 .expect("second snapshot enqueue should succeed");
318 service
319 .persist_snapshot(snapshot(3))
320 .expect("third snapshot enqueue should succeed");
321
322 state.set_block_saves(false);
323
324 let saved = state.wait_for_saved_count(2, Duration::from_secs(1));
325
326 assert_eq!(saved.len(), 2);
327 assert_eq!(
328 saved
329 .first()
330 .expect("first snapshot exists")
331 .current_channel,
332 1
333 );
334 assert_eq!(
335 saved
336 .last()
337 .expect("at least one snapshot saved")
338 .current_channel,
339 3
340 );
341 assert!(saved.iter().any(|cfg| cfg.current_channel == 3));
342 assert!(!saved.iter().any(|cfg| cfg.current_channel == 2));
343 }
344}