1use actix_web::web::ThinData;
8use apalis::prelude::{Attempt, Data, *};
9use chrono::{DateTime, Utc};
10use eyre::Result;
11use log::{debug, error, info, warn};
12use std::sync::Arc;
13
14use crate::{
15 constants::{FINAL_TRANSACTION_STATUSES, WORKER_DEFAULT_MAXIMUM_RETRIES},
16 jobs::handle_result,
17 models::{DefaultAppState, RelayerRepoModel, TransactionRepoModel},
18 repositories::{Repository, TransactionRepository},
19};
20
/// Upper bound on how many relayers are cleaned up concurrently
/// (passed to `buffer_unordered` in `process_relayers_in_batches`).
const MAX_CONCURRENT_RELAYERS: usize = 10;

/// Upper bound on how many transaction deletions are in flight at once for a
/// single relayer (passed to `buffer_unordered` in `process_transactions_for_cleanup`).
const MAX_CONCURRENT_TRANSACTIONS_PER_RELAYER: usize = 50;
26
27pub async fn transaction_cleanup_handler(
44 job: TransactionCleanupCronReminder,
45 data: Data<ThinData<DefaultAppState>>,
46 attempt: Attempt,
47) -> Result<(), Error> {
48 let result = handle_request(job, data, attempt.clone()).await;
49
50 handle_result(
51 result,
52 attempt,
53 "TransactionCleanup",
54 WORKER_DEFAULT_MAXIMUM_RETRIES,
55 )
56}
57
/// Job type accepted by `transaction_cleanup_handler`; it carries no data.
#[derive(Default, Debug, Clone)]
pub struct TransactionCleanupCronReminder();
61
62async fn handle_request(
72 _job: TransactionCleanupCronReminder,
73 data: Data<ThinData<DefaultAppState>>,
74 _attempt: Attempt,
75) -> Result<()> {
76 let now = Utc::now();
77 info!(
78 "Executing transaction cleanup from storage at: {}",
79 now.to_rfc3339()
80 );
81
82 let transaction_repo = data.transaction_repository();
83 let relayer_repo = data.relayer_repository();
84
85 let relayers = relayer_repo.list_all().await.map_err(|e| {
87 error!("Failed to fetch relayers for cleanup: {}", e);
88 eyre::eyre!("Failed to fetch relayers: {}", e)
89 })?;
90
91 info!("Found {} relayers to process for cleanup", relayers.len());
92
93 let cleanup_results = process_relayers_in_batches(relayers, transaction_repo, now).await;
95
96 report_cleanup_results(cleanup_results).await
98}
99
100async fn process_relayers_in_batches(
110 relayers: Vec<RelayerRepoModel>,
111 transaction_repo: Arc<impl TransactionRepository>,
112 now: DateTime<Utc>,
113) -> Vec<RelayerCleanupResult> {
114 use futures::stream::{self, StreamExt};
115
116 let results: Vec<RelayerCleanupResult> = stream::iter(relayers)
118 .map(|relayer| {
119 let repo_clone = Arc::clone(&transaction_repo);
120 async move { process_single_relayer(relayer, repo_clone, now).await }
121 })
122 .buffer_unordered(MAX_CONCURRENT_RELAYERS)
123 .collect()
124 .await;
125
126 results
127}
128
/// Outcome of the cleanup run for a single relayer.
#[derive(Debug)]
struct RelayerCleanupResult {
    /// Id of the relayer this result belongs to.
    relayer_id: String,
    /// Number of expired transactions that were deleted for this relayer.
    cleaned_count: usize,
    /// Error text when fetching the relayer's final transactions failed; `None` otherwise.
    error: Option<String>,
}
136
137async fn process_single_relayer(
147 relayer: RelayerRepoModel,
148 transaction_repo: Arc<impl TransactionRepository>,
149 now: DateTime<Utc>,
150) -> RelayerCleanupResult {
151 debug!("Processing cleanup for relayer: {}", relayer.id);
152
153 match fetch_final_transactions(&relayer.id, &transaction_repo).await {
154 Ok(final_transactions) => {
155 debug!(
156 "Found {} transactions with final statuses for relayer: {}",
157 final_transactions.len(),
158 relayer.id
159 );
160
161 let cleaned_count = process_transactions_for_cleanup(
162 final_transactions,
163 &transaction_repo,
164 &relayer.id,
165 now,
166 )
167 .await;
168
169 if cleaned_count > 0 {
170 info!(
171 "Cleaned up {} expired transactions for relayer: {}",
172 cleaned_count, relayer.id
173 );
174 }
175
176 RelayerCleanupResult {
177 relayer_id: relayer.id,
178 cleaned_count,
179 error: None,
180 }
181 }
182 Err(e) => {
183 error!(
184 "Failed to fetch final transactions for relayer {}: {}",
185 relayer.id, e
186 );
187 RelayerCleanupResult {
188 relayer_id: relayer.id,
189 cleaned_count: 0,
190 error: Some(e.to_string()),
191 }
192 }
193 }
194}
195
196async fn fetch_final_transactions(
205 relayer_id: &str,
206 transaction_repo: &Arc<impl TransactionRepository>,
207) -> Result<Vec<TransactionRepoModel>> {
208 transaction_repo
209 .find_by_status(relayer_id, FINAL_TRANSACTION_STATUSES)
210 .await
211 .map_err(|e| {
212 eyre::eyre!(
213 "Failed to fetch final transactions for relayer {}: {}",
214 relayer_id,
215 e
216 )
217 })
218}
219
220async fn process_transactions_for_cleanup(
234 transactions: Vec<TransactionRepoModel>,
235 transaction_repo: &Arc<impl Repository<TransactionRepoModel, String>>,
236 relayer_id: &str,
237 now: DateTime<Utc>,
238) -> usize {
239 use futures::stream::{self, StreamExt};
240
241 if transactions.is_empty() {
242 return 0;
243 }
244
245 debug!(
246 "Processing {} transactions in parallel for relayer: {}",
247 transactions.len(),
248 relayer_id
249 );
250
251 let expired_transactions: Vec<TransactionRepoModel> = transactions
253 .into_iter()
254 .filter(|tx| should_delete_transaction(tx, now))
255 .collect();
256
257 if expired_transactions.is_empty() {
258 debug!("No expired transactions found for relayer: {}", relayer_id);
259 return 0;
260 }
261
262 debug!(
263 "Found {} expired transactions to delete for relayer: {}",
264 expired_transactions.len(),
265 relayer_id
266 );
267
268 let deletion_results: Vec<bool> = stream::iter(expired_transactions)
270 .map(|transaction| {
271 let repo_clone = Arc::clone(transaction_repo);
272 let relayer_id = relayer_id.to_string();
273 async move {
274 match delete_expired_transaction(&transaction, &repo_clone, &relayer_id).await {
275 Ok(()) => true,
276 Err(e) => {
277 error!(
278 "Failed to delete expired transaction {}: {}",
279 transaction.id, e
280 );
281 false
282 }
283 }
284 }
285 })
286 .buffer_unordered(MAX_CONCURRENT_TRANSACTIONS_PER_RELAYER)
287 .collect()
288 .await;
289
290 let cleaned_count = deletion_results.iter().filter(|&&success| success).count();
292
293 debug!(
294 "Successfully deleted {}/{} expired transactions for relayer: {}",
295 cleaned_count,
296 deletion_results.len(),
297 relayer_id
298 );
299
300 cleaned_count
301}
302
303fn should_delete_transaction(transaction: &TransactionRepoModel, now: DateTime<Utc>) -> bool {
312 transaction
313 .delete_at
314 .as_ref()
315 .and_then(|delete_at_str| DateTime::parse_from_rfc3339(delete_at_str).ok())
316 .map(|delete_at| {
317 let is_expired = now >= delete_at.with_timezone(&Utc);
318 if is_expired {
319 debug!(
320 "Transaction {} is expired (expired at: {})",
321 transaction.id,
322 delete_at.to_rfc3339()
323 );
324 }
325 is_expired
326 })
327 .unwrap_or_else(|| {
328 if transaction.delete_at.is_some() {
329 warn!(
330 "Transaction {} has invalid delete_at timestamp",
331 transaction.id
332 );
333 }
334 false
335 })
336}
337
338async fn delete_expired_transaction(
348 transaction: &TransactionRepoModel,
349 transaction_repo: &Arc<impl Repository<TransactionRepoModel, String>>,
350 relayer_id: &str,
351) -> Result<()> {
352 if !FINAL_TRANSACTION_STATUSES.contains(&transaction.status) {
354 return Err(eyre::eyre!(
355 "Transaction {} is not in a final state (current: {:?})",
356 transaction.id,
357 transaction.status
358 ));
359 }
360
361 debug!(
362 "Deleting expired transaction {} (status: {:?}) for relayer: {}",
363 transaction.id, transaction.status, relayer_id
364 );
365
366 transaction_repo
367 .delete_by_id(transaction.id.clone())
368 .await
369 .map_err(|e| eyre::eyre!("Failed to delete transaction {}: {}", transaction.id, e))?;
370
371 info!(
372 "Successfully deleted expired transaction: {} (status: {:?}) for relayer: {}",
373 transaction.id, transaction.status, relayer_id
374 );
375
376 Ok(())
377}
378
379async fn report_cleanup_results(cleanup_results: Vec<RelayerCleanupResult>) -> Result<()> {
387 let total_cleaned: usize = cleanup_results.iter().map(|r| r.cleaned_count).sum();
388 let total_errors = cleanup_results.iter().filter(|r| r.error.is_some()).count();
389 let total_relayers = cleanup_results.len();
390
391 for result in &cleanup_results {
393 if let Some(error) = &result.error {
394 error!(
395 "Failed to cleanup transactions for relayer {}: {}",
396 result.relayer_id, error
397 );
398 }
399 }
400
401 if total_errors > 0 {
402 warn!(
403 "Transaction cleanup completed with {} errors out of {} relayers. Successfully cleaned {} transactions.",
404 total_errors, total_relayers, total_cleaned
405 );
406
407 Err(eyre::eyre!(
410 "Cleanup completed with {} errors out of {} relayers",
411 total_errors,
412 total_relayers
413 ))
414 } else {
415 info!(
416 "Transaction cleanup completed successfully. Cleaned {} transactions from {} relayers.",
417 total_cleaned, total_relayers
418 );
419 Ok(())
420 }
421}
422
#[cfg(test)]
mod tests {
    //! Unit tests for the transaction cleanup job, run against the in-memory
    //! transaction repository.

    use super::*;
    use crate::{
        models::{
            NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
            TransactionRepoModel, TransactionStatus,
        },
        repositories::{InMemoryTransactionRepository, Repository},
        utils::mocks::mockutils::create_mock_transaction,
    };
    use chrono::{Duration, Utc};

    /// Builds a mock transaction with the given id, owner, status and `delete_at`.
    fn create_test_transaction(
        id: &str,
        relayer_id: &str,
        status: TransactionStatus,
        delete_at: Option<String>,
    ) -> TransactionRepoModel {
        let mut tx = create_mock_transaction();
        tx.id = id.to_string();
        tx.relayer_id = relayer_id.to_string();
        tx.status = status;
        tx.delete_at = delete_at;
        tx
    }

    /// Builds an EVM relayer fixture; only `id` and `name` vary between tests.
    fn create_test_relayer(id: &str, name: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: name.to_string(),
            network: "ethereum".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            signer_id: "test-signer".to_string(),
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
            address: "0x1234567890123456789012345678901234567890".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
        }
    }

    // `should_delete_transaction` is synchronous, so these tests need no async runtime.

    #[test]
    fn test_should_delete_transaction_expired() {
        let now = Utc::now();
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(expired_delete_at),
        );

        assert!(should_delete_transaction(&transaction, now));
    }

    #[test]
    fn test_should_delete_transaction_not_expired() {
        let now = Utc::now();
        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(future_delete_at),
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[test]
    fn test_should_delete_transaction_no_delete_at() {
        let now = Utc::now();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            None,
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[test]
    fn test_should_delete_transaction_invalid_timestamp() {
        let now = Utc::now();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some("invalid-timestamp".to_string()),
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[test]
    fn test_should_delete_transaction_exactly_at_expiry_time() {
        let now = Utc::now();
        let exact_expiry_time = now.to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(exact_expiry_time),
        );

        // The boundary instant itself counts as expired.
        assert!(should_delete_transaction(&transaction, now));
    }

    #[tokio::test]
    async fn test_process_transactions_for_cleanup_parallel() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();

        let expired_tx = create_test_transaction(
            "expired-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(expired_delete_at),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            relayer_id,
            TransactionStatus::Failed,
            Some(future_delete_at),
        );
        let no_delete_tx = create_test_transaction(
            "no-delete-tx",
            relayer_id,
            TransactionStatus::Canceled,
            None,
        );

        transaction_repo.create(expired_tx.clone()).await.unwrap();
        transaction_repo.create(future_tx.clone()).await.unwrap();
        transaction_repo.create(no_delete_tx.clone()).await.unwrap();

        let transactions = vec![expired_tx, future_tx, no_delete_tx];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // Only the expired transaction is removed.
        assert_eq!(cleaned_count, 1);

        assert!(transaction_repo
            .get_by_id("expired-tx".to_string())
            .await
            .is_err());

        // The other two are still stored.
        assert!(transaction_repo
            .get_by_id("future-tx".to_string())
            .await
            .is_ok());
        assert!(transaction_repo
            .get_by_id("no-delete-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_delete_expired_transaction() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        let transaction = create_test_transaction(
            "test-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(Utc::now().to_rfc3339()),
        );

        transaction_repo.create(transaction.clone()).await.unwrap();

        assert!(transaction_repo
            .get_by_id("test-tx".to_string())
            .await
            .is_ok());

        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
        assert!(result.is_ok());

        assert!(transaction_repo
            .get_by_id("test-tx".to_string())
            .await
            .is_err());
    }

    #[tokio::test]
    async fn test_delete_expired_transaction_validates_final_status() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        let transaction = create_test_transaction(
            "test-tx",
            relayer_id,
            TransactionStatus::Pending,
            Some(Utc::now().to_rfc3339()),
        );

        transaction_repo.create(transaction.clone()).await.unwrap();

        assert!(transaction_repo
            .get_by_id("test-tx".to_string())
            .await
            .is_ok());

        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
        assert!(result.is_err());

        let error_message = result.unwrap_err().to_string();
        assert!(error_message.contains("is not in a final state"));
        assert!(error_message.contains("Pending"));

        // A rejected deletion must leave the transaction in place.
        assert!(transaction_repo
            .get_by_id("test-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_delete_expired_transaction_validates_all_final_statuses() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        let final_statuses = [
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Canceled,
            TransactionStatus::Expired,
        ];

        for (i, status) in final_statuses.iter().enumerate() {
            let tx_id = format!("test-tx-{}", i);
            let transaction = create_test_transaction(
                &tx_id,
                relayer_id,
                status.clone(),
                Some(Utc::now().to_rfc3339()),
            );

            transaction_repo.create(transaction.clone()).await.unwrap();

            let result =
                delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
            assert!(
                result.is_ok(),
                "Failed to delete transaction with status: {:?}",
                status
            );

            assert!(transaction_repo.get_by_id(tx_id).await.is_err());
        }
    }

    #[tokio::test]
    async fn test_delete_expired_transaction_repository_error() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        // The transaction is never stored, so the repository deletion fails.
        let transaction = create_test_transaction(
            "nonexistent-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(Utc::now().to_rfc3339()),
        );

        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;

        assert!(result.is_err());
        let error_message = result.unwrap_err().to_string();
        assert!(error_message.contains("Failed to delete transaction"));
    }

    #[tokio::test]
    async fn test_fetch_final_transactions() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        let confirmed_tx = create_test_transaction(
            "confirmed-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            None,
        );
        let pending_tx =
            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
        let failed_tx =
            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);

        transaction_repo.create(confirmed_tx).await.unwrap();
        transaction_repo.create(pending_tx).await.unwrap();
        transaction_repo.create(failed_tx).await.unwrap();

        let final_transactions = fetch_final_transactions(relayer_id, &transaction_repo)
            .await
            .unwrap();

        assert_eq!(final_transactions.len(), 2);
        let final_ids: Vec<&str> = final_transactions
            .iter()
            .map(|tx| tx.id.as_str())
            .collect();
        assert!(final_ids.contains(&"confirmed-tx"));
        assert!(final_ids.contains(&"failed-tx"));
        assert!(!final_ids.contains(&"pending-tx"));
    }

    #[tokio::test]
    async fn test_fetch_final_transactions_with_mixed_statuses() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        let confirmed_tx = create_test_transaction(
            "confirmed-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            None,
        );
        let failed_tx =
            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
        let canceled_tx =
            create_test_transaction("canceled-tx", relayer_id, TransactionStatus::Canceled, None);
        let expired_tx =
            create_test_transaction("expired-tx", relayer_id, TransactionStatus::Expired, None);
        let pending_tx =
            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
        let sent_tx = create_test_transaction("sent-tx", relayer_id, TransactionStatus::Sent, None);

        transaction_repo.create(confirmed_tx).await.unwrap();
        transaction_repo.create(failed_tx).await.unwrap();
        transaction_repo.create(canceled_tx).await.unwrap();
        transaction_repo.create(expired_tx).await.unwrap();
        transaction_repo.create(pending_tx).await.unwrap();
        transaction_repo.create(sent_tx).await.unwrap();

        let final_transactions = fetch_final_transactions(relayer_id, &transaction_repo)
            .await
            .unwrap();

        assert_eq!(final_transactions.len(), 4);
        let final_ids: Vec<&str> = final_transactions
            .iter()
            .map(|tx| tx.id.as_str())
            .collect();
        assert!(final_ids.contains(&"confirmed-tx"));
        assert!(final_ids.contains(&"failed-tx"));
        assert!(final_ids.contains(&"canceled-tx"));
        assert!(final_ids.contains(&"expired-tx"));
        assert!(!final_ids.contains(&"pending-tx"));
        assert!(!final_ids.contains(&"sent-tx"));
    }

    #[tokio::test]
    async fn test_report_cleanup_results_success() {
        let results = vec![
            RelayerCleanupResult {
                relayer_id: "relayer-1".to_string(),
                cleaned_count: 2,
                error: None,
            },
            RelayerCleanupResult {
                relayer_id: "relayer-2".to_string(),
                cleaned_count: 1,
                error: None,
            },
        ];

        let result = report_cleanup_results(results).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_report_cleanup_results_with_errors() {
        let results = vec![
            RelayerCleanupResult {
                relayer_id: "relayer-1".to_string(),
                cleaned_count: 2,
                error: None,
            },
            RelayerCleanupResult {
                relayer_id: "relayer-2".to_string(),
                cleaned_count: 0,
                error: Some("Database error".to_string()),
            },
        ];

        let result = report_cleanup_results(results).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_report_cleanup_results_empty() {
        let results = vec![];
        let result = report_cleanup_results(results).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_process_single_relayer_success() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer = create_test_relayer("test-relayer", "Test Relayer");
        let now = Utc::now();

        let expired_tx = create_test_transaction(
            "expired-tx",
            &relayer.id,
            TransactionStatus::Confirmed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            &relayer.id,
            TransactionStatus::Failed,
            Some((now + Duration::hours(1)).to_rfc3339()),
        );

        transaction_repo.create(expired_tx).await.unwrap();
        transaction_repo.create(future_tx).await.unwrap();

        let result =
            process_single_relayer(relayer.clone(), Arc::clone(&transaction_repo), now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 1);
        assert!(result.error.is_none());

        // Check the stored state as well as the reported count.
        assert!(transaction_repo
            .get_by_id("expired-tx".to_string())
            .await
            .is_err());
        assert!(transaction_repo
            .get_by_id("future-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_process_single_relayer_no_transactions() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer = create_test_relayer("empty-relayer", "Empty Relayer");
        let now = Utc::now();

        let result = process_single_relayer(relayer.clone(), transaction_repo, now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 0);
        assert!(result.error.is_none());
    }

    #[tokio::test]
    async fn test_process_transactions_with_empty_list() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();
        let transactions = vec![];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        assert_eq!(cleaned_count, 0);
    }

    #[tokio::test]
    async fn test_process_transactions_with_no_expired() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let future_tx1 = create_test_transaction(
            "future-tx-1",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now + Duration::hours(1)).to_rfc3339()),
        );
        let future_tx2 = create_test_transaction(
            "future-tx-2",
            relayer_id,
            TransactionStatus::Failed,
            Some((now + Duration::hours(2)).to_rfc3339()),
        );
        let no_delete_tx = create_test_transaction(
            "no-delete-tx",
            relayer_id,
            TransactionStatus::Canceled,
            None,
        );

        let transactions = vec![future_tx1, future_tx2, no_delete_tx];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        assert_eq!(cleaned_count, 0);
    }

    #[tokio::test]
    async fn test_parallel_processing_with_mixed_results() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let expired_tx1 = create_test_transaction(
            "expired-tx-1",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        let expired_tx2 = create_test_transaction(
            "expired-tx-2",
            relayer_id,
            TransactionStatus::Failed,
            Some((now - Duration::hours(2)).to_rfc3339()),
        );
        let expired_tx3 = create_test_transaction(
            "expired-tx-3",
            relayer_id,
            TransactionStatus::Canceled,
            Some((now - Duration::hours(3)).to_rfc3339()),
        );

        // Only the first two are stored; deleting the third fails in the repository.
        transaction_repo.create(expired_tx1.clone()).await.unwrap();
        transaction_repo.create(expired_tx2.clone()).await.unwrap();
        let transactions = vec![expired_tx1, expired_tx2, expired_tx3];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // One failed deletion must not prevent the others from succeeding.
        assert_eq!(cleaned_count, 2);
        assert!(transaction_repo
            .get_by_id("expired-tx-1".to_string())
            .await
            .is_err());
        assert!(transaction_repo
            .get_by_id("expired-tx-2".to_string())
            .await
            .is_err());
    }
}