openzeppelin_relayer/jobs/handlers/
transaction_cleanup_handler.rs

1//! Transaction cleanup worker implementation.
2//!
3//! This module implements the transaction cleanup worker that processes
4//! expired transactions marked for deletion. It runs as a cron job to
5//! automatically clean up transactions that have passed their delete_at timestamp.
6
7use actix_web::web::ThinData;
8use apalis::prelude::{Attempt, Data, *};
9use chrono::{DateTime, Utc};
10use eyre::Result;
11use log::{debug, error, info, warn};
12use std::sync::Arc;
13
14use crate::{
15    constants::{FINAL_TRANSACTION_STATUSES, WORKER_DEFAULT_MAXIMUM_RETRIES},
16    jobs::handle_result,
17    models::{DefaultAppState, RelayerRepoModel, TransactionRepoModel},
18    repositories::{Repository, TransactionRepository},
19};
20
21/// Maximum number of relayers to process concurrently
22const MAX_CONCURRENT_RELAYERS: usize = 10;
23
24/// Maximum number of transactions to process concurrently per relayer
25const MAX_CONCURRENT_TRANSACTIONS_PER_RELAYER: usize = 50;
26
27/// Handles periodic transaction cleanup jobs from the queue.
28///
29/// This function processes expired transactions by:
30/// 1. Fetching all relayers from the system
31/// 2. For each relayer, finding transactions with final statuses
32/// 3. Checking if their delete_at timestamp has passed
33/// 4. Validating transactions are in final states before deletion
34/// 5. Deleting transactions that have expired (in parallel)
35///
36/// # Arguments
37/// * `job` - The cron reminder job triggering the cleanup
38/// * `data` - Application state containing repositories
39/// * `attempt` - Current attempt number for retry logic
40///
41/// # Returns
42/// * `Result<(), Error>` - Success or failure of cleanup processing
43pub async fn transaction_cleanup_handler(
44    job: TransactionCleanupCronReminder,
45    data: Data<ThinData<DefaultAppState>>,
46    attempt: Attempt,
47) -> Result<(), Error> {
48    let result = handle_request(job, data, attempt.clone()).await;
49
50    handle_result(
51        result,
52        attempt,
53        "TransactionCleanup",
54        WORKER_DEFAULT_MAXIMUM_RETRIES,
55    )
56}
57
58/// Represents a cron reminder job for triggering cleanup operations.
59#[derive(Default, Debug, Clone)]
60pub struct TransactionCleanupCronReminder();
61
62/// Handles the actual transaction cleanup request logic.
63///
64/// # Arguments
65/// * `_job` - The cron reminder job (currently unused)
66/// * `data` - Application state containing repositories
67/// * `_attempt` - Current attempt number (currently unused)
68///
69/// # Returns
70/// * `Result<()>` - Success or failure of the cleanup operation
71async fn handle_request(
72    _job: TransactionCleanupCronReminder,
73    data: Data<ThinData<DefaultAppState>>,
74    _attempt: Attempt,
75) -> Result<()> {
76    let now = Utc::now();
77    info!(
78        "Executing transaction cleanup from storage at: {}",
79        now.to_rfc3339()
80    );
81
82    let transaction_repo = data.transaction_repository();
83    let relayer_repo = data.relayer_repository();
84
85    // Fetch all relayers
86    let relayers = relayer_repo.list_all().await.map_err(|e| {
87        error!("Failed to fetch relayers for cleanup: {}", e);
88        eyre::eyre!("Failed to fetch relayers: {}", e)
89    })?;
90
91    info!("Found {} relayers to process for cleanup", relayers.len());
92
93    // Process relayers in parallel batches
94    let cleanup_results = process_relayers_in_batches(relayers, transaction_repo, now).await;
95
96    // Aggregate and report results
97    report_cleanup_results(cleanup_results).await
98}
99
100/// Processes multiple relayers in parallel batches for cleanup.
101///
102/// # Arguments
103/// * `relayers` - List of relayers to process
104/// * `transaction_repo` - Reference to the transaction repository
105/// * `now` - Current UTC timestamp for comparison
106///
107/// # Returns
108/// * `Vec<RelayerCleanupResult>` - Results from processing each relayer
109async fn process_relayers_in_batches(
110    relayers: Vec<RelayerRepoModel>,
111    transaction_repo: Arc<impl TransactionRepository>,
112    now: DateTime<Utc>,
113) -> Vec<RelayerCleanupResult> {
114    use futures::stream::{self, StreamExt};
115
116    // Process relayers with limited concurrency to avoid overwhelming the system
117    let results: Vec<RelayerCleanupResult> = stream::iter(relayers)
118        .map(|relayer| {
119            let repo_clone = Arc::clone(&transaction_repo);
120            async move { process_single_relayer(relayer, repo_clone, now).await }
121        })
122        .buffer_unordered(MAX_CONCURRENT_RELAYERS)
123        .collect()
124        .await;
125
126    results
127}
128
129/// Result of processing a single relayer's transactions.
130#[derive(Debug)]
131struct RelayerCleanupResult {
132    relayer_id: String,
133    cleaned_count: usize,
134    error: Option<String>,
135}
136
137/// Processes cleanup for a single relayer.
138///
139/// # Arguments
140/// * `relayer` - The relayer to process
141/// * `transaction_repo` - Reference to the transaction repository
142/// * `now` - Current UTC timestamp for comparison
143///
144/// # Returns
145/// * `RelayerCleanupResult` - Result of processing this relayer
146async fn process_single_relayer(
147    relayer: RelayerRepoModel,
148    transaction_repo: Arc<impl TransactionRepository>,
149    now: DateTime<Utc>,
150) -> RelayerCleanupResult {
151    debug!("Processing cleanup for relayer: {}", relayer.id);
152
153    match fetch_final_transactions(&relayer.id, &transaction_repo).await {
154        Ok(final_transactions) => {
155            debug!(
156                "Found {} transactions with final statuses for relayer: {}",
157                final_transactions.len(),
158                relayer.id
159            );
160
161            let cleaned_count = process_transactions_for_cleanup(
162                final_transactions,
163                &transaction_repo,
164                &relayer.id,
165                now,
166            )
167            .await;
168
169            if cleaned_count > 0 {
170                info!(
171                    "Cleaned up {} expired transactions for relayer: {}",
172                    cleaned_count, relayer.id
173                );
174            }
175
176            RelayerCleanupResult {
177                relayer_id: relayer.id,
178                cleaned_count,
179                error: None,
180            }
181        }
182        Err(e) => {
183            error!(
184                "Failed to fetch final transactions for relayer {}: {}",
185                relayer.id, e
186            );
187            RelayerCleanupResult {
188                relayer_id: relayer.id,
189                cleaned_count: 0,
190                error: Some(e.to_string()),
191            }
192        }
193    }
194}
195
196/// Fetches all transactions with final statuses for a specific relayer.
197///
198/// # Arguments
199/// * `relayer_id` - ID of the relayer
200/// * `transaction_repo` - Reference to the transaction repository
201///
202/// # Returns
203/// * `Result<Vec<TransactionRepoModel>>` - List of transactions with final statuses or error
204async fn fetch_final_transactions(
205    relayer_id: &str,
206    transaction_repo: &Arc<impl TransactionRepository>,
207) -> Result<Vec<TransactionRepoModel>> {
208    transaction_repo
209        .find_by_status(relayer_id, FINAL_TRANSACTION_STATUSES)
210        .await
211        .map_err(|e| {
212            eyre::eyre!(
213                "Failed to fetch final transactions for relayer {}: {}",
214                relayer_id,
215                e
216            )
217        })
218}
219
220/// Processes a list of transactions for cleanup in parallel, deleting expired ones.
221///
222/// This function validates that transactions are in final states before deletion,
223/// ensuring data integrity by preventing accidental deletion of active transactions.
224///
225/// # Arguments
226/// * `transactions` - List of transactions to process
227/// * `transaction_repo` - Reference to the transaction repository
228/// * `relayer_id` - ID of the relayer (for logging)
229/// * `now` - Current UTC timestamp for comparison
230///
231/// # Returns
232/// * `usize` - Number of transactions successfully cleaned up
233async fn process_transactions_for_cleanup(
234    transactions: Vec<TransactionRepoModel>,
235    transaction_repo: &Arc<impl Repository<TransactionRepoModel, String>>,
236    relayer_id: &str,
237    now: DateTime<Utc>,
238) -> usize {
239    use futures::stream::{self, StreamExt};
240
241    if transactions.is_empty() {
242        return 0;
243    }
244
245    debug!(
246        "Processing {} transactions in parallel for relayer: {}",
247        transactions.len(),
248        relayer_id
249    );
250
251    // Filter expired transactions first (this is fast and synchronous)
252    let expired_transactions: Vec<TransactionRepoModel> = transactions
253        .into_iter()
254        .filter(|tx| should_delete_transaction(tx, now))
255        .collect();
256
257    if expired_transactions.is_empty() {
258        debug!("No expired transactions found for relayer: {}", relayer_id);
259        return 0;
260    }
261
262    debug!(
263        "Found {} expired transactions to delete for relayer: {}",
264        expired_transactions.len(),
265        relayer_id
266    );
267
268    // Process deletions in parallel with limited concurrency
269    let deletion_results: Vec<bool> = stream::iter(expired_transactions)
270        .map(|transaction| {
271            let repo_clone = Arc::clone(transaction_repo);
272            let relayer_id = relayer_id.to_string();
273            async move {
274                match delete_expired_transaction(&transaction, &repo_clone, &relayer_id).await {
275                    Ok(()) => true,
276                    Err(e) => {
277                        error!(
278                            "Failed to delete expired transaction {}: {}",
279                            transaction.id, e
280                        );
281                        false
282                    }
283                }
284            }
285        })
286        .buffer_unordered(MAX_CONCURRENT_TRANSACTIONS_PER_RELAYER)
287        .collect()
288        .await;
289
290    // Count successful deletions
291    let cleaned_count = deletion_results.iter().filter(|&&success| success).count();
292
293    debug!(
294        "Successfully deleted {}/{} expired transactions for relayer: {}",
295        cleaned_count,
296        deletion_results.len(),
297        relayer_id
298    );
299
300    cleaned_count
301}
302
303/// Determines if a transaction should be deleted based on its delete_at timestamp.
304///
305/// # Arguments
306/// * `transaction` - The transaction to check
307/// * `now` - Current UTC timestamp for comparison
308///
309/// # Returns
310/// * `bool` - True if the transaction should be deleted, false otherwise
311fn should_delete_transaction(transaction: &TransactionRepoModel, now: DateTime<Utc>) -> bool {
312    transaction
313        .delete_at
314        .as_ref()
315        .and_then(|delete_at_str| DateTime::parse_from_rfc3339(delete_at_str).ok())
316        .map(|delete_at| {
317            let is_expired = now >= delete_at.with_timezone(&Utc);
318            if is_expired {
319                debug!(
320                    "Transaction {} is expired (expired at: {})",
321                    transaction.id,
322                    delete_at.to_rfc3339()
323                );
324            }
325            is_expired
326        })
327        .unwrap_or_else(|| {
328            if transaction.delete_at.is_some() {
329                warn!(
330                    "Transaction {} has invalid delete_at timestamp",
331                    transaction.id
332                );
333            }
334            false
335        })
336}
337
338/// Deletes an expired transaction from the repository.
339///
340/// # Arguments
341/// * `transaction` - The transaction to delete
342/// * `transaction_repo` - Reference to the transaction repository
343/// * `relayer_id` - ID of the relayer (for logging)
344///
345/// # Returns
346/// * `Result<()>` - Success or failure of the deletion
347async fn delete_expired_transaction(
348    transaction: &TransactionRepoModel,
349    transaction_repo: &Arc<impl Repository<TransactionRepoModel, String>>,
350    relayer_id: &str,
351) -> Result<()> {
352    // Validate that the transaction is in a final state before deletion
353    if !FINAL_TRANSACTION_STATUSES.contains(&transaction.status) {
354        return Err(eyre::eyre!(
355            "Transaction {} is not in a final state (current: {:?})",
356            transaction.id,
357            transaction.status
358        ));
359    }
360
361    debug!(
362        "Deleting expired transaction {} (status: {:?}) for relayer: {}",
363        transaction.id, transaction.status, relayer_id
364    );
365
366    transaction_repo
367        .delete_by_id(transaction.id.clone())
368        .await
369        .map_err(|e| eyre::eyre!("Failed to delete transaction {}: {}", transaction.id, e))?;
370
371    info!(
372        "Successfully deleted expired transaction: {} (status: {:?}) for relayer: {}",
373        transaction.id, transaction.status, relayer_id
374    );
375
376    Ok(())
377}
378
379/// Reports the aggregated results of the cleanup operation.
380///
381/// # Arguments
382/// * `cleanup_results` - Results from processing all relayers
383///
384/// # Returns
385/// * `Result<()>` - Success if all went well, error if there were failures
386async fn report_cleanup_results(cleanup_results: Vec<RelayerCleanupResult>) -> Result<()> {
387    let total_cleaned: usize = cleanup_results.iter().map(|r| r.cleaned_count).sum();
388    let total_errors = cleanup_results.iter().filter(|r| r.error.is_some()).count();
389    let total_relayers = cleanup_results.len();
390
391    // Log detailed results for relayers with errors
392    for result in &cleanup_results {
393        if let Some(error) = &result.error {
394            error!(
395                "Failed to cleanup transactions for relayer {}: {}",
396                result.relayer_id, error
397            );
398        }
399    }
400
401    if total_errors > 0 {
402        warn!(
403            "Transaction cleanup completed with {} errors out of {} relayers. Successfully cleaned {} transactions.",
404            total_errors, total_relayers, total_cleaned
405        );
406
407        // Return error if there were failures, but don't fail the entire job
408        // This allows for partial success and retry of failed relayers
409        Err(eyre::eyre!(
410            "Cleanup completed with {} errors out of {} relayers",
411            total_errors,
412            total_relayers
413        ))
414    } else {
415        info!(
416            "Transaction cleanup completed successfully. Cleaned {} transactions from {} relayers.",
417            total_cleaned, total_relayers
418        );
419        Ok(())
420    }
421}
422
423#[cfg(test)]
424mod tests {
425
426    use super::*;
427    use crate::{
428        models::{
429            NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
430            TransactionRepoModel, TransactionStatus,
431        },
432        repositories::{InMemoryTransactionRepository, Repository},
433        utils::mocks::mockutils::create_mock_transaction,
434    };
435    use chrono::{Duration, Utc};
436
437    fn create_test_transaction(
438        id: &str,
439        relayer_id: &str,
440        status: TransactionStatus,
441        delete_at: Option<String>,
442    ) -> TransactionRepoModel {
443        let mut tx = create_mock_transaction();
444        tx.id = id.to_string();
445        tx.relayer_id = relayer_id.to_string();
446        tx.status = status;
447        tx.delete_at = delete_at;
448        tx
449    }
450
451    #[tokio::test]
452    async fn test_should_delete_transaction_expired() {
453        let now = Utc::now();
454        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
455
456        let transaction = create_test_transaction(
457            "test-tx",
458            "test-relayer",
459            TransactionStatus::Confirmed,
460            Some(expired_delete_at),
461        );
462
463        assert!(should_delete_transaction(&transaction, now));
464    }
465
466    #[tokio::test]
467    async fn test_should_delete_transaction_not_expired() {
468        let now = Utc::now();
469        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();
470
471        let transaction = create_test_transaction(
472            "test-tx",
473            "test-relayer",
474            TransactionStatus::Confirmed,
475            Some(future_delete_at),
476        );
477
478        assert!(!should_delete_transaction(&transaction, now));
479    }
480
481    #[tokio::test]
482    async fn test_should_delete_transaction_no_delete_at() {
483        let now = Utc::now();
484
485        let transaction = create_test_transaction(
486            "test-tx",
487            "test-relayer",
488            TransactionStatus::Confirmed,
489            None,
490        );
491
492        assert!(!should_delete_transaction(&transaction, now));
493    }
494
495    #[tokio::test]
496    async fn test_should_delete_transaction_invalid_timestamp() {
497        let now = Utc::now();
498
499        let transaction = create_test_transaction(
500            "test-tx",
501            "test-relayer",
502            TransactionStatus::Confirmed,
503            Some("invalid-timestamp".to_string()),
504        );
505
506        assert!(!should_delete_transaction(&transaction, now));
507    }
508
509    #[tokio::test]
510    async fn test_process_transactions_for_cleanup_parallel() {
511        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
512        let relayer_id = "test-relayer";
513        let now = Utc::now();
514
515        // Create test transactions
516        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
517        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();
518
519        let expired_tx = create_test_transaction(
520            "expired-tx",
521            relayer_id,
522            TransactionStatus::Confirmed,
523            Some(expired_delete_at),
524        );
525        let future_tx = create_test_transaction(
526            "future-tx",
527            relayer_id,
528            TransactionStatus::Failed,
529            Some(future_delete_at),
530        );
531        let no_delete_tx = create_test_transaction(
532            "no-delete-tx",
533            relayer_id,
534            TransactionStatus::Canceled,
535            None,
536        );
537
538        // Store transactions
539        transaction_repo.create(expired_tx.clone()).await.unwrap();
540        transaction_repo.create(future_tx.clone()).await.unwrap();
541        transaction_repo.create(no_delete_tx.clone()).await.unwrap();
542
543        let transactions = vec![expired_tx, future_tx, no_delete_tx];
544
545        // Process transactions
546        let cleaned_count =
547            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
548                .await;
549
550        // Should have cleaned up 1 expired transaction
551        assert_eq!(cleaned_count, 1);
552
553        // Verify expired transaction was deleted
554        assert!(transaction_repo
555            .get_by_id("expired-tx".to_string())
556            .await
557            .is_err());
558
559        // Verify non-expired transactions still exist
560        assert!(transaction_repo
561            .get_by_id("future-tx".to_string())
562            .await
563            .is_ok());
564        assert!(transaction_repo
565            .get_by_id("no-delete-tx".to_string())
566            .await
567            .is_ok());
568    }
569
570    #[tokio::test]
571    async fn test_delete_expired_transaction() {
572        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
573        let relayer_id = "test-relayer";
574
575        let transaction = create_test_transaction(
576            "test-tx",
577            relayer_id,
578            TransactionStatus::Confirmed, // Final status
579            Some(Utc::now().to_rfc3339()),
580        );
581
582        // Store transaction
583        transaction_repo.create(transaction.clone()).await.unwrap();
584
585        // Verify it exists
586        assert!(transaction_repo
587            .get_by_id("test-tx".to_string())
588            .await
589            .is_ok());
590
591        // Delete it
592        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
593        assert!(result.is_ok());
594
595        // Verify it was deleted
596        assert!(transaction_repo
597            .get_by_id("test-tx".to_string())
598            .await
599            .is_err());
600    }
601
602    #[tokio::test]
603    async fn test_delete_expired_transaction_validates_final_status() {
604        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
605        let relayer_id = "test-relayer";
606
607        let transaction = create_test_transaction(
608            "test-tx",
609            relayer_id,
610            TransactionStatus::Pending, // Non-final status
611            Some(Utc::now().to_rfc3339()),
612        );
613
614        // Store transaction
615        transaction_repo.create(transaction.clone()).await.unwrap();
616
617        // Verify it exists
618        assert!(transaction_repo
619            .get_by_id("test-tx".to_string())
620            .await
621            .is_ok());
622
623        // Try to delete it - should fail due to validation
624        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
625        assert!(result.is_err());
626
627        let error_message = result.unwrap_err().to_string();
628        assert!(error_message.contains("is not in a final state"));
629        assert!(error_message.contains("Pending"));
630
631        // Verify it still exists (wasn't deleted)
632        assert!(transaction_repo
633            .get_by_id("test-tx".to_string())
634            .await
635            .is_ok());
636    }
637
638    #[tokio::test]
639    async fn test_delete_expired_transaction_validates_all_final_statuses() {
640        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
641        let relayer_id = "test-relayer";
642
643        // Test each final status to ensure they all pass validation
644        let final_statuses = [
645            TransactionStatus::Confirmed,
646            TransactionStatus::Failed,
647            TransactionStatus::Canceled,
648            TransactionStatus::Expired,
649        ];
650
651        for (i, status) in final_statuses.iter().enumerate() {
652            let tx_id = format!("test-tx-{}", i);
653            let transaction = create_test_transaction(
654                &tx_id,
655                relayer_id,
656                status.clone(),
657                Some(Utc::now().to_rfc3339()),
658            );
659
660            // Store transaction
661            transaction_repo.create(transaction.clone()).await.unwrap();
662
663            // Delete it - should succeed for all final statuses
664            let result =
665                delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
666            assert!(
667                result.is_ok(),
668                "Failed to delete transaction with status: {:?}",
669                status
670            );
671
672            // Verify it was deleted
673            assert!(transaction_repo.get_by_id(tx_id).await.is_err());
674        }
675    }
676
677    #[tokio::test]
678    async fn test_fetch_final_transactions() {
679        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
680        let relayer_id = "test-relayer";
681
682        // Create transactions with different statuses
683        let confirmed_tx = create_test_transaction(
684            "confirmed-tx",
685            relayer_id,
686            TransactionStatus::Confirmed,
687            None,
688        );
689        let pending_tx =
690            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
691        let failed_tx =
692            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
693
694        // Store transactions
695        transaction_repo.create(confirmed_tx).await.unwrap();
696        transaction_repo.create(pending_tx).await.unwrap();
697        transaction_repo.create(failed_tx).await.unwrap();
698
699        // Fetch final transactions
700        let final_transactions = fetch_final_transactions(relayer_id, &transaction_repo)
701            .await
702            .unwrap();
703
704        // Should only return transactions with final statuses (Confirmed, Failed)
705        assert_eq!(final_transactions.len(), 2);
706        let final_ids: Vec<&String> = final_transactions.iter().map(|tx| &tx.id).collect();
707        assert!(final_ids.contains(&&"confirmed-tx".to_string()));
708        assert!(final_ids.contains(&&"failed-tx".to_string()));
709        assert!(!final_ids.contains(&&"pending-tx".to_string()));
710    }
711
712    #[tokio::test]
713    async fn test_report_cleanup_results_success() {
714        let results = vec![
715            RelayerCleanupResult {
716                relayer_id: "relayer-1".to_string(),
717                cleaned_count: 2,
718                error: None,
719            },
720            RelayerCleanupResult {
721                relayer_id: "relayer-2".to_string(),
722                cleaned_count: 1,
723                error: None,
724            },
725        ];
726
727        let result = report_cleanup_results(results).await;
728        assert!(result.is_ok());
729    }
730
731    #[tokio::test]
732    async fn test_report_cleanup_results_with_errors() {
733        let results = vec![
734            RelayerCleanupResult {
735                relayer_id: "relayer-1".to_string(),
736                cleaned_count: 2,
737                error: None,
738            },
739            RelayerCleanupResult {
740                relayer_id: "relayer-2".to_string(),
741                cleaned_count: 0,
742                error: Some("Database error".to_string()),
743            },
744        ];
745
746        let result = report_cleanup_results(results).await;
747        assert!(result.is_err());
748    }
749
750    #[tokio::test]
751    async fn test_process_single_relayer_success() {
752        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
753        let relayer = RelayerRepoModel {
754            id: "test-relayer".to_string(),
755            name: "Test Relayer".to_string(),
756            network: "ethereum".to_string(),
757            paused: false,
758            network_type: NetworkType::Evm,
759            signer_id: "test-signer".to_string(),
760            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
761            address: "0x1234567890123456789012345678901234567890".to_string(),
762            notification_id: None,
763            system_disabled: false,
764            custom_rpc_urls: None,
765        };
766        let now = Utc::now();
767
768        // Create expired and non-expired transactions
769        let expired_tx = create_test_transaction(
770            "expired-tx",
771            &relayer.id,
772            TransactionStatus::Confirmed,
773            Some((now - Duration::hours(1)).to_rfc3339()),
774        );
775        let future_tx = create_test_transaction(
776            "future-tx",
777            &relayer.id,
778            TransactionStatus::Failed,
779            Some((now + Duration::hours(1)).to_rfc3339()),
780        );
781
782        transaction_repo.create(expired_tx).await.unwrap();
783        transaction_repo.create(future_tx).await.unwrap();
784
785        let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;
786
787        assert_eq!(result.relayer_id, relayer.id);
788        assert_eq!(result.cleaned_count, 1);
789        assert!(result.error.is_none());
790    }
791
792    #[tokio::test]
793    async fn test_process_single_relayer_no_transactions() {
794        // Create a relayer with no transactions in the repo
795        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
796        let relayer = RelayerRepoModel {
797            id: "empty-relayer".to_string(),
798            name: "Empty Relayer".to_string(),
799            network: "ethereum".to_string(),
800            paused: false,
801            network_type: NetworkType::Evm,
802            signer_id: "test-signer".to_string(),
803            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
804            address: "0x1234567890123456789012345678901234567890".to_string(),
805            notification_id: None,
806            system_disabled: false,
807            custom_rpc_urls: None,
808        };
809        let now = Utc::now();
810
811        // This should succeed but find no transactions
812        let result = process_single_relayer(relayer.clone(), transaction_repo, now).await;
813
814        assert_eq!(result.relayer_id, relayer.id);
815        assert_eq!(result.cleaned_count, 0);
816        assert!(result.error.is_none()); // No error, just no transactions found
817    }
818
819    #[tokio::test]
820    async fn test_process_transactions_with_empty_list() {
821        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
822        let relayer_id = "test-relayer";
823        let now = Utc::now();
824        let transactions = vec![];
825
826        let cleaned_count =
827            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
828                .await;
829
830        assert_eq!(cleaned_count, 0);
831    }
832
833    #[tokio::test]
834    async fn test_process_transactions_with_no_expired() {
835        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
836        let relayer_id = "test-relayer";
837        let now = Utc::now();
838
839        // Create only non-expired transactions
840        let future_tx1 = create_test_transaction(
841            "future-tx-1",
842            relayer_id,
843            TransactionStatus::Confirmed,
844            Some((now + Duration::hours(1)).to_rfc3339()),
845        );
846        let future_tx2 = create_test_transaction(
847            "future-tx-2",
848            relayer_id,
849            TransactionStatus::Failed,
850            Some((now + Duration::hours(2)).to_rfc3339()),
851        );
852        let no_delete_tx = create_test_transaction(
853            "no-delete-tx",
854            relayer_id,
855            TransactionStatus::Canceled,
856            None,
857        );
858
859        let transactions = vec![future_tx1, future_tx2, no_delete_tx];
860
861        let cleaned_count =
862            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
863                .await;
864
865        assert_eq!(cleaned_count, 0);
866    }
867
868    #[tokio::test]
869    async fn test_should_delete_transaction_exactly_at_expiry_time() {
870        let now = Utc::now();
871        let exact_expiry_time = now.to_rfc3339();
872
873        let transaction = create_test_transaction(
874            "test-tx",
875            "test-relayer",
876            TransactionStatus::Confirmed,
877            Some(exact_expiry_time),
878        );
879
880        // Should be considered expired when exactly at expiry time
881        assert!(should_delete_transaction(&transaction, now));
882    }
883
884    #[tokio::test]
885    async fn test_parallel_processing_with_mixed_results() {
886        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
887        let relayer_id = "test-relayer";
888        let now = Utc::now();
889
890        // Create multiple expired transactions
891        let expired_tx1 = create_test_transaction(
892            "expired-tx-1",
893            relayer_id,
894            TransactionStatus::Confirmed,
895            Some((now - Duration::hours(1)).to_rfc3339()),
896        );
897        let expired_tx2 = create_test_transaction(
898            "expired-tx-2",
899            relayer_id,
900            TransactionStatus::Failed,
901            Some((now - Duration::hours(2)).to_rfc3339()),
902        );
903        let expired_tx3 = create_test_transaction(
904            "expired-tx-3",
905            relayer_id,
906            TransactionStatus::Canceled,
907            Some((now - Duration::hours(3)).to_rfc3339()),
908        );
909
910        // Store only some transactions (others will fail deletion due to NotFound)
911        transaction_repo.create(expired_tx1.clone()).await.unwrap();
912        transaction_repo.create(expired_tx2.clone()).await.unwrap();
913        // Don't store expired_tx3 - it will fail deletion
914
915        let transactions = vec![expired_tx1, expired_tx2, expired_tx3];
916
917        let cleaned_count =
918            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
919                .await;
920
921        // Should have cleaned 2 out of 3 transactions (one failed due to NotFound)
922        assert_eq!(cleaned_count, 2);
923    }
924
925    #[tokio::test]
926    async fn test_delete_expired_transaction_repository_error() {
927        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
928        let relayer_id = "test-relayer";
929
930        let transaction = create_test_transaction(
931            "nonexistent-tx",
932            relayer_id,
933            TransactionStatus::Confirmed,
934            Some(Utc::now().to_rfc3339()),
935        );
936
937        // Don't store the transaction, so delete will fail with NotFound
938        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
939
940        assert!(result.is_err());
941        let error_message = result.unwrap_err().to_string();
942        assert!(error_message.contains("Failed to delete transaction"));
943    }
944
945    #[tokio::test]
946    async fn test_report_cleanup_results_empty() {
947        let results = vec![];
948        let result = report_cleanup_results(results).await;
949        assert!(result.is_ok());
950    }
951
952    #[tokio::test]
953    async fn test_fetch_final_transactions_with_mixed_statuses() {
954        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
955        let relayer_id = "test-relayer";
956
957        // Create transactions with all possible statuses
958        let confirmed_tx = create_test_transaction(
959            "confirmed-tx",
960            relayer_id,
961            TransactionStatus::Confirmed,
962            None,
963        );
964        let failed_tx =
965            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
966        let canceled_tx =
967            create_test_transaction("canceled-tx", relayer_id, TransactionStatus::Canceled, None);
968        let expired_tx =
969            create_test_transaction("expired-tx", relayer_id, TransactionStatus::Expired, None);
970        let pending_tx =
971            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
972        let sent_tx = create_test_transaction("sent-tx", relayer_id, TransactionStatus::Sent, None);
973
974        // Store all transactions
975        transaction_repo.create(confirmed_tx).await.unwrap();
976        transaction_repo.create(failed_tx).await.unwrap();
977        transaction_repo.create(canceled_tx).await.unwrap();
978        transaction_repo.create(expired_tx).await.unwrap();
979        transaction_repo.create(pending_tx).await.unwrap();
980        transaction_repo.create(sent_tx).await.unwrap();
981
982        // Fetch final transactions
983        let final_transactions = fetch_final_transactions(relayer_id, &transaction_repo)
984            .await
985            .unwrap();
986
987        // Should only return the 4 final status transactions
988        assert_eq!(final_transactions.len(), 4);
989        let final_ids: Vec<&String> = final_transactions.iter().map(|tx| &tx.id).collect();
990        assert!(final_ids.contains(&&"confirmed-tx".to_string()));
991        assert!(final_ids.contains(&&"failed-tx".to_string()));
992        assert!(final_ids.contains(&&"canceled-tx".to_string()));
993        assert!(final_ids.contains(&&"expired-tx".to_string()));
994        assert!(!final_ids.contains(&&"pending-tx".to_string()));
995        assert!(!final_ids.contains(&&"sent-tx".to_string()));
996    }
997}