openzeppelin_relayer/repositories/transaction/
transaction_in_memory.rs

1//! This module defines an in-memory transaction repository for managing
2//! transaction data. It provides asynchronous methods for creating, retrieving,
3//! updating, and deleting transactions, as well as querying transactions by
4//! various criteria such as relayer ID, status, and nonce. The repository
5//! is implemented using a `Mutex`-protected `HashMap` to store transaction
6//! data, ensuring thread-safe access in an asynchronous context.
7use crate::{
8    models::{
9        NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
10    },
11    repositories::*,
12};
13use async_trait::async_trait;
14use eyre::Result;
15use itertools::Itertools;
16use std::collections::HashMap;
17use tokio::sync::{Mutex, MutexGuard};
18
/// In-memory transaction repository.
///
/// Transactions live in a `HashMap` guarded by a `tokio::sync::Mutex`, so every
/// repository method awaits the lock before reading or writing the map.
#[derive(Debug)]
pub struct InMemoryTransactionRepository {
    /// Stored transactions, keyed by transaction ID.
    store: Mutex<HashMap<String, TransactionRepoModel>>,
}
23
24impl Clone for InMemoryTransactionRepository {
25    fn clone(&self) -> Self {
26        // Try to get the current data, or use empty HashMap if lock fails
27        let data = self
28            .store
29            .try_lock()
30            .map(|guard| guard.clone())
31            .unwrap_or_else(|_| HashMap::new());
32
33        Self {
34            store: Mutex::new(data),
35        }
36    }
37}
38
39impl InMemoryTransactionRepository {
40    pub fn new() -> Self {
41        Self {
42            store: Mutex::new(HashMap::new()),
43        }
44    }
45
46    async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
47        Ok(lock.lock().await)
48    }
49}
50
51// Implement both traits for InMemoryTransactionRepository
52
53#[async_trait]
54impl Repository<TransactionRepoModel, String> for InMemoryTransactionRepository {
55    async fn create(
56        &self,
57        tx: TransactionRepoModel,
58    ) -> Result<TransactionRepoModel, RepositoryError> {
59        let mut store = Self::acquire_lock(&self.store).await?;
60        if store.contains_key(&tx.id) {
61            return Err(RepositoryError::ConstraintViolation(format!(
62                "Transaction with ID {} already exists",
63                tx.id
64            )));
65        }
66        store.insert(tx.id.clone(), tx.clone());
67        Ok(tx)
68    }
69
70    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
71        let store = Self::acquire_lock(&self.store).await?;
72        store.get(&id).cloned().ok_or_else(|| {
73            RepositoryError::NotFound(format!("Transaction with ID {} not found", id))
74        })
75    }
76
77    #[allow(clippy::map_entry)]
78    async fn update(
79        &self,
80        id: String,
81        tx: TransactionRepoModel,
82    ) -> Result<TransactionRepoModel, RepositoryError> {
83        let mut store = Self::acquire_lock(&self.store).await?;
84        if store.contains_key(&id) {
85            let mut updated_tx = tx;
86            updated_tx.id = id.clone();
87            store.insert(id, updated_tx.clone());
88            Ok(updated_tx)
89        } else {
90            Err(RepositoryError::NotFound(format!(
91                "Transaction with ID {} not found",
92                id
93            )))
94        }
95    }
96
97    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
98        let mut store = Self::acquire_lock(&self.store).await?;
99        if store.remove(&id).is_some() {
100            Ok(())
101        } else {
102            Err(RepositoryError::NotFound(format!(
103                "Transaction with ID {} not found",
104                id
105            )))
106        }
107    }
108
109    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
110        let store = Self::acquire_lock(&self.store).await?;
111        Ok(store.values().cloned().collect())
112    }
113
114    async fn list_paginated(
115        &self,
116        query: PaginationQuery,
117    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
118        let total = self.count().await?;
119        let start = ((query.page - 1) * query.per_page) as usize;
120        let store = Self::acquire_lock(&self.store).await?;
121        let items: Vec<TransactionRepoModel> = store
122            .values()
123            .skip(start)
124            .take(query.per_page as usize)
125            .cloned()
126            .collect();
127
128        Ok(PaginatedResult {
129            items,
130            total: total as u64,
131            page: query.page,
132            per_page: query.per_page,
133        })
134    }
135
136    async fn count(&self) -> Result<usize, RepositoryError> {
137        let store = Self::acquire_lock(&self.store).await?;
138        Ok(store.len())
139    }
140
141    async fn has_entries(&self) -> Result<bool, RepositoryError> {
142        let store = Self::acquire_lock(&self.store).await?;
143        Ok(!store.is_empty())
144    }
145
146    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
147        let mut store = Self::acquire_lock(&self.store).await?;
148        store.clear();
149        Ok(())
150    }
151}
152
153#[async_trait]
154impl TransactionRepository for InMemoryTransactionRepository {
155    async fn find_by_relayer_id(
156        &self,
157        relayer_id: &str,
158        query: PaginationQuery,
159    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
160        let store = Self::acquire_lock(&self.store).await?;
161        let filtered: Vec<TransactionRepoModel> = store
162            .values()
163            .filter(|tx| tx.relayer_id == relayer_id)
164            .cloned()
165            .collect();
166
167        let total = filtered.len() as u64;
168
169        if total == 0 {
170            return Ok(PaginatedResult::<TransactionRepoModel> {
171                items: vec![],
172                total: 0,
173                page: query.page,
174                per_page: query.per_page,
175            });
176        }
177
178        let start = ((query.page - 1) * query.per_page) as usize;
179
180        // Sort and paginate
181        let items = filtered
182            .into_iter()
183            .sorted_by(|a, b| a.created_at.cmp(&b.created_at)) // Sort by created_at
184            .skip(start)
185            .take(query.per_page as usize)
186            .collect();
187
188        Ok(PaginatedResult {
189            items,
190            total,
191            page: query.page,
192            per_page: query.per_page,
193        })
194    }
195
196    async fn find_by_status(
197        &self,
198        relayer_id: &str,
199        statuses: &[TransactionStatus],
200    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
201        let store = Self::acquire_lock(&self.store).await?;
202        let filtered: Vec<TransactionRepoModel> = store
203            .values()
204            .filter(|tx| tx.relayer_id == relayer_id && statuses.contains(&tx.status))
205            .cloned()
206            .collect();
207
208        // Sort by created_at (oldest first)
209        let sorted = filtered
210            .into_iter()
211            .sorted_by_key(|tx| tx.created_at.clone())
212            .collect();
213
214        Ok(sorted)
215    }
216
217    async fn find_by_nonce(
218        &self,
219        relayer_id: &str,
220        nonce: u64,
221    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
222        let store = Self::acquire_lock(&self.store).await?;
223        let filtered: Vec<TransactionRepoModel> = store
224            .values()
225            .filter(|tx| {
226                tx.relayer_id == relayer_id
227                    && match &tx.network_data {
228                        NetworkTransactionData::Evm(data) => data.nonce == Some(nonce),
229                        _ => false,
230                    }
231            })
232            .cloned()
233            .collect();
234
235        Ok(filtered.into_iter().next())
236    }
237
238    async fn update_status(
239        &self,
240        tx_id: String,
241        status: TransactionStatus,
242    ) -> Result<TransactionRepoModel, RepositoryError> {
243        let update = TransactionUpdateRequest {
244            status: Some(status),
245            ..Default::default()
246        };
247        self.partial_update(tx_id, update).await
248    }
249
250    async fn partial_update(
251        &self,
252        tx_id: String,
253        update: TransactionUpdateRequest,
254    ) -> Result<TransactionRepoModel, RepositoryError> {
255        let mut store = Self::acquire_lock(&self.store).await?;
256
257        if let Some(tx) = store.get_mut(&tx_id) {
258            // Apply partial updates using the model's business logic
259            tx.apply_partial_update(update);
260            Ok(tx.clone())
261        } else {
262            Err(RepositoryError::NotFound(format!(
263                "Transaction with ID {} not found",
264                tx_id
265            )))
266        }
267    }
268
269    async fn update_network_data(
270        &self,
271        tx_id: String,
272        network_data: NetworkTransactionData,
273    ) -> Result<TransactionRepoModel, RepositoryError> {
274        let mut tx = self.get_by_id(tx_id.clone()).await?;
275        tx.network_data = network_data;
276        self.update(tx_id, tx).await
277    }
278
279    async fn set_sent_at(
280        &self,
281        tx_id: String,
282        sent_at: String,
283    ) -> Result<TransactionRepoModel, RepositoryError> {
284        let mut tx = self.get_by_id(tx_id.clone()).await?;
285        tx.sent_at = Some(sent_at);
286        self.update(tx_id, tx).await
287    }
288
289    async fn set_confirmed_at(
290        &self,
291        tx_id: String,
292        confirmed_at: String,
293    ) -> Result<TransactionRepoModel, RepositoryError> {
294        let mut tx = self.get_by_id(tx_id.clone()).await?;
295        tx.confirmed_at = Some(confirmed_at);
296        self.update(tx_id, tx).await
297    }
298}
299
300impl Default for InMemoryTransactionRepository {
301    fn default() -> Self {
302        Self::new()
303    }
304}
305
306#[cfg(test)]
307mod tests {
308    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
309    use lazy_static::lazy_static;
310    use std::str::FromStr;
311
312    use crate::models::U256;
313
314    use super::*;
315
316    use tokio::sync::Mutex;
317
    // Process-wide async mutex shared by the tests in this module.
    // NOTE(review): the name suggests it serializes tests that touch environment
    // state, but no test visible in this chunk locks it — confirm it is still needed.
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
321    // Helper function to create test transactions
322    fn create_test_transaction(id: &str) -> TransactionRepoModel {
323        TransactionRepoModel {
324            id: id.to_string(),
325            relayer_id: "relayer-1".to_string(),
326            status: TransactionStatus::Pending,
327            status_reason: None,
328            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
329            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
330            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
331            valid_until: None,
332            delete_at: None,
333            network_type: NetworkType::Evm,
334            priced_at: None,
335            hashes: vec![],
336            network_data: NetworkTransactionData::Evm(EvmTransactionData {
337                gas_price: Some(1000000000),
338                gas_limit: Some(21000),
339                nonce: Some(1),
340                value: U256::from_str("1000000000000000000").unwrap(),
341                data: Some("0x".to_string()),
342                from: "0xSender".to_string(),
343                to: Some("0xRecipient".to_string()),
344                chain_id: 1,
345                signature: None,
346                hash: Some(format!("0x{}", id)),
347                speed: Some(Speed::Fast),
348                max_fee_per_gas: None,
349                max_priority_fee_per_gas: None,
350                raw: None,
351            }),
352            noop_count: None,
353            is_canceled: Some(false),
354        }
355    }
356
357    fn create_test_transaction_pending_state(id: &str) -> TransactionRepoModel {
358        TransactionRepoModel {
359            id: id.to_string(),
360            relayer_id: "relayer-1".to_string(),
361            status: TransactionStatus::Pending,
362            status_reason: None,
363            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
364            sent_at: None,
365            confirmed_at: None,
366            valid_until: None,
367            delete_at: None,
368            network_type: NetworkType::Evm,
369            priced_at: None,
370            hashes: vec![],
371            network_data: NetworkTransactionData::Evm(EvmTransactionData {
372                gas_price: Some(1000000000),
373                gas_limit: Some(21000),
374                nonce: Some(1),
375                value: U256::from_str("1000000000000000000").unwrap(),
376                data: Some("0x".to_string()),
377                from: "0xSender".to_string(),
378                to: Some("0xRecipient".to_string()),
379                chain_id: 1,
380                signature: None,
381                hash: Some(format!("0x{}", id)),
382                speed: Some(Speed::Fast),
383                max_fee_per_gas: None,
384                max_priority_fee_per_gas: None,
385                raw: None,
386            }),
387            noop_count: None,
388            is_canceled: Some(false),
389        }
390    }
391
392    #[tokio::test]
393    async fn test_create_transaction() {
394        let repo = InMemoryTransactionRepository::new();
395        let tx = create_test_transaction("test-1");
396
397        let result = repo.create(tx.clone()).await.unwrap();
398        assert_eq!(result.id, tx.id);
399        assert_eq!(repo.count().await.unwrap(), 1);
400    }
401
402    #[tokio::test]
403    async fn test_get_transaction() {
404        let repo = InMemoryTransactionRepository::new();
405        let tx = create_test_transaction("test-1");
406
407        repo.create(tx.clone()).await.unwrap();
408        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
409        if let NetworkTransactionData::Evm(stored_data) = &stored.network_data {
410            if let NetworkTransactionData::Evm(tx_data) = &tx.network_data {
411                assert_eq!(stored_data.hash, tx_data.hash);
412            }
413        }
414    }
415
416    #[tokio::test]
417    async fn test_update_transaction() {
418        let repo = InMemoryTransactionRepository::new();
419        let mut tx = create_test_transaction("test-1");
420
421        repo.create(tx.clone()).await.unwrap();
422        tx.status = TransactionStatus::Confirmed;
423
424        let updated = repo.update("test-1".to_string(), tx).await.unwrap();
425        assert!(matches!(updated.status, TransactionStatus::Confirmed));
426    }
427
428    #[tokio::test]
429    async fn test_delete_transaction() {
430        let repo = InMemoryTransactionRepository::new();
431        let tx = create_test_transaction("test-1");
432
433        repo.create(tx).await.unwrap();
434        repo.delete_by_id("test-1".to_string()).await.unwrap();
435
436        let result = repo.get_by_id("test-1".to_string()).await;
437        assert!(result.is_err());
438    }
439
440    #[tokio::test]
441    async fn test_list_all_transactions() {
442        let repo = InMemoryTransactionRepository::new();
443        let tx1 = create_test_transaction("test-1");
444        let tx2 = create_test_transaction("test-2");
445
446        repo.create(tx1).await.unwrap();
447        repo.create(tx2).await.unwrap();
448
449        let transactions = repo.list_all().await.unwrap();
450        assert_eq!(transactions.len(), 2);
451    }
452
453    #[tokio::test]
454    async fn test_count_transactions() {
455        let repo = InMemoryTransactionRepository::new();
456        let tx = create_test_transaction("test-1");
457
458        assert_eq!(repo.count().await.unwrap(), 0);
459        repo.create(tx).await.unwrap();
460        assert_eq!(repo.count().await.unwrap(), 1);
461    }
462
463    #[tokio::test]
464    async fn test_get_nonexistent_transaction() {
465        let repo = InMemoryTransactionRepository::new();
466        let result = repo.get_by_id("nonexistent".to_string()).await;
467        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
468    }
469
470    #[tokio::test]
471    async fn test_duplicate_transaction_creation() {
472        let repo = InMemoryTransactionRepository::new();
473        let tx = create_test_transaction("test-1");
474
475        repo.create(tx.clone()).await.unwrap();
476        let result = repo.create(tx).await;
477
478        assert!(matches!(
479            result,
480            Err(RepositoryError::ConstraintViolation(_))
481        ));
482    }
483
484    #[tokio::test]
485    async fn test_update_nonexistent_transaction() {
486        let repo = InMemoryTransactionRepository::new();
487        let tx = create_test_transaction("test-1");
488
489        let result = repo.update("nonexistent".to_string(), tx).await;
490        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
491    }
492
493    #[tokio::test]
494    async fn test_partial_update() {
495        let repo = InMemoryTransactionRepository::new();
496        let tx = create_test_transaction_pending_state("test-tx-id");
497        repo.create(tx.clone()).await.unwrap();
498
499        // Test updating only status
500        let update1 = TransactionUpdateRequest {
501            status: Some(TransactionStatus::Sent),
502            status_reason: None,
503            sent_at: None,
504            confirmed_at: None,
505            network_data: None,
506            hashes: None,
507            priced_at: None,
508            noop_count: None,
509            is_canceled: None,
510            delete_at: None,
511        };
512        let updated_tx1 = repo
513            .partial_update("test-tx-id".to_string(), update1)
514            .await
515            .unwrap();
516        assert_eq!(updated_tx1.status, TransactionStatus::Sent);
517        assert_eq!(updated_tx1.sent_at, None);
518
519        // Test updating multiple fields
520        let update2 = TransactionUpdateRequest {
521            status: Some(TransactionStatus::Confirmed),
522            status_reason: None,
523            sent_at: Some("2023-01-01T12:00:00Z".to_string()),
524            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
525            network_data: None,
526            hashes: None,
527            priced_at: None,
528            noop_count: None,
529            is_canceled: None,
530            delete_at: None,
531        };
532        let updated_tx2 = repo
533            .partial_update("test-tx-id".to_string(), update2)
534            .await
535            .unwrap();
536        assert_eq!(updated_tx2.status, TransactionStatus::Confirmed);
537        assert_eq!(
538            updated_tx2.sent_at,
539            Some("2023-01-01T12:00:00Z".to_string())
540        );
541        assert_eq!(
542            updated_tx2.confirmed_at,
543            Some("2023-01-01T12:05:00Z".to_string())
544        );
545
546        // Test updating non-existent transaction
547        let update3 = TransactionUpdateRequest {
548            status: Some(TransactionStatus::Failed),
549            status_reason: None,
550            sent_at: None,
551            confirmed_at: None,
552            network_data: None,
553            hashes: None,
554            priced_at: None,
555            noop_count: None,
556            is_canceled: None,
557            delete_at: None,
558        };
559        let result = repo
560            .partial_update("non-existent-id".to_string(), update3)
561            .await;
562        assert!(result.is_err());
563        assert!(matches!(result.unwrap_err(), RepositoryError::NotFound(_)));
564    }
565
566    #[tokio::test]
567    async fn test_update_status() {
568        let repo = InMemoryTransactionRepository::new();
569        let tx = create_test_transaction("test-1");
570
571        repo.create(tx).await.unwrap();
572
573        // Update status to Confirmed
574        let updated = repo
575            .update_status("test-1".to_string(), TransactionStatus::Confirmed)
576            .await
577            .unwrap();
578
579        // Verify the status was updated in the returned transaction
580        assert_eq!(updated.status, TransactionStatus::Confirmed);
581
582        // Also verify by getting the transaction directly
583        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
584        assert_eq!(stored.status, TransactionStatus::Confirmed);
585
586        // Update status to Failed
587        let updated = repo
588            .update_status("test-1".to_string(), TransactionStatus::Failed)
589            .await
590            .unwrap();
591
592        // Verify the status was updated
593        assert_eq!(updated.status, TransactionStatus::Failed);
594
595        // Verify updating a non-existent transaction
596        let result = repo
597            .update_status("non-existent".to_string(), TransactionStatus::Confirmed)
598            .await;
599        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
600    }
601
602    #[tokio::test]
603    async fn test_list_paginated() {
604        let repo = InMemoryTransactionRepository::new();
605
606        // Create multiple transactions
607        for i in 1..=10 {
608            let tx = create_test_transaction(&format!("test-{}", i));
609            repo.create(tx).await.unwrap();
610        }
611
612        // Test first page with 3 items per page
613        let query = PaginationQuery {
614            page: 1,
615            per_page: 3,
616        };
617        let result = repo.list_paginated(query).await.unwrap();
618        assert_eq!(result.items.len(), 3);
619        assert_eq!(result.total, 10);
620        assert_eq!(result.page, 1);
621        assert_eq!(result.per_page, 3);
622
623        // Test second page with 3 items per page
624        let query = PaginationQuery {
625            page: 2,
626            per_page: 3,
627        };
628        let result = repo.list_paginated(query).await.unwrap();
629        assert_eq!(result.items.len(), 3);
630        assert_eq!(result.total, 10);
631        assert_eq!(result.page, 2);
632        assert_eq!(result.per_page, 3);
633
634        // Test page with fewer items than per_page
635        let query = PaginationQuery {
636            page: 4,
637            per_page: 3,
638        };
639        let result = repo.list_paginated(query).await.unwrap();
640        assert_eq!(result.items.len(), 1);
641        assert_eq!(result.total, 10);
642        assert_eq!(result.page, 4);
643        assert_eq!(result.per_page, 3);
644
645        // Test empty page (beyond total items)
646        let query = PaginationQuery {
647            page: 5,
648            per_page: 3,
649        };
650        let result = repo.list_paginated(query).await.unwrap();
651        assert_eq!(result.items.len(), 0);
652        assert_eq!(result.total, 10);
653    }
654
655    #[tokio::test]
656    async fn test_find_by_nonce() {
657        let repo = InMemoryTransactionRepository::new();
658
659        // Create transactions with different nonces
660        let tx1 = create_test_transaction("test-1");
661
662        let mut tx2 = create_test_transaction("test-2");
663        if let NetworkTransactionData::Evm(ref mut data) = tx2.network_data {
664            data.nonce = Some(2);
665        }
666
667        let mut tx3 = create_test_transaction("test-3");
668        tx3.relayer_id = "relayer-2".to_string();
669        if let NetworkTransactionData::Evm(ref mut data) = tx3.network_data {
670            data.nonce = Some(1);
671        }
672
673        repo.create(tx1).await.unwrap();
674        repo.create(tx2).await.unwrap();
675        repo.create(tx3).await.unwrap();
676
677        // Test finding transaction with specific relayer_id and nonce
678        let result = repo.find_by_nonce("relayer-1", 1).await.unwrap();
679        assert!(result.is_some());
680        assert_eq!(result.as_ref().unwrap().id, "test-1");
681
682        // Test finding transaction with a different nonce
683        let result = repo.find_by_nonce("relayer-1", 2).await.unwrap();
684        assert!(result.is_some());
685        assert_eq!(result.as_ref().unwrap().id, "test-2");
686
687        // Test finding transaction from a different relayer
688        let result = repo.find_by_nonce("relayer-2", 1).await.unwrap();
689        assert!(result.is_some());
690        assert_eq!(result.as_ref().unwrap().id, "test-3");
691
692        // Test finding transaction that doesn't exist
693        let result = repo.find_by_nonce("relayer-1", 99).await.unwrap();
694        assert!(result.is_none());
695    }
696
697    #[tokio::test]
698    async fn test_update_network_data() {
699        let repo = InMemoryTransactionRepository::new();
700        let tx = create_test_transaction("test-1");
701
702        repo.create(tx.clone()).await.unwrap();
703
704        // Create new network data with updated values
705        let updated_network_data = NetworkTransactionData::Evm(EvmTransactionData {
706            gas_price: Some(2000000000),
707            gas_limit: Some(30000),
708            nonce: Some(2),
709            value: U256::from_str("2000000000000000000").unwrap(),
710            data: Some("0xUpdated".to_string()),
711            from: "0xSender".to_string(),
712            to: Some("0xRecipient".to_string()),
713            chain_id: 1,
714            signature: None,
715            hash: Some("0xUpdated".to_string()),
716            raw: None,
717            speed: None,
718            max_fee_per_gas: None,
719            max_priority_fee_per_gas: None,
720        });
721
722        let updated = repo
723            .update_network_data("test-1".to_string(), updated_network_data)
724            .await
725            .unwrap();
726
727        // Verify the network data was updated
728        if let NetworkTransactionData::Evm(data) = &updated.network_data {
729            assert_eq!(data.gas_price, Some(2000000000));
730            assert_eq!(data.gas_limit, Some(30000));
731            assert_eq!(data.nonce, Some(2));
732            assert_eq!(data.hash, Some("0xUpdated".to_string()));
733            assert_eq!(data.data, Some("0xUpdated".to_string()));
734        } else {
735            panic!("Expected EVM network data");
736        }
737    }
738
739    #[tokio::test]
740    async fn test_set_sent_at() {
741        let repo = InMemoryTransactionRepository::new();
742        let tx = create_test_transaction("test-1");
743
744        repo.create(tx).await.unwrap();
745
746        // Updated sent_at timestamp
747        let new_sent_at = "2025-02-01T10:00:00.000000+00:00".to_string();
748
749        let updated = repo
750            .set_sent_at("test-1".to_string(), new_sent_at.clone())
751            .await
752            .unwrap();
753
754        // Verify the sent_at timestamp was updated
755        assert_eq!(updated.sent_at, Some(new_sent_at.clone()));
756
757        // Also verify by getting the transaction directly
758        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
759        assert_eq!(stored.sent_at, Some(new_sent_at.clone()));
760    }
761
762    #[tokio::test]
763    async fn test_set_confirmed_at() {
764        let repo = InMemoryTransactionRepository::new();
765        let tx = create_test_transaction("test-1");
766
767        repo.create(tx).await.unwrap();
768
769        // Updated confirmed_at timestamp
770        let new_confirmed_at = "2025-02-01T11:30:45.123456+00:00".to_string();
771
772        let updated = repo
773            .set_confirmed_at("test-1".to_string(), new_confirmed_at.clone())
774            .await
775            .unwrap();
776
777        // Verify the confirmed_at timestamp was updated
778        assert_eq!(updated.confirmed_at, Some(new_confirmed_at.clone()));
779
780        // Also verify by getting the transaction directly
781        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
782        assert_eq!(stored.confirmed_at, Some(new_confirmed_at.clone()));
783    }
784
785    #[tokio::test]
786    async fn test_find_by_relayer_id() {
787        let repo = InMemoryTransactionRepository::new();
788        let tx1 = create_test_transaction("test-1");
789        let tx2 = create_test_transaction("test-2");
790
791        // Create a transaction with a different relayer_id
792        let mut tx3 = create_test_transaction("test-3");
793        tx3.relayer_id = "relayer-2".to_string();
794
795        repo.create(tx1).await.unwrap();
796        repo.create(tx2).await.unwrap();
797        repo.create(tx3).await.unwrap();
798
799        // Test finding transactions for relayer-1
800        let query = PaginationQuery {
801            page: 1,
802            per_page: 10,
803        };
804        let result = repo
805            .find_by_relayer_id("relayer-1", query.clone())
806            .await
807            .unwrap();
808        assert_eq!(result.total, 2);
809        assert_eq!(result.items.len(), 2);
810        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
811
812        // Test finding transactions for relayer-2
813        let result = repo
814            .find_by_relayer_id("relayer-2", query.clone())
815            .await
816            .unwrap();
817        assert_eq!(result.total, 1);
818        assert_eq!(result.items.len(), 1);
819        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
820
821        // Test finding transactions for non-existent relayer
822        let result = repo
823            .find_by_relayer_id("non-existent", query.clone())
824            .await
825            .unwrap();
826        assert_eq!(result.total, 0);
827        assert_eq!(result.items.len(), 0);
828    }
829
830    #[tokio::test]
831    async fn test_find_by_status() {
832        let repo = InMemoryTransactionRepository::new();
833        let tx1 = create_test_transaction_pending_state("tx1");
834        let mut tx2 = create_test_transaction_pending_state("tx2");
835        tx2.status = TransactionStatus::Submitted;
836        let mut tx3 = create_test_transaction_pending_state("tx3");
837        tx3.relayer_id = "relayer-2".to_string();
838        tx3.status = TransactionStatus::Pending;
839
840        repo.create(tx1.clone()).await.unwrap();
841        repo.create(tx2.clone()).await.unwrap();
842        repo.create(tx3.clone()).await.unwrap();
843
844        // Test finding by single status
845        let pending_txs = repo
846            .find_by_status("relayer-1", &[TransactionStatus::Pending])
847            .await
848            .unwrap();
849        assert_eq!(pending_txs.len(), 1);
850        assert_eq!(pending_txs[0].id, "tx1");
851
852        let submitted_txs = repo
853            .find_by_status("relayer-1", &[TransactionStatus::Submitted])
854            .await
855            .unwrap();
856        assert_eq!(submitted_txs.len(), 1);
857        assert_eq!(submitted_txs[0].id, "tx2");
858
859        // Test finding by multiple statuses
860        let multiple_status_txs = repo
861            .find_by_status(
862                "relayer-1",
863                &[TransactionStatus::Pending, TransactionStatus::Submitted],
864            )
865            .await
866            .unwrap();
867        assert_eq!(multiple_status_txs.len(), 2);
868
869        // Test finding for different relayer
870        let relayer2_pending = repo
871            .find_by_status("relayer-2", &[TransactionStatus::Pending])
872            .await
873            .unwrap();
874        assert_eq!(relayer2_pending.len(), 1);
875        assert_eq!(relayer2_pending[0].id, "tx3");
876
877        // Test finding for non-existent relayer
878        let no_txs = repo
879            .find_by_status("non-existent", &[TransactionStatus::Pending])
880            .await
881            .unwrap();
882        assert_eq!(no_txs.len(), 0);
883    }
884
885    #[tokio::test]
886    async fn test_find_by_status_sorted_by_created_at() {
887        let repo = InMemoryTransactionRepository::new();
888
889        // Helper function to create transaction with custom created_at timestamp
890        let create_tx_with_timestamp = |id: &str, timestamp: &str| -> TransactionRepoModel {
891            let mut tx = create_test_transaction_pending_state(id);
892            tx.created_at = timestamp.to_string();
893            tx.status = TransactionStatus::Pending;
894            tx
895        };
896
897        // Create transactions with different timestamps (out of chronological order)
898        let tx3 = create_tx_with_timestamp("tx3", "2025-01-27T17:00:00.000000+00:00"); // Latest
899        let tx1 = create_tx_with_timestamp("tx1", "2025-01-27T15:00:00.000000+00:00"); // Earliest
900        let tx2 = create_tx_with_timestamp("tx2", "2025-01-27T16:00:00.000000+00:00"); // Middle
901
902        // Create them in reverse chronological order to test sorting
903        repo.create(tx3.clone()).await.unwrap();
904        repo.create(tx1.clone()).await.unwrap();
905        repo.create(tx2.clone()).await.unwrap();
906
907        // Find by status
908        let result = repo
909            .find_by_status("relayer-1", &[TransactionStatus::Pending])
910            .await
911            .unwrap();
912
913        // Verify they are sorted by created_at (oldest first)
914        assert_eq!(result.len(), 3);
915        assert_eq!(result[0].id, "tx1"); // Earliest
916        assert_eq!(result[1].id, "tx2"); // Middle
917        assert_eq!(result[2].id, "tx3"); // Latest
918
919        // Verify the timestamps are in ascending order
920        assert_eq!(result[0].created_at, "2025-01-27T15:00:00.000000+00:00");
921        assert_eq!(result[1].created_at, "2025-01-27T16:00:00.000000+00:00");
922        assert_eq!(result[2].created_at, "2025-01-27T17:00:00.000000+00:00");
923    }
924
925    #[tokio::test]
926    async fn test_has_entries() {
927        let repo = InMemoryTransactionRepository::new();
928        assert!(!repo.has_entries().await.unwrap());
929
930        let tx = create_test_transaction("test");
931        repo.create(tx.clone()).await.unwrap();
932
933        assert!(repo.has_entries().await.unwrap());
934    }
935
936    #[tokio::test]
937    async fn test_drop_all_entries() {
938        let repo = InMemoryTransactionRepository::new();
939        let tx = create_test_transaction("test");
940        repo.create(tx.clone()).await.unwrap();
941
942        assert!(repo.has_entries().await.unwrap());
943
944        repo.drop_all_entries().await.unwrap();
945        assert!(!repo.has_entries().await.unwrap());
946    }
947
948    // Tests for delete_at field setting on final status updates
949
950    #[tokio::test]
951    async fn test_update_status_sets_delete_at_for_final_statuses() {
952        let _lock = ENV_MUTEX.lock().await;
953
954        use chrono::{DateTime, Duration, Utc};
955        use std::env;
956
957        // Use a unique test environment variable to avoid conflicts
958        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
959
960        let repo = InMemoryTransactionRepository::new();
961
962        let final_statuses = [
963            TransactionStatus::Canceled,
964            TransactionStatus::Confirmed,
965            TransactionStatus::Failed,
966            TransactionStatus::Expired,
967        ];
968
969        for (i, status) in final_statuses.iter().enumerate() {
970            let tx_id = format!("test-final-{}", i);
971            let tx = create_test_transaction_pending_state(&tx_id);
972
973            // Ensure transaction has no delete_at initially
974            assert!(tx.delete_at.is_none());
975
976            repo.create(tx).await.unwrap();
977
978            let before_update = Utc::now();
979
980            // Update to final status
981            let updated = repo
982                .update_status(tx_id.clone(), status.clone())
983                .await
984                .unwrap();
985
986            // Should have delete_at set
987            assert!(
988                updated.delete_at.is_some(),
989                "delete_at should be set for status: {:?}",
990                status
991            );
992
993            // Verify the timestamp is reasonable (approximately 6 hours from now)
994            let delete_at_str = updated.delete_at.unwrap();
995            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
996                .expect("delete_at should be valid RFC3339")
997                .with_timezone(&Utc);
998
999            let duration_from_before = delete_at.signed_duration_since(before_update);
1000            let expected_duration = Duration::hours(6);
1001            let tolerance = Duration::minutes(5);
1002
1003            assert!(
1004                duration_from_before >= expected_duration - tolerance &&
1005                duration_from_before <= expected_duration + tolerance,
1006                "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1007                status, duration_from_before
1008            );
1009        }
1010
1011        // Cleanup
1012        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1013    }
1014
1015    #[tokio::test]
1016    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1017        let _lock = ENV_MUTEX.lock().await;
1018
1019        use std::env;
1020
1021        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1022
1023        let repo = InMemoryTransactionRepository::new();
1024
1025        let non_final_statuses = [
1026            TransactionStatus::Pending,
1027            TransactionStatus::Sent,
1028            TransactionStatus::Submitted,
1029            TransactionStatus::Mined,
1030        ];
1031
1032        for (i, status) in non_final_statuses.iter().enumerate() {
1033            let tx_id = format!("test-non-final-{}", i);
1034            let tx = create_test_transaction_pending_state(&tx_id);
1035
1036            repo.create(tx).await.unwrap();
1037
1038            // Update to non-final status
1039            let updated = repo
1040                .update_status(tx_id.clone(), status.clone())
1041                .await
1042                .unwrap();
1043
1044            // Should NOT have delete_at set
1045            assert!(
1046                updated.delete_at.is_none(),
1047                "delete_at should NOT be set for status: {:?}",
1048                status
1049            );
1050        }
1051
1052        // Cleanup
1053        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1054    }
1055
1056    #[tokio::test]
1057    async fn test_partial_update_sets_delete_at_for_final_statuses() {
1058        let _lock = ENV_MUTEX.lock().await;
1059
1060        use chrono::{DateTime, Duration, Utc};
1061        use std::env;
1062
1063        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1064
1065        let repo = InMemoryTransactionRepository::new();
1066        let tx = create_test_transaction_pending_state("test-partial-final");
1067
1068        repo.create(tx).await.unwrap();
1069
1070        let before_update = Utc::now();
1071
1072        // Use partial_update to set status to Confirmed (final status)
1073        let update = TransactionUpdateRequest {
1074            status: Some(TransactionStatus::Confirmed),
1075            status_reason: Some("Transaction completed".to_string()),
1076            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1077            ..Default::default()
1078        };
1079
1080        let updated = repo
1081            .partial_update("test-partial-final".to_string(), update)
1082            .await
1083            .unwrap();
1084
1085        // Should have delete_at set
1086        assert!(
1087            updated.delete_at.is_some(),
1088            "delete_at should be set when updating to Confirmed status"
1089        );
1090
1091        // Verify the timestamp is reasonable (approximately 8 hours from now)
1092        let delete_at_str = updated.delete_at.unwrap();
1093        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1094            .expect("delete_at should be valid RFC3339")
1095            .with_timezone(&Utc);
1096
1097        let duration_from_before = delete_at.signed_duration_since(before_update);
1098        let expected_duration = Duration::hours(8);
1099        let tolerance = Duration::minutes(5);
1100
1101        assert!(
1102            duration_from_before >= expected_duration - tolerance
1103                && duration_from_before <= expected_duration + tolerance,
1104            "delete_at should be approximately 8 hours from now. Duration: {:?}",
1105            duration_from_before
1106        );
1107
1108        // Also verify other fields were updated
1109        assert_eq!(updated.status, TransactionStatus::Confirmed);
1110        assert_eq!(
1111            updated.status_reason,
1112            Some("Transaction completed".to_string())
1113        );
1114        assert_eq!(
1115            updated.confirmed_at,
1116            Some("2023-01-01T12:05:00Z".to_string())
1117        );
1118
1119        // Cleanup
1120        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1121    }
1122
1123    #[tokio::test]
1124    async fn test_update_status_preserves_existing_delete_at() {
1125        let _lock = ENV_MUTEX.lock().await;
1126
1127        use std::env;
1128
1129        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1130
1131        let repo = InMemoryTransactionRepository::new();
1132        let mut tx = create_test_transaction_pending_state("test-preserve-delete-at");
1133
1134        // Set an existing delete_at value
1135        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1136        tx.delete_at = Some(existing_delete_at.clone());
1137
1138        repo.create(tx).await.unwrap();
1139
1140        // Update to final status
1141        let updated = repo
1142            .update_status(
1143                "test-preserve-delete-at".to_string(),
1144                TransactionStatus::Confirmed,
1145            )
1146            .await
1147            .unwrap();
1148
1149        // Should preserve the existing delete_at value
1150        assert_eq!(
1151            updated.delete_at,
1152            Some(existing_delete_at),
1153            "Existing delete_at should be preserved when updating to final status"
1154        );
1155
1156        // Cleanup
1157        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1158    }
1159
1160    #[tokio::test]
1161    async fn test_partial_update_without_status_change_preserves_delete_at() {
1162        let _lock = ENV_MUTEX.lock().await;
1163
1164        use std::env;
1165
1166        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1167
1168        let repo = InMemoryTransactionRepository::new();
1169        let tx = create_test_transaction_pending_state("test-preserve-no-status");
1170
1171        repo.create(tx).await.unwrap();
1172
1173        // First, update to final status to set delete_at
1174        let updated1 = repo
1175            .update_status(
1176                "test-preserve-no-status".to_string(),
1177                TransactionStatus::Confirmed,
1178            )
1179            .await
1180            .unwrap();
1181
1182        assert!(updated1.delete_at.is_some());
1183        let original_delete_at = updated1.delete_at.clone();
1184
1185        // Now update other fields without changing status
1186        let update = TransactionUpdateRequest {
1187            status: None, // No status change
1188            status_reason: Some("Updated reason".to_string()),
1189            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1190            ..Default::default()
1191        };
1192
1193        let updated2 = repo
1194            .partial_update("test-preserve-no-status".to_string(), update)
1195            .await
1196            .unwrap();
1197
1198        // delete_at should be preserved
1199        assert_eq!(
1200            updated2.delete_at, original_delete_at,
1201            "delete_at should be preserved when status is not updated"
1202        );
1203
1204        // Other fields should be updated
1205        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
1206        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1207        assert_eq!(
1208            updated2.confirmed_at,
1209            Some("2023-01-01T12:10:00Z".to_string())
1210        );
1211
1212        // Cleanup
1213        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1214    }
1215
1216    #[tokio::test]
1217    async fn test_update_status_multiple_updates_idempotent() {
1218        let _lock = ENV_MUTEX.lock().await;
1219
1220        use std::env;
1221
1222        env::set_var("TRANSACTION_EXPIRATION_HOURS", "12");
1223
1224        let repo = InMemoryTransactionRepository::new();
1225        let tx = create_test_transaction_pending_state("test-idempotent");
1226
1227        repo.create(tx).await.unwrap();
1228
1229        // First update to final status
1230        let updated1 = repo
1231            .update_status("test-idempotent".to_string(), TransactionStatus::Confirmed)
1232            .await
1233            .unwrap();
1234
1235        assert!(updated1.delete_at.is_some());
1236        let first_delete_at = updated1.delete_at.clone();
1237
1238        // Second update to another final status
1239        let updated2 = repo
1240            .update_status("test-idempotent".to_string(), TransactionStatus::Failed)
1241            .await
1242            .unwrap();
1243
1244        // delete_at should remain the same (idempotent)
1245        assert_eq!(
1246            updated2.delete_at, first_delete_at,
1247            "delete_at should not change on subsequent final status updates"
1248        );
1249
1250        // Status should be updated
1251        assert_eq!(updated2.status, TransactionStatus::Failed);
1252
1253        // Cleanup
1254        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1255    }
1256}