1use crate::{
8 models::{
9 NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
10 },
11 repositories::*,
12};
13use async_trait::async_trait;
14use eyre::Result;
15use itertools::Itertools;
16use std::collections::HashMap;
17use tokio::sync::{Mutex, MutexGuard};
18
19#[derive(Debug)]
20pub struct InMemoryTransactionRepository {
21 store: Mutex<HashMap<String, TransactionRepoModel>>,
22}
23
24impl Clone for InMemoryTransactionRepository {
25 fn clone(&self) -> Self {
26 let data = self
28 .store
29 .try_lock()
30 .map(|guard| guard.clone())
31 .unwrap_or_else(|_| HashMap::new());
32
33 Self {
34 store: Mutex::new(data),
35 }
36 }
37}
38
39impl InMemoryTransactionRepository {
40 pub fn new() -> Self {
41 Self {
42 store: Mutex::new(HashMap::new()),
43 }
44 }
45
46 async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
47 Ok(lock.lock().await)
48 }
49}
50
51#[async_trait]
54impl Repository<TransactionRepoModel, String> for InMemoryTransactionRepository {
55 async fn create(
56 &self,
57 tx: TransactionRepoModel,
58 ) -> Result<TransactionRepoModel, RepositoryError> {
59 let mut store = Self::acquire_lock(&self.store).await?;
60 if store.contains_key(&tx.id) {
61 return Err(RepositoryError::ConstraintViolation(format!(
62 "Transaction with ID {} already exists",
63 tx.id
64 )));
65 }
66 store.insert(tx.id.clone(), tx.clone());
67 Ok(tx)
68 }
69
70 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
71 let store = Self::acquire_lock(&self.store).await?;
72 store.get(&id).cloned().ok_or_else(|| {
73 RepositoryError::NotFound(format!("Transaction with ID {} not found", id))
74 })
75 }
76
77 #[allow(clippy::map_entry)]
78 async fn update(
79 &self,
80 id: String,
81 tx: TransactionRepoModel,
82 ) -> Result<TransactionRepoModel, RepositoryError> {
83 let mut store = Self::acquire_lock(&self.store).await?;
84 if store.contains_key(&id) {
85 let mut updated_tx = tx;
86 updated_tx.id = id.clone();
87 store.insert(id, updated_tx.clone());
88 Ok(updated_tx)
89 } else {
90 Err(RepositoryError::NotFound(format!(
91 "Transaction with ID {} not found",
92 id
93 )))
94 }
95 }
96
97 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
98 let mut store = Self::acquire_lock(&self.store).await?;
99 if store.remove(&id).is_some() {
100 Ok(())
101 } else {
102 Err(RepositoryError::NotFound(format!(
103 "Transaction with ID {} not found",
104 id
105 )))
106 }
107 }
108
109 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
110 let store = Self::acquire_lock(&self.store).await?;
111 Ok(store.values().cloned().collect())
112 }
113
114 async fn list_paginated(
115 &self,
116 query: PaginationQuery,
117 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
118 let total = self.count().await?;
119 let start = ((query.page - 1) * query.per_page) as usize;
120 let store = Self::acquire_lock(&self.store).await?;
121 let items: Vec<TransactionRepoModel> = store
122 .values()
123 .skip(start)
124 .take(query.per_page as usize)
125 .cloned()
126 .collect();
127
128 Ok(PaginatedResult {
129 items,
130 total: total as u64,
131 page: query.page,
132 per_page: query.per_page,
133 })
134 }
135
136 async fn count(&self) -> Result<usize, RepositoryError> {
137 let store = Self::acquire_lock(&self.store).await?;
138 Ok(store.len())
139 }
140
141 async fn has_entries(&self) -> Result<bool, RepositoryError> {
142 let store = Self::acquire_lock(&self.store).await?;
143 Ok(!store.is_empty())
144 }
145
146 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
147 let mut store = Self::acquire_lock(&self.store).await?;
148 store.clear();
149 Ok(())
150 }
151}
152
153#[async_trait]
154impl TransactionRepository for InMemoryTransactionRepository {
155 async fn find_by_relayer_id(
156 &self,
157 relayer_id: &str,
158 query: PaginationQuery,
159 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
160 let store = Self::acquire_lock(&self.store).await?;
161 let filtered: Vec<TransactionRepoModel> = store
162 .values()
163 .filter(|tx| tx.relayer_id == relayer_id)
164 .cloned()
165 .collect();
166
167 let total = filtered.len() as u64;
168
169 if total == 0 {
170 return Ok(PaginatedResult::<TransactionRepoModel> {
171 items: vec![],
172 total: 0,
173 page: query.page,
174 per_page: query.per_page,
175 });
176 }
177
178 let start = ((query.page - 1) * query.per_page) as usize;
179
180 let items = filtered
182 .into_iter()
183 .sorted_by(|a, b| a.created_at.cmp(&b.created_at)) .skip(start)
185 .take(query.per_page as usize)
186 .collect();
187
188 Ok(PaginatedResult {
189 items,
190 total,
191 page: query.page,
192 per_page: query.per_page,
193 })
194 }
195
196 async fn find_by_status(
197 &self,
198 relayer_id: &str,
199 statuses: &[TransactionStatus],
200 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
201 let store = Self::acquire_lock(&self.store).await?;
202 let filtered: Vec<TransactionRepoModel> = store
203 .values()
204 .filter(|tx| tx.relayer_id == relayer_id && statuses.contains(&tx.status))
205 .cloned()
206 .collect();
207
208 let sorted = filtered
210 .into_iter()
211 .sorted_by_key(|tx| tx.created_at.clone())
212 .collect();
213
214 Ok(sorted)
215 }
216
217 async fn find_by_nonce(
218 &self,
219 relayer_id: &str,
220 nonce: u64,
221 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
222 let store = Self::acquire_lock(&self.store).await?;
223 let filtered: Vec<TransactionRepoModel> = store
224 .values()
225 .filter(|tx| {
226 tx.relayer_id == relayer_id
227 && match &tx.network_data {
228 NetworkTransactionData::Evm(data) => data.nonce == Some(nonce),
229 _ => false,
230 }
231 })
232 .cloned()
233 .collect();
234
235 Ok(filtered.into_iter().next())
236 }
237
238 async fn update_status(
239 &self,
240 tx_id: String,
241 status: TransactionStatus,
242 ) -> Result<TransactionRepoModel, RepositoryError> {
243 let update = TransactionUpdateRequest {
244 status: Some(status),
245 ..Default::default()
246 };
247 self.partial_update(tx_id, update).await
248 }
249
250 async fn partial_update(
251 &self,
252 tx_id: String,
253 update: TransactionUpdateRequest,
254 ) -> Result<TransactionRepoModel, RepositoryError> {
255 let mut store = Self::acquire_lock(&self.store).await?;
256
257 if let Some(tx) = store.get_mut(&tx_id) {
258 tx.apply_partial_update(update);
260 Ok(tx.clone())
261 } else {
262 Err(RepositoryError::NotFound(format!(
263 "Transaction with ID {} not found",
264 tx_id
265 )))
266 }
267 }
268
269 async fn update_network_data(
270 &self,
271 tx_id: String,
272 network_data: NetworkTransactionData,
273 ) -> Result<TransactionRepoModel, RepositoryError> {
274 let mut tx = self.get_by_id(tx_id.clone()).await?;
275 tx.network_data = network_data;
276 self.update(tx_id, tx).await
277 }
278
279 async fn set_sent_at(
280 &self,
281 tx_id: String,
282 sent_at: String,
283 ) -> Result<TransactionRepoModel, RepositoryError> {
284 let mut tx = self.get_by_id(tx_id.clone()).await?;
285 tx.sent_at = Some(sent_at);
286 self.update(tx_id, tx).await
287 }
288
289 async fn set_confirmed_at(
290 &self,
291 tx_id: String,
292 confirmed_at: String,
293 ) -> Result<TransactionRepoModel, RepositoryError> {
294 let mut tx = self.get_by_id(tx_id.clone()).await?;
295 tx.confirmed_at = Some(confirmed_at);
296 self.update(tx_id, tx).await
297 }
298}
299
300impl Default for InMemoryTransactionRepository {
301 fn default() -> Self {
302 Self::new()
303 }
304}
305
306#[cfg(test)]
307mod tests {
308 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
309 use lazy_static::lazy_static;
310 use std::str::FromStr;
311
312 use crate::models::U256;
313
314 use super::*;
315
316 use tokio::sync::Mutex;
317
318 lazy_static! {
319 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
320 }
321 fn create_test_transaction(id: &str) -> TransactionRepoModel {
323 TransactionRepoModel {
324 id: id.to_string(),
325 relayer_id: "relayer-1".to_string(),
326 status: TransactionStatus::Pending,
327 status_reason: None,
328 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
329 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
330 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
331 valid_until: None,
332 delete_at: None,
333 network_type: NetworkType::Evm,
334 priced_at: None,
335 hashes: vec![],
336 network_data: NetworkTransactionData::Evm(EvmTransactionData {
337 gas_price: Some(1000000000),
338 gas_limit: Some(21000),
339 nonce: Some(1),
340 value: U256::from_str("1000000000000000000").unwrap(),
341 data: Some("0x".to_string()),
342 from: "0xSender".to_string(),
343 to: Some("0xRecipient".to_string()),
344 chain_id: 1,
345 signature: None,
346 hash: Some(format!("0x{}", id)),
347 speed: Some(Speed::Fast),
348 max_fee_per_gas: None,
349 max_priority_fee_per_gas: None,
350 raw: None,
351 }),
352 noop_count: None,
353 is_canceled: Some(false),
354 }
355 }
356
357 fn create_test_transaction_pending_state(id: &str) -> TransactionRepoModel {
358 TransactionRepoModel {
359 id: id.to_string(),
360 relayer_id: "relayer-1".to_string(),
361 status: TransactionStatus::Pending,
362 status_reason: None,
363 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
364 sent_at: None,
365 confirmed_at: None,
366 valid_until: None,
367 delete_at: None,
368 network_type: NetworkType::Evm,
369 priced_at: None,
370 hashes: vec![],
371 network_data: NetworkTransactionData::Evm(EvmTransactionData {
372 gas_price: Some(1000000000),
373 gas_limit: Some(21000),
374 nonce: Some(1),
375 value: U256::from_str("1000000000000000000").unwrap(),
376 data: Some("0x".to_string()),
377 from: "0xSender".to_string(),
378 to: Some("0xRecipient".to_string()),
379 chain_id: 1,
380 signature: None,
381 hash: Some(format!("0x{}", id)),
382 speed: Some(Speed::Fast),
383 max_fee_per_gas: None,
384 max_priority_fee_per_gas: None,
385 raw: None,
386 }),
387 noop_count: None,
388 is_canceled: Some(false),
389 }
390 }
391
392 #[tokio::test]
393 async fn test_create_transaction() {
394 let repo = InMemoryTransactionRepository::new();
395 let tx = create_test_transaction("test-1");
396
397 let result = repo.create(tx.clone()).await.unwrap();
398 assert_eq!(result.id, tx.id);
399 assert_eq!(repo.count().await.unwrap(), 1);
400 }
401
402 #[tokio::test]
403 async fn test_get_transaction() {
404 let repo = InMemoryTransactionRepository::new();
405 let tx = create_test_transaction("test-1");
406
407 repo.create(tx.clone()).await.unwrap();
408 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
409 if let NetworkTransactionData::Evm(stored_data) = &stored.network_data {
410 if let NetworkTransactionData::Evm(tx_data) = &tx.network_data {
411 assert_eq!(stored_data.hash, tx_data.hash);
412 }
413 }
414 }
415
416 #[tokio::test]
417 async fn test_update_transaction() {
418 let repo = InMemoryTransactionRepository::new();
419 let mut tx = create_test_transaction("test-1");
420
421 repo.create(tx.clone()).await.unwrap();
422 tx.status = TransactionStatus::Confirmed;
423
424 let updated = repo.update("test-1".to_string(), tx).await.unwrap();
425 assert!(matches!(updated.status, TransactionStatus::Confirmed));
426 }
427
428 #[tokio::test]
429 async fn test_delete_transaction() {
430 let repo = InMemoryTransactionRepository::new();
431 let tx = create_test_transaction("test-1");
432
433 repo.create(tx).await.unwrap();
434 repo.delete_by_id("test-1".to_string()).await.unwrap();
435
436 let result = repo.get_by_id("test-1".to_string()).await;
437 assert!(result.is_err());
438 }
439
440 #[tokio::test]
441 async fn test_list_all_transactions() {
442 let repo = InMemoryTransactionRepository::new();
443 let tx1 = create_test_transaction("test-1");
444 let tx2 = create_test_transaction("test-2");
445
446 repo.create(tx1).await.unwrap();
447 repo.create(tx2).await.unwrap();
448
449 let transactions = repo.list_all().await.unwrap();
450 assert_eq!(transactions.len(), 2);
451 }
452
453 #[tokio::test]
454 async fn test_count_transactions() {
455 let repo = InMemoryTransactionRepository::new();
456 let tx = create_test_transaction("test-1");
457
458 assert_eq!(repo.count().await.unwrap(), 0);
459 repo.create(tx).await.unwrap();
460 assert_eq!(repo.count().await.unwrap(), 1);
461 }
462
463 #[tokio::test]
464 async fn test_get_nonexistent_transaction() {
465 let repo = InMemoryTransactionRepository::new();
466 let result = repo.get_by_id("nonexistent".to_string()).await;
467 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
468 }
469
470 #[tokio::test]
471 async fn test_duplicate_transaction_creation() {
472 let repo = InMemoryTransactionRepository::new();
473 let tx = create_test_transaction("test-1");
474
475 repo.create(tx.clone()).await.unwrap();
476 let result = repo.create(tx).await;
477
478 assert!(matches!(
479 result,
480 Err(RepositoryError::ConstraintViolation(_))
481 ));
482 }
483
484 #[tokio::test]
485 async fn test_update_nonexistent_transaction() {
486 let repo = InMemoryTransactionRepository::new();
487 let tx = create_test_transaction("test-1");
488
489 let result = repo.update("nonexistent".to_string(), tx).await;
490 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
491 }
492
493 #[tokio::test]
494 async fn test_partial_update() {
495 let repo = InMemoryTransactionRepository::new();
496 let tx = create_test_transaction_pending_state("test-tx-id");
497 repo.create(tx.clone()).await.unwrap();
498
499 let update1 = TransactionUpdateRequest {
501 status: Some(TransactionStatus::Sent),
502 status_reason: None,
503 sent_at: None,
504 confirmed_at: None,
505 network_data: None,
506 hashes: None,
507 priced_at: None,
508 noop_count: None,
509 is_canceled: None,
510 delete_at: None,
511 };
512 let updated_tx1 = repo
513 .partial_update("test-tx-id".to_string(), update1)
514 .await
515 .unwrap();
516 assert_eq!(updated_tx1.status, TransactionStatus::Sent);
517 assert_eq!(updated_tx1.sent_at, None);
518
519 let update2 = TransactionUpdateRequest {
521 status: Some(TransactionStatus::Confirmed),
522 status_reason: None,
523 sent_at: Some("2023-01-01T12:00:00Z".to_string()),
524 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
525 network_data: None,
526 hashes: None,
527 priced_at: None,
528 noop_count: None,
529 is_canceled: None,
530 delete_at: None,
531 };
532 let updated_tx2 = repo
533 .partial_update("test-tx-id".to_string(), update2)
534 .await
535 .unwrap();
536 assert_eq!(updated_tx2.status, TransactionStatus::Confirmed);
537 assert_eq!(
538 updated_tx2.sent_at,
539 Some("2023-01-01T12:00:00Z".to_string())
540 );
541 assert_eq!(
542 updated_tx2.confirmed_at,
543 Some("2023-01-01T12:05:00Z".to_string())
544 );
545
546 let update3 = TransactionUpdateRequest {
548 status: Some(TransactionStatus::Failed),
549 status_reason: None,
550 sent_at: None,
551 confirmed_at: None,
552 network_data: None,
553 hashes: None,
554 priced_at: None,
555 noop_count: None,
556 is_canceled: None,
557 delete_at: None,
558 };
559 let result = repo
560 .partial_update("non-existent-id".to_string(), update3)
561 .await;
562 assert!(result.is_err());
563 assert!(matches!(result.unwrap_err(), RepositoryError::NotFound(_)));
564 }
565
566 #[tokio::test]
567 async fn test_update_status() {
568 let repo = InMemoryTransactionRepository::new();
569 let tx = create_test_transaction("test-1");
570
571 repo.create(tx).await.unwrap();
572
573 let updated = repo
575 .update_status("test-1".to_string(), TransactionStatus::Confirmed)
576 .await
577 .unwrap();
578
579 assert_eq!(updated.status, TransactionStatus::Confirmed);
581
582 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
584 assert_eq!(stored.status, TransactionStatus::Confirmed);
585
586 let updated = repo
588 .update_status("test-1".to_string(), TransactionStatus::Failed)
589 .await
590 .unwrap();
591
592 assert_eq!(updated.status, TransactionStatus::Failed);
594
595 let result = repo
597 .update_status("non-existent".to_string(), TransactionStatus::Confirmed)
598 .await;
599 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
600 }
601
602 #[tokio::test]
603 async fn test_list_paginated() {
604 let repo = InMemoryTransactionRepository::new();
605
606 for i in 1..=10 {
608 let tx = create_test_transaction(&format!("test-{}", i));
609 repo.create(tx).await.unwrap();
610 }
611
612 let query = PaginationQuery {
614 page: 1,
615 per_page: 3,
616 };
617 let result = repo.list_paginated(query).await.unwrap();
618 assert_eq!(result.items.len(), 3);
619 assert_eq!(result.total, 10);
620 assert_eq!(result.page, 1);
621 assert_eq!(result.per_page, 3);
622
623 let query = PaginationQuery {
625 page: 2,
626 per_page: 3,
627 };
628 let result = repo.list_paginated(query).await.unwrap();
629 assert_eq!(result.items.len(), 3);
630 assert_eq!(result.total, 10);
631 assert_eq!(result.page, 2);
632 assert_eq!(result.per_page, 3);
633
634 let query = PaginationQuery {
636 page: 4,
637 per_page: 3,
638 };
639 let result = repo.list_paginated(query).await.unwrap();
640 assert_eq!(result.items.len(), 1);
641 assert_eq!(result.total, 10);
642 assert_eq!(result.page, 4);
643 assert_eq!(result.per_page, 3);
644
645 let query = PaginationQuery {
647 page: 5,
648 per_page: 3,
649 };
650 let result = repo.list_paginated(query).await.unwrap();
651 assert_eq!(result.items.len(), 0);
652 assert_eq!(result.total, 10);
653 }
654
655 #[tokio::test]
656 async fn test_find_by_nonce() {
657 let repo = InMemoryTransactionRepository::new();
658
659 let tx1 = create_test_transaction("test-1");
661
662 let mut tx2 = create_test_transaction("test-2");
663 if let NetworkTransactionData::Evm(ref mut data) = tx2.network_data {
664 data.nonce = Some(2);
665 }
666
667 let mut tx3 = create_test_transaction("test-3");
668 tx3.relayer_id = "relayer-2".to_string();
669 if let NetworkTransactionData::Evm(ref mut data) = tx3.network_data {
670 data.nonce = Some(1);
671 }
672
673 repo.create(tx1).await.unwrap();
674 repo.create(tx2).await.unwrap();
675 repo.create(tx3).await.unwrap();
676
677 let result = repo.find_by_nonce("relayer-1", 1).await.unwrap();
679 assert!(result.is_some());
680 assert_eq!(result.as_ref().unwrap().id, "test-1");
681
682 let result = repo.find_by_nonce("relayer-1", 2).await.unwrap();
684 assert!(result.is_some());
685 assert_eq!(result.as_ref().unwrap().id, "test-2");
686
687 let result = repo.find_by_nonce("relayer-2", 1).await.unwrap();
689 assert!(result.is_some());
690 assert_eq!(result.as_ref().unwrap().id, "test-3");
691
692 let result = repo.find_by_nonce("relayer-1", 99).await.unwrap();
694 assert!(result.is_none());
695 }
696
697 #[tokio::test]
698 async fn test_update_network_data() {
699 let repo = InMemoryTransactionRepository::new();
700 let tx = create_test_transaction("test-1");
701
702 repo.create(tx.clone()).await.unwrap();
703
704 let updated_network_data = NetworkTransactionData::Evm(EvmTransactionData {
706 gas_price: Some(2000000000),
707 gas_limit: Some(30000),
708 nonce: Some(2),
709 value: U256::from_str("2000000000000000000").unwrap(),
710 data: Some("0xUpdated".to_string()),
711 from: "0xSender".to_string(),
712 to: Some("0xRecipient".to_string()),
713 chain_id: 1,
714 signature: None,
715 hash: Some("0xUpdated".to_string()),
716 raw: None,
717 speed: None,
718 max_fee_per_gas: None,
719 max_priority_fee_per_gas: None,
720 });
721
722 let updated = repo
723 .update_network_data("test-1".to_string(), updated_network_data)
724 .await
725 .unwrap();
726
727 if let NetworkTransactionData::Evm(data) = &updated.network_data {
729 assert_eq!(data.gas_price, Some(2000000000));
730 assert_eq!(data.gas_limit, Some(30000));
731 assert_eq!(data.nonce, Some(2));
732 assert_eq!(data.hash, Some("0xUpdated".to_string()));
733 assert_eq!(data.data, Some("0xUpdated".to_string()));
734 } else {
735 panic!("Expected EVM network data");
736 }
737 }
738
739 #[tokio::test]
740 async fn test_set_sent_at() {
741 let repo = InMemoryTransactionRepository::new();
742 let tx = create_test_transaction("test-1");
743
744 repo.create(tx).await.unwrap();
745
746 let new_sent_at = "2025-02-01T10:00:00.000000+00:00".to_string();
748
749 let updated = repo
750 .set_sent_at("test-1".to_string(), new_sent_at.clone())
751 .await
752 .unwrap();
753
754 assert_eq!(updated.sent_at, Some(new_sent_at.clone()));
756
757 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
759 assert_eq!(stored.sent_at, Some(new_sent_at.clone()));
760 }
761
762 #[tokio::test]
763 async fn test_set_confirmed_at() {
764 let repo = InMemoryTransactionRepository::new();
765 let tx = create_test_transaction("test-1");
766
767 repo.create(tx).await.unwrap();
768
769 let new_confirmed_at = "2025-02-01T11:30:45.123456+00:00".to_string();
771
772 let updated = repo
773 .set_confirmed_at("test-1".to_string(), new_confirmed_at.clone())
774 .await
775 .unwrap();
776
777 assert_eq!(updated.confirmed_at, Some(new_confirmed_at.clone()));
779
780 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
782 assert_eq!(stored.confirmed_at, Some(new_confirmed_at.clone()));
783 }
784
785 #[tokio::test]
786 async fn test_find_by_relayer_id() {
787 let repo = InMemoryTransactionRepository::new();
788 let tx1 = create_test_transaction("test-1");
789 let tx2 = create_test_transaction("test-2");
790
791 let mut tx3 = create_test_transaction("test-3");
793 tx3.relayer_id = "relayer-2".to_string();
794
795 repo.create(tx1).await.unwrap();
796 repo.create(tx2).await.unwrap();
797 repo.create(tx3).await.unwrap();
798
799 let query = PaginationQuery {
801 page: 1,
802 per_page: 10,
803 };
804 let result = repo
805 .find_by_relayer_id("relayer-1", query.clone())
806 .await
807 .unwrap();
808 assert_eq!(result.total, 2);
809 assert_eq!(result.items.len(), 2);
810 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
811
812 let result = repo
814 .find_by_relayer_id("relayer-2", query.clone())
815 .await
816 .unwrap();
817 assert_eq!(result.total, 1);
818 assert_eq!(result.items.len(), 1);
819 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
820
821 let result = repo
823 .find_by_relayer_id("non-existent", query.clone())
824 .await
825 .unwrap();
826 assert_eq!(result.total, 0);
827 assert_eq!(result.items.len(), 0);
828 }
829
830 #[tokio::test]
831 async fn test_find_by_status() {
832 let repo = InMemoryTransactionRepository::new();
833 let tx1 = create_test_transaction_pending_state("tx1");
834 let mut tx2 = create_test_transaction_pending_state("tx2");
835 tx2.status = TransactionStatus::Submitted;
836 let mut tx3 = create_test_transaction_pending_state("tx3");
837 tx3.relayer_id = "relayer-2".to_string();
838 tx3.status = TransactionStatus::Pending;
839
840 repo.create(tx1.clone()).await.unwrap();
841 repo.create(tx2.clone()).await.unwrap();
842 repo.create(tx3.clone()).await.unwrap();
843
844 let pending_txs = repo
846 .find_by_status("relayer-1", &[TransactionStatus::Pending])
847 .await
848 .unwrap();
849 assert_eq!(pending_txs.len(), 1);
850 assert_eq!(pending_txs[0].id, "tx1");
851
852 let submitted_txs = repo
853 .find_by_status("relayer-1", &[TransactionStatus::Submitted])
854 .await
855 .unwrap();
856 assert_eq!(submitted_txs.len(), 1);
857 assert_eq!(submitted_txs[0].id, "tx2");
858
859 let multiple_status_txs = repo
861 .find_by_status(
862 "relayer-1",
863 &[TransactionStatus::Pending, TransactionStatus::Submitted],
864 )
865 .await
866 .unwrap();
867 assert_eq!(multiple_status_txs.len(), 2);
868
869 let relayer2_pending = repo
871 .find_by_status("relayer-2", &[TransactionStatus::Pending])
872 .await
873 .unwrap();
874 assert_eq!(relayer2_pending.len(), 1);
875 assert_eq!(relayer2_pending[0].id, "tx3");
876
877 let no_txs = repo
879 .find_by_status("non-existent", &[TransactionStatus::Pending])
880 .await
881 .unwrap();
882 assert_eq!(no_txs.len(), 0);
883 }
884
885 #[tokio::test]
886 async fn test_find_by_status_sorted_by_created_at() {
887 let repo = InMemoryTransactionRepository::new();
888
889 let create_tx_with_timestamp = |id: &str, timestamp: &str| -> TransactionRepoModel {
891 let mut tx = create_test_transaction_pending_state(id);
892 tx.created_at = timestamp.to_string();
893 tx.status = TransactionStatus::Pending;
894 tx
895 };
896
897 let tx3 = create_tx_with_timestamp("tx3", "2025-01-27T17:00:00.000000+00:00"); let tx1 = create_tx_with_timestamp("tx1", "2025-01-27T15:00:00.000000+00:00"); let tx2 = create_tx_with_timestamp("tx2", "2025-01-27T16:00:00.000000+00:00"); repo.create(tx3.clone()).await.unwrap();
904 repo.create(tx1.clone()).await.unwrap();
905 repo.create(tx2.clone()).await.unwrap();
906
907 let result = repo
909 .find_by_status("relayer-1", &[TransactionStatus::Pending])
910 .await
911 .unwrap();
912
913 assert_eq!(result.len(), 3);
915 assert_eq!(result[0].id, "tx1"); assert_eq!(result[1].id, "tx2"); assert_eq!(result[2].id, "tx3"); assert_eq!(result[0].created_at, "2025-01-27T15:00:00.000000+00:00");
921 assert_eq!(result[1].created_at, "2025-01-27T16:00:00.000000+00:00");
922 assert_eq!(result[2].created_at, "2025-01-27T17:00:00.000000+00:00");
923 }
924
925 #[tokio::test]
926 async fn test_has_entries() {
927 let repo = InMemoryTransactionRepository::new();
928 assert!(!repo.has_entries().await.unwrap());
929
930 let tx = create_test_transaction("test");
931 repo.create(tx.clone()).await.unwrap();
932
933 assert!(repo.has_entries().await.unwrap());
934 }
935
936 #[tokio::test]
937 async fn test_drop_all_entries() {
938 let repo = InMemoryTransactionRepository::new();
939 let tx = create_test_transaction("test");
940 repo.create(tx.clone()).await.unwrap();
941
942 assert!(repo.has_entries().await.unwrap());
943
944 repo.drop_all_entries().await.unwrap();
945 assert!(!repo.has_entries().await.unwrap());
946 }
947
948 #[tokio::test]
951 async fn test_update_status_sets_delete_at_for_final_statuses() {
952 let _lock = ENV_MUTEX.lock().await;
953
954 use chrono::{DateTime, Duration, Utc};
955 use std::env;
956
957 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
959
960 let repo = InMemoryTransactionRepository::new();
961
962 let final_statuses = [
963 TransactionStatus::Canceled,
964 TransactionStatus::Confirmed,
965 TransactionStatus::Failed,
966 TransactionStatus::Expired,
967 ];
968
969 for (i, status) in final_statuses.iter().enumerate() {
970 let tx_id = format!("test-final-{}", i);
971 let tx = create_test_transaction_pending_state(&tx_id);
972
973 assert!(tx.delete_at.is_none());
975
976 repo.create(tx).await.unwrap();
977
978 let before_update = Utc::now();
979
980 let updated = repo
982 .update_status(tx_id.clone(), status.clone())
983 .await
984 .unwrap();
985
986 assert!(
988 updated.delete_at.is_some(),
989 "delete_at should be set for status: {:?}",
990 status
991 );
992
993 let delete_at_str = updated.delete_at.unwrap();
995 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
996 .expect("delete_at should be valid RFC3339")
997 .with_timezone(&Utc);
998
999 let duration_from_before = delete_at.signed_duration_since(before_update);
1000 let expected_duration = Duration::hours(6);
1001 let tolerance = Duration::minutes(5);
1002
1003 assert!(
1004 duration_from_before >= expected_duration - tolerance &&
1005 duration_from_before <= expected_duration + tolerance,
1006 "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1007 status, duration_from_before
1008 );
1009 }
1010
1011 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1013 }
1014
1015 #[tokio::test]
1016 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1017 let _lock = ENV_MUTEX.lock().await;
1018
1019 use std::env;
1020
1021 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1022
1023 let repo = InMemoryTransactionRepository::new();
1024
1025 let non_final_statuses = [
1026 TransactionStatus::Pending,
1027 TransactionStatus::Sent,
1028 TransactionStatus::Submitted,
1029 TransactionStatus::Mined,
1030 ];
1031
1032 for (i, status) in non_final_statuses.iter().enumerate() {
1033 let tx_id = format!("test-non-final-{}", i);
1034 let tx = create_test_transaction_pending_state(&tx_id);
1035
1036 repo.create(tx).await.unwrap();
1037
1038 let updated = repo
1040 .update_status(tx_id.clone(), status.clone())
1041 .await
1042 .unwrap();
1043
1044 assert!(
1046 updated.delete_at.is_none(),
1047 "delete_at should NOT be set for status: {:?}",
1048 status
1049 );
1050 }
1051
1052 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1054 }
1055
1056 #[tokio::test]
1057 async fn test_partial_update_sets_delete_at_for_final_statuses() {
1058 let _lock = ENV_MUTEX.lock().await;
1059
1060 use chrono::{DateTime, Duration, Utc};
1061 use std::env;
1062
1063 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1064
1065 let repo = InMemoryTransactionRepository::new();
1066 let tx = create_test_transaction_pending_state("test-partial-final");
1067
1068 repo.create(tx).await.unwrap();
1069
1070 let before_update = Utc::now();
1071
1072 let update = TransactionUpdateRequest {
1074 status: Some(TransactionStatus::Confirmed),
1075 status_reason: Some("Transaction completed".to_string()),
1076 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1077 ..Default::default()
1078 };
1079
1080 let updated = repo
1081 .partial_update("test-partial-final".to_string(), update)
1082 .await
1083 .unwrap();
1084
1085 assert!(
1087 updated.delete_at.is_some(),
1088 "delete_at should be set when updating to Confirmed status"
1089 );
1090
1091 let delete_at_str = updated.delete_at.unwrap();
1093 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1094 .expect("delete_at should be valid RFC3339")
1095 .with_timezone(&Utc);
1096
1097 let duration_from_before = delete_at.signed_duration_since(before_update);
1098 let expected_duration = Duration::hours(8);
1099 let tolerance = Duration::minutes(5);
1100
1101 assert!(
1102 duration_from_before >= expected_duration - tolerance
1103 && duration_from_before <= expected_duration + tolerance,
1104 "delete_at should be approximately 8 hours from now. Duration: {:?}",
1105 duration_from_before
1106 );
1107
1108 assert_eq!(updated.status, TransactionStatus::Confirmed);
1110 assert_eq!(
1111 updated.status_reason,
1112 Some("Transaction completed".to_string())
1113 );
1114 assert_eq!(
1115 updated.confirmed_at,
1116 Some("2023-01-01T12:05:00Z".to_string())
1117 );
1118
1119 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1121 }
1122
1123 #[tokio::test]
1124 async fn test_update_status_preserves_existing_delete_at() {
1125 let _lock = ENV_MUTEX.lock().await;
1126
1127 use std::env;
1128
1129 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1130
1131 let repo = InMemoryTransactionRepository::new();
1132 let mut tx = create_test_transaction_pending_state("test-preserve-delete-at");
1133
1134 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1136 tx.delete_at = Some(existing_delete_at.clone());
1137
1138 repo.create(tx).await.unwrap();
1139
1140 let updated = repo
1142 .update_status(
1143 "test-preserve-delete-at".to_string(),
1144 TransactionStatus::Confirmed,
1145 )
1146 .await
1147 .unwrap();
1148
1149 assert_eq!(
1151 updated.delete_at,
1152 Some(existing_delete_at),
1153 "Existing delete_at should be preserved when updating to final status"
1154 );
1155
1156 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1158 }
1159
1160 #[tokio::test]
1161 async fn test_partial_update_without_status_change_preserves_delete_at() {
1162 let _lock = ENV_MUTEX.lock().await;
1163
1164 use std::env;
1165
1166 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1167
1168 let repo = InMemoryTransactionRepository::new();
1169 let tx = create_test_transaction_pending_state("test-preserve-no-status");
1170
1171 repo.create(tx).await.unwrap();
1172
1173 let updated1 = repo
1175 .update_status(
1176 "test-preserve-no-status".to_string(),
1177 TransactionStatus::Confirmed,
1178 )
1179 .await
1180 .unwrap();
1181
1182 assert!(updated1.delete_at.is_some());
1183 let original_delete_at = updated1.delete_at.clone();
1184
1185 let update = TransactionUpdateRequest {
1187 status: None, status_reason: Some("Updated reason".to_string()),
1189 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1190 ..Default::default()
1191 };
1192
1193 let updated2 = repo
1194 .partial_update("test-preserve-no-status".to_string(), update)
1195 .await
1196 .unwrap();
1197
1198 assert_eq!(
1200 updated2.delete_at, original_delete_at,
1201 "delete_at should be preserved when status is not updated"
1202 );
1203
1204 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1207 assert_eq!(
1208 updated2.confirmed_at,
1209 Some("2023-01-01T12:10:00Z".to_string())
1210 );
1211
1212 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1214 }
1215
1216 #[tokio::test]
1217 async fn test_update_status_multiple_updates_idempotent() {
1218 let _lock = ENV_MUTEX.lock().await;
1219
1220 use std::env;
1221
1222 env::set_var("TRANSACTION_EXPIRATION_HOURS", "12");
1223
1224 let repo = InMemoryTransactionRepository::new();
1225 let tx = create_test_transaction_pending_state("test-idempotent");
1226
1227 repo.create(tx).await.unwrap();
1228
1229 let updated1 = repo
1231 .update_status("test-idempotent".to_string(), TransactionStatus::Confirmed)
1232 .await
1233 .unwrap();
1234
1235 assert!(updated1.delete_at.is_some());
1236 let first_delete_at = updated1.delete_at.clone();
1237
1238 let updated2 = repo
1240 .update_status("test-idempotent".to_string(), TransactionStatus::Failed)
1241 .await
1242 .unwrap();
1243
1244 assert_eq!(
1246 updated2.delete_at, first_delete_at,
1247 "delete_at should not change on subsequent final status updates"
1248 );
1249
1250 assert_eq!(updated2.status, TransactionStatus::Failed);
1252
1253 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1255 }
1256}