1use crate::models::{
4 NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
5 TransactionStatus, TransactionUpdateRequest,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{
9 BatchRetrievalResult, PaginatedResult, Repository, TransactionRepository,
10};
11use async_trait::async_trait;
12use log::{debug, error, warn};
13use redis::aio::ConnectionManager;
14use redis::AsyncCommands;
15use std::fmt;
16use std::sync::Arc;
17
/// Key segment that follows the repository prefix in every per-relayer key.
const RELAYER_PREFIX: &str = "relayer";
/// Key segment that precedes the transaction ID in a transaction data key.
const TX_PREFIX: &str = "tx";
/// Key segment that precedes the status name in a per-relayer status index key.
const STATUS_PREFIX: &str = "status";
/// Key segment that precedes the nonce in a per-relayer nonce index key.
const NONCE_PREFIX: &str = "nonce";
/// Key segment of the reverse lookup keys that map a transaction ID to its relayer ID.
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
/// Suffix of the key holding the set of relayer IDs that have transactions.
const RELAYER_LIST_KEY: &str = "relayer_list";
24
/// Transaction repository backed by Redis.
#[derive(Clone)]
pub struct RedisTransactionRepository {
    /// Shared Redis connection manager; each operation clones it to get its own handle.
    pub client: Arc<ConnectionManager>,
    /// Prefix placed in front of every key this repository builds.
    pub key_prefix: String,
}
30
// No methods are overridden here. NOTE(review): `serialize_entity`, `deserialize_entity` and
// `map_redis_error` are called on `self` in this file but not defined in it, so they presumably
// come from this trait's provided methods — confirm in `redis_base`.
impl RedisRepository for RedisTransactionRepository {}
32
33impl RedisTransactionRepository {
34 pub fn new(
35 connection_manager: Arc<ConnectionManager>,
36 key_prefix: String,
37 ) -> Result<Self, RepositoryError> {
38 if key_prefix.is_empty() {
39 return Err(RepositoryError::InvalidData(
40 "Redis key prefix cannot be empty".to_string(),
41 ));
42 }
43
44 Ok(Self {
45 client: connection_manager,
46 key_prefix,
47 })
48 }
49
50 fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
52 format!(
53 "{}:{}:{}:{}:{}",
54 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
55 )
56 }
57
58 fn tx_to_relayer_key(&self, tx_id: &str) -> String {
60 format!(
61 "{}:{}:{}:{}",
62 self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
63 )
64 }
65
66 fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
68 format!(
69 "{}:{}:{}:{}:{}",
70 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
71 )
72 }
73
74 fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
76 format!(
77 "{}:{}:{}:{}:{}",
78 self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
79 )
80 }
81
82 fn relayer_list_key(&self) -> String {
84 format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
85 }
86
87 async fn get_transactions_by_ids(
89 &self,
90 ids: &[String],
91 ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
92 if ids.is_empty() {
93 debug!("No transaction IDs provided for batch fetch");
94 return Ok(BatchRetrievalResult {
95 results: vec![],
96 failed_ids: vec![],
97 });
98 }
99
100 let mut conn = self.client.as_ref().clone();
101
102 let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
103
104 debug!("Fetching relayer IDs for {} transactions", ids.len());
105
106 let relayer_ids: Vec<Option<String>> = conn
107 .mget(&reverse_keys)
108 .await
109 .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
110
111 let mut tx_keys = Vec::new();
112 let mut valid_ids = Vec::new();
113 let mut failed_ids = Vec::new();
114 for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
115 match relayer_id {
116 Some(relayer_id) => {
117 tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
118 valid_ids.push(ids[i].clone());
119 }
120 None => {
121 warn!("No relayer found for transaction {}", ids[i]);
122 failed_ids.push(ids[i].clone());
123 }
124 }
125 }
126
127 if tx_keys.is_empty() {
128 debug!("No valid transactions found for batch fetch");
129 return Ok(BatchRetrievalResult {
130 results: vec![],
131 failed_ids,
132 });
133 }
134
135 debug!("Batch fetching {} transaction data", tx_keys.len());
136
137 let values: Vec<Option<String>> = conn
138 .mget(&tx_keys)
139 .await
140 .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
141
142 let mut transactions = Vec::new();
143 let mut failed_count = 0;
144 let mut failed_ids = Vec::new();
145 for (i, value) in values.into_iter().enumerate() {
146 match value {
147 Some(json) => {
148 match self.deserialize_entity::<TransactionRepoModel>(
149 &json,
150 &valid_ids[i],
151 "transaction",
152 ) {
153 Ok(tx) => transactions.push(tx),
154 Err(e) => {
155 failed_count += 1;
156 error!("Failed to deserialize transaction {}: {}", valid_ids[i], e);
157 }
159 }
160 }
161 None => {
162 warn!("Transaction {} not found in batch fetch", valid_ids[i]);
163 failed_ids.push(valid_ids[i].clone());
164 }
165 }
166 }
167
168 if failed_count > 0 {
169 warn!(
170 "Failed to deserialize {} out of {} transactions in batch",
171 failed_count,
172 valid_ids.len()
173 );
174 }
175
176 debug!("Successfully fetched {} transactions", transactions.len());
177 Ok(BatchRetrievalResult {
178 results: transactions,
179 failed_ids,
180 })
181 }
182
183 fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
185 match network_data.get_evm_transaction_data() {
186 Ok(tx_data) => tx_data.nonce,
187 Err(_) => {
188 debug!("No EVM transaction data available for nonce extraction");
189 None
190 }
191 }
192 }
193
194 async fn update_indexes(
196 &self,
197 tx: &TransactionRepoModel,
198 old_tx: Option<&TransactionRepoModel>,
199 ) -> Result<(), RepositoryError> {
200 let mut conn = self.client.as_ref().clone();
201 let mut pipe = redis::pipe();
202 pipe.atomic();
203
204 debug!("Updating indexes for transaction {}", tx.id);
205
206 let relayer_list_key = self.relayer_list_key();
208 pipe.sadd(&relayer_list_key, &tx.relayer_id);
209
210 let new_status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
212 pipe.sadd(&new_status_key, &tx.id);
213
214 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
215 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
216 pipe.set(&nonce_key, &tx.id);
217 debug!("Added nonce index for tx {} with nonce {}", tx.id, nonce);
218 }
219
220 if let Some(old) = old_tx {
222 if old.status != tx.status {
223 let old_status_key = self.relayer_status_key(&old.relayer_id, &old.status);
224 pipe.srem(&old_status_key, &tx.id);
225 debug!(
226 "Removing old status index for tx {} (status: {} -> {})",
227 tx.id, old.status, tx.status
228 );
229 }
230
231 if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
233 let new_nonce = self.extract_nonce(&tx.network_data);
234 if Some(old_nonce) != new_nonce {
235 let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
236 pipe.del(&old_nonce_key);
237 debug!(
238 "Removing old nonce index for tx {} (nonce: {} -> {:?})",
239 tx.id, old_nonce, new_nonce
240 );
241 }
242 }
243 }
244
245 pipe.exec_async(&mut conn).await.map_err(|e| {
247 error!(
248 "Index update pipeline failed for transaction {}: {}",
249 tx.id, e
250 );
251 self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
252 })?;
253
254 debug!("Successfully updated indexes for transaction {}", tx.id);
255 Ok(())
256 }
257
258 async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
260 let mut conn = self.client.as_ref().clone();
261 let mut pipe = redis::pipe();
262 pipe.atomic();
263
264 debug!("Removing all indexes for transaction {}", tx.id);
265
266 let status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
268 pipe.srem(&status_key, &tx.id);
269
270 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
272 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
273 pipe.del(&nonce_key);
274 debug!("Removing nonce index for tx {} with nonce {}", tx.id, nonce);
275 }
276
277 let reverse_key = self.tx_to_relayer_key(&tx.id);
279 pipe.del(&reverse_key);
280
281 pipe.exec_async(&mut conn).await.map_err(|e| {
282 error!("Index removal failed for transaction {}: {}", tx.id, e);
283 self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
284 })?;
285
286 debug!("Successfully removed all indexes for transaction {}", tx.id);
287 Ok(())
288 }
289}
290
291impl fmt::Debug for RedisTransactionRepository {
292 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293 f.debug_struct("RedisTransactionRepository")
294 .field("client", &"<ConnectionManager>")
295 .field("key_prefix", &self.key_prefix)
296 .finish()
297 }
298}
299
300#[async_trait]
301impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
302 async fn create(
303 &self,
304 entity: TransactionRepoModel,
305 ) -> Result<TransactionRepoModel, RepositoryError> {
306 if entity.id.is_empty() {
307 return Err(RepositoryError::InvalidData(
308 "Transaction ID cannot be empty".to_string(),
309 ));
310 }
311
312 let key = self.tx_key(&entity.relayer_id, &entity.id);
313 let reverse_key = self.tx_to_relayer_key(&entity.id);
314 let mut conn = self.client.as_ref().clone();
315
316 debug!("Creating transaction with ID: {}", entity.id);
317
318 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
319
320 let existing: Option<String> = conn
322 .get(&reverse_key)
323 .await
324 .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
325
326 if existing.is_some() {
327 return Err(RepositoryError::ConstraintViolation(format!(
328 "Transaction with ID {} already exists",
329 entity.id
330 )));
331 }
332
333 let mut pipe = redis::pipe();
335 pipe.atomic();
336 pipe.set(&key, &value);
337 pipe.set(&reverse_key, &entity.relayer_id);
338
339 pipe.exec_async(&mut conn)
340 .await
341 .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
342
343 if let Err(e) = self.update_indexes(&entity, None).await {
345 error!(
346 "Failed to update indexes for new transaction {}: {}",
347 entity.id, e
348 );
349 return Err(e);
350 }
351
352 debug!("Successfully created transaction {}", entity.id);
353 Ok(entity)
354 }
355
356 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
357 if id.is_empty() {
358 return Err(RepositoryError::InvalidData(
359 "Transaction ID cannot be empty".to_string(),
360 ));
361 }
362
363 let mut conn = self.client.as_ref().clone();
364
365 debug!("Fetching transaction with ID: {}", id);
366
367 let reverse_key = self.tx_to_relayer_key(&id);
368 let relayer_id: Option<String> = conn
369 .get(&reverse_key)
370 .await
371 .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
372
373 let relayer_id = match relayer_id {
374 Some(relayer_id) => relayer_id,
375 None => {
376 debug!("Transaction {} not found (no reverse lookup)", id);
377 return Err(RepositoryError::NotFound(format!(
378 "Transaction with ID {} not found",
379 id
380 )));
381 }
382 };
383
384 let key = self.tx_key(&relayer_id, &id);
385 let value: Option<String> = conn
386 .get(&key)
387 .await
388 .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
389
390 match value {
391 Some(json) => {
392 let tx =
393 self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
394 debug!("Successfully fetched transaction {}", id);
395 Ok(tx)
396 }
397 None => {
398 debug!("Transaction {} not found", id);
399 Err(RepositoryError::NotFound(format!(
400 "Transaction with ID {} not found",
401 id
402 )))
403 }
404 }
405 }
406
407 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
408 let mut conn = self.client.as_ref().clone();
409
410 debug!("Fetching all transaction IDs");
411
412 let relayer_list_key = self.relayer_list_key();
414 let relayer_ids: Vec<String> = conn
415 .smembers(&relayer_list_key)
416 .await
417 .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
418
419 debug!("Found {} relayers", relayer_ids.len());
420
421 let mut all_tx_ids = Vec::new();
423 for relayer_id in relayer_ids {
424 let pattern = format!(
425 "{}:{}:{}:{}:*",
426 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
427 );
428 let mut cursor = 0;
429 loop {
430 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
431 .cursor_arg(cursor)
432 .arg("MATCH")
433 .arg(&pattern)
434 .query_async(&mut conn)
435 .await
436 .map_err(|e| self.map_redis_error(e, "list_all_scan_keys"))?;
437
438 for key in keys {
440 if let Some(tx_id) = key.split(':').next_back() {
441 all_tx_ids.push(tx_id.to_string());
442 }
443 }
444
445 cursor = next_cursor;
446 if cursor == 0 {
447 break;
448 }
449 }
450 }
451
452 debug!("Found {} transaction IDs", all_tx_ids.len());
453
454 let transactions = self.get_transactions_by_ids(&all_tx_ids).await?;
455 Ok(transactions.results)
456 }
457
458 async fn list_paginated(
459 &self,
460 query: PaginationQuery,
461 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
462 if query.per_page == 0 {
463 return Err(RepositoryError::InvalidData(
464 "per_page must be greater than 0".to_string(),
465 ));
466 }
467
468 let mut conn = self.client.as_ref().clone();
469
470 debug!(
471 "Fetching paginated transactions (page: {}, per_page: {})",
472 query.page, query.per_page
473 );
474
475 let relayer_list_key = self.relayer_list_key();
477 let relayer_ids: Vec<String> = conn
478 .smembers(&relayer_list_key)
479 .await
480 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
481
482 let mut all_tx_ids = Vec::new();
484 for relayer_id in relayer_ids {
485 let pattern = format!(
486 "{}:{}:{}:{}:*",
487 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
488 );
489 let mut cursor = 0;
490 loop {
491 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
492 .cursor_arg(cursor)
493 .arg("MATCH")
494 .arg(&pattern)
495 .query_async(&mut conn)
496 .await
497 .map_err(|e| self.map_redis_error(e, "list_paginated_scan_keys"))?;
498
499 for key in keys {
501 if let Some(tx_id) = key.split(':').next_back() {
502 all_tx_ids.push(tx_id.to_string());
503 }
504 }
505
506 cursor = next_cursor;
507 if cursor == 0 {
508 break;
509 }
510 }
511 }
512
513 let total = all_tx_ids.len() as u64;
514 let start = ((query.page - 1) * query.per_page) as usize;
515 let end = (start + query.per_page as usize).min(all_tx_ids.len());
516
517 if start >= all_tx_ids.len() {
518 debug!(
519 "Page {} is beyond available data (total: {})",
520 query.page, total
521 );
522 return Ok(PaginatedResult {
523 items: vec![],
524 total,
525 page: query.page,
526 per_page: query.per_page,
527 });
528 }
529
530 let page_ids = &all_tx_ids[start..end];
531 let items = self.get_transactions_by_ids(page_ids).await?;
532
533 debug!(
534 "Successfully fetched {} transactions for page {}",
535 items.results.len(),
536 query.page
537 );
538
539 Ok(PaginatedResult {
540 items: items.results.clone(),
541 total,
542 page: query.page,
543 per_page: query.per_page,
544 })
545 }
546
547 async fn update(
548 &self,
549 id: String,
550 entity: TransactionRepoModel,
551 ) -> Result<TransactionRepoModel, RepositoryError> {
552 if id.is_empty() {
553 return Err(RepositoryError::InvalidData(
554 "Transaction ID cannot be empty".to_string(),
555 ));
556 }
557
558 debug!("Updating transaction with ID: {}", id);
559
560 let old_tx = self.get_by_id(id.clone()).await?;
562
563 let key = self.tx_key(&entity.relayer_id, &id);
564 let mut conn = self.client.as_ref().clone();
565
566 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
567
568 let _: () = conn
570 .set(&key, value)
571 .await
572 .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
573
574 self.update_indexes(&entity, Some(&old_tx)).await?;
576
577 debug!("Successfully updated transaction {}", id);
578 Ok(entity)
579 }
580
581 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
582 if id.is_empty() {
583 return Err(RepositoryError::InvalidData(
584 "Transaction ID cannot be empty".to_string(),
585 ));
586 }
587
588 debug!("Deleting transaction with ID: {}", id);
589
590 let tx = self.get_by_id(id.clone()).await?;
592
593 let key = self.tx_key(&tx.relayer_id, &id);
594 let reverse_key = self.tx_to_relayer_key(&id);
595 let mut conn = self.client.as_ref().clone();
596
597 let mut pipe = redis::pipe();
598 pipe.atomic();
599 pipe.del(&key);
600 pipe.del(&reverse_key);
601
602 pipe.exec_async(&mut conn)
603 .await
604 .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
605
606 if let Err(e) = self.remove_all_indexes(&tx).await {
608 error!(
609 "Failed to remove indexes for deleted transaction {}: {}",
610 id, e
611 );
612 }
613
614 debug!("Successfully deleted transaction {}", id);
615 Ok(())
616 }
617
618 async fn count(&self) -> Result<usize, RepositoryError> {
619 let mut conn = self.client.as_ref().clone();
620
621 debug!("Counting transactions");
622
623 let relayer_list_key = self.relayer_list_key();
625 let relayer_ids: Vec<String> = conn
626 .smembers(&relayer_list_key)
627 .await
628 .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
629
630 let mut total_count = 0;
632 for relayer_id in relayer_ids {
633 let pattern = format!(
634 "{}:{}:{}:{}:*",
635 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
636 );
637 let mut cursor = 0;
638 loop {
639 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
640 .cursor_arg(cursor)
641 .arg("MATCH")
642 .arg(&pattern)
643 .query_async(&mut conn)
644 .await
645 .map_err(|e| self.map_redis_error(e, "count_scan_keys"))?;
646
647 total_count += keys.len();
648
649 cursor = next_cursor;
650 if cursor == 0 {
651 break;
652 }
653 }
654 }
655
656 debug!("Transaction count: {}", total_count);
657 Ok(total_count)
658 }
659
660 async fn has_entries(&self) -> Result<bool, RepositoryError> {
661 let mut conn = self.client.as_ref().clone();
662 let relayer_list_key = self.relayer_list_key();
663
664 debug!("Checking if transaction entries exist");
665
666 let exists: bool = conn
667 .exists(&relayer_list_key)
668 .await
669 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
670
671 debug!("Transaction entries exist: {}", exists);
672 Ok(exists)
673 }
674
675 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
676 let mut conn = self.client.as_ref().clone();
677 let relayer_list_key = self.relayer_list_key();
678
679 debug!("Dropping all transaction entries");
680
681 let relayer_ids: Vec<String> = conn
683 .smembers(&relayer_list_key)
684 .await
685 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
686
687 if relayer_ids.is_empty() {
688 debug!("No transaction entries to drop");
689 return Ok(());
690 }
691
692 let mut pipe = redis::pipe();
694 pipe.atomic();
695
696 for relayer_id in &relayer_ids {
698 let pattern = format!(
700 "{}:{}:{}:{}:*",
701 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
702 );
703 let mut cursor = 0;
704 let mut tx_ids = Vec::new();
705
706 loop {
707 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
708 .cursor_arg(cursor)
709 .arg("MATCH")
710 .arg(&pattern)
711 .query_async(&mut conn)
712 .await
713 .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
714
715 for key in keys {
717 pipe.del(&key);
718 if let Some(tx_id) = key.split(':').next_back() {
719 tx_ids.push(tx_id.to_string());
720 }
721 }
722
723 cursor = next_cursor;
724 if cursor == 0 {
725 break;
726 }
727 }
728
729 for tx_id in tx_ids {
731 let reverse_key = self.tx_to_relayer_key(&tx_id);
732 pipe.del(&reverse_key);
733
734 for status in &[
736 TransactionStatus::Pending,
737 TransactionStatus::Sent,
738 TransactionStatus::Confirmed,
739 TransactionStatus::Failed,
740 TransactionStatus::Canceled,
741 ] {
742 let status_key = self.relayer_status_key(relayer_id, status);
743 pipe.srem(&status_key, &tx_id);
744 }
745 }
746 }
747
748 pipe.del(&relayer_list_key);
750
751 pipe.exec_async(&mut conn)
752 .await
753 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
754
755 debug!(
756 "Dropped all transaction entries for {} relayers",
757 relayer_ids.len()
758 );
759 Ok(())
760 }
761}
762
763#[async_trait]
764impl TransactionRepository for RedisTransactionRepository {
765 async fn find_by_relayer_id(
766 &self,
767 relayer_id: &str,
768 query: PaginationQuery,
769 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
770 let mut conn = self.client.as_ref().clone();
771
772 let pattern = format!(
774 "{}:{}:{}:{}:*",
775 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
776 );
777 let mut all_tx_ids = Vec::new();
778 let mut cursor = 0;
779
780 loop {
781 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
782 .cursor_arg(cursor)
783 .arg("MATCH")
784 .arg(&pattern)
785 .query_async(&mut conn)
786 .await
787 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_scan"))?;
788
789 for key in keys {
791 if let Some(tx_id) = key.split(':').next_back() {
792 all_tx_ids.push(tx_id.to_string());
793 }
794 }
795
796 cursor = next_cursor;
797 if cursor == 0 {
798 break;
799 }
800 }
801
802 let total = all_tx_ids.len() as u64;
803 let start = ((query.page - 1) * query.per_page) as usize;
804 let end = (start + query.per_page as usize).min(all_tx_ids.len());
805
806 let page_ids = &all_tx_ids[start..end];
807 let items = self.get_transactions_by_ids(page_ids).await?;
808
809 Ok(PaginatedResult {
810 items: items.results.clone(),
811 total,
812 page: query.page,
813 per_page: query.per_page,
814 })
815 }
816
817 async fn find_by_status(
818 &self,
819 relayer_id: &str,
820 statuses: &[TransactionStatus],
821 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
822 let mut conn = self.client.as_ref().clone();
823 let mut all_ids = Vec::new();
824
825 for status in statuses {
827 let status_key = self.relayer_status_key(relayer_id, status);
828 let ids: Vec<String> = conn
829 .smembers(status_key)
830 .await
831 .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
832
833 all_ids.extend(ids);
834 }
835
836 all_ids.sort();
838 all_ids.dedup();
839
840 let transactions = self.get_transactions_by_ids(&all_ids).await?;
841 Ok(transactions.results)
842 }
843
844 async fn find_by_nonce(
845 &self,
846 relayer_id: &str,
847 nonce: u64,
848 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
849 let mut conn = self.client.as_ref().clone();
850 let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
851
852 let tx_id: Option<String> = conn
854 .get(nonce_key)
855 .await
856 .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
857
858 match tx_id {
859 Some(tx_id) => {
860 match self.get_by_id(tx_id.clone()).await {
861 Ok(tx) => Ok(Some(tx)),
862 Err(RepositoryError::NotFound(_)) => {
863 warn!(
865 "Stale nonce index found for relayer {} nonce {}",
866 relayer_id, nonce
867 );
868 Ok(None)
869 }
870 Err(e) => Err(e),
871 }
872 }
873 None => Ok(None),
874 }
875 }
876
877 async fn update_status(
878 &self,
879 tx_id: String,
880 status: TransactionStatus,
881 ) -> Result<TransactionRepoModel, RepositoryError> {
882 let update = TransactionUpdateRequest {
883 status: Some(status),
884 ..Default::default()
885 };
886 self.partial_update(tx_id, update).await
887 }
888
889 async fn partial_update(
890 &self,
891 tx_id: String,
892 update: TransactionUpdateRequest,
893 ) -> Result<TransactionRepoModel, RepositoryError> {
894 let mut tx = self.get_by_id(tx_id.clone()).await?;
896 let old_tx = tx.clone(); tx.apply_partial_update(update);
900
901 let key = self.tx_key(&tx.relayer_id, &tx_id);
903 let mut conn = self.client.as_ref().clone();
904
905 let value = self.serialize_entity(&tx, |t| &t.id, "transaction")?;
906
907 let _: () = conn
908 .set(&key, value)
909 .await
910 .map_err(|e| self.map_redis_error(e, "partial_update"))?;
911
912 self.update_indexes(&tx, Some(&old_tx)).await?;
913 Ok(tx)
914 }
915
916 async fn update_network_data(
917 &self,
918 tx_id: String,
919 network_data: NetworkTransactionData,
920 ) -> Result<TransactionRepoModel, RepositoryError> {
921 let update = TransactionUpdateRequest {
922 network_data: Some(network_data),
923 ..Default::default()
924 };
925 self.partial_update(tx_id, update).await
926 }
927
928 async fn set_sent_at(
929 &self,
930 tx_id: String,
931 sent_at: String,
932 ) -> Result<TransactionRepoModel, RepositoryError> {
933 let update = TransactionUpdateRequest {
934 sent_at: Some(sent_at),
935 ..Default::default()
936 };
937 self.partial_update(tx_id, update).await
938 }
939
940 async fn set_confirmed_at(
941 &self,
942 tx_id: String,
943 confirmed_at: String,
944 ) -> Result<TransactionRepoModel, RepositoryError> {
945 let update = TransactionUpdateRequest {
946 confirmed_at: Some(confirmed_at),
947 ..Default::default()
948 };
949 self.partial_update(tx_id, update).await
950 }
951}
952
953#[cfg(test)]
954mod tests {
955 use super::*;
956 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
957 use alloy::primitives::U256;
958 use lazy_static::lazy_static;
959 use redis::Client;
960 use std::str::FromStr;
961 use tokio;
962 use uuid::Uuid;
963
964 use tokio::sync::Mutex;
965
    lazy_static! {
        // Lazily initialized async mutex guarding no data.
        // NOTE(review): no use is visible in this part of the file; presumably it serializes
        // tests that touch environment variables — confirm, or remove if unused.
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
970
971 fn create_test_transaction(id: &str) -> TransactionRepoModel {
973 TransactionRepoModel {
974 id: id.to_string(),
975 relayer_id: "relayer-1".to_string(),
976 status: TransactionStatus::Pending,
977 status_reason: None,
978 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
979 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
980 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
981 valid_until: None,
982 delete_at: None,
983 network_type: NetworkType::Evm,
984 priced_at: None,
985 hashes: vec![],
986 network_data: NetworkTransactionData::Evm(EvmTransactionData {
987 gas_price: Some(1000000000),
988 gas_limit: Some(21000),
989 nonce: Some(1),
990 value: U256::from_str("1000000000000000000").unwrap(),
991 data: Some("0x".to_string()),
992 from: "0xSender".to_string(),
993 to: Some("0xRecipient".to_string()),
994 chain_id: 1,
995 signature: None,
996 hash: Some(format!("0x{}", id)),
997 speed: Some(Speed::Fast),
998 max_fee_per_gas: None,
999 max_priority_fee_per_gas: None,
1000 raw: None,
1001 }),
1002 noop_count: None,
1003 is_canceled: Some(false),
1004 }
1005 }
1006
1007 fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
1008 let mut tx = create_test_transaction(id);
1009 tx.relayer_id = relayer_id.to_string();
1010 tx
1011 }
1012
1013 fn create_test_transaction_with_status(
1014 id: &str,
1015 relayer_id: &str,
1016 status: TransactionStatus,
1017 ) -> TransactionRepoModel {
1018 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1019 tx.status = status;
1020 tx
1021 }
1022
1023 fn create_test_transaction_with_nonce(
1024 id: &str,
1025 nonce: u64,
1026 relayer_id: &str,
1027 ) -> TransactionRepoModel {
1028 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1029 if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
1030 evm_data.nonce = Some(nonce);
1031 }
1032 tx
1033 }
1034
1035 async fn setup_test_repo() -> RedisTransactionRepository {
1036 let redis_url = std::env::var("REDIS_TEST_URL")
1038 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1039
1040 let client = Client::open(redis_url).expect("Failed to create Redis client");
1041 let connection_manager = ConnectionManager::new(client)
1042 .await
1043 .expect("Failed to create connection manager");
1044
1045 let random_id = Uuid::new_v4().to_string();
1046 let key_prefix = format!("test_prefix:{}", random_id);
1047
1048 RedisTransactionRepository::new(Arc::new(connection_manager), key_prefix)
1049 .expect("Failed to create RedisTransactionRepository")
1050 }
1051
1052 #[tokio::test]
1053 #[ignore = "Requires active Redis instance"]
1054 async fn test_new_repository_creation() {
1055 let repo = setup_test_repo().await;
1056 assert!(repo.key_prefix.contains("test_prefix"));
1057 }
1058
1059 #[tokio::test]
1060 #[ignore = "Requires active Redis instance"]
1061 async fn test_new_repository_empty_prefix_fails() {
1062 let redis_url = std::env::var("REDIS_TEST_URL")
1063 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1064 let client = Client::open(redis_url).expect("Failed to create Redis client");
1065 let connection_manager = ConnectionManager::new(client)
1066 .await
1067 .expect("Failed to create connection manager");
1068
1069 let result = RedisTransactionRepository::new(Arc::new(connection_manager), "".to_string());
1070 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1071 }
1072
1073 #[tokio::test]
1074 #[ignore = "Requires active Redis instance"]
1075 async fn test_key_generation() {
1076 let repo = setup_test_repo().await;
1077
1078 assert!(repo
1079 .tx_key("relayer-1", "test-id")
1080 .contains(":relayer:relayer-1:tx:test-id"));
1081 assert!(repo
1082 .tx_to_relayer_key("test-id")
1083 .contains(":relayer:tx_to_relayer:test-id"));
1084 assert!(repo.relayer_list_key().contains(":relayer_list"));
1085 assert!(repo
1086 .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1087 .contains(":relayer:relayer-1:status:Pending"));
1088 assert!(repo
1089 .relayer_nonce_key("relayer-1", 42)
1090 .contains(":relayer:relayer-1:nonce:42"));
1091 }
1092
1093 #[tokio::test]
1094 #[ignore = "Requires active Redis instance"]
1095 async fn test_serialize_deserialize_transaction() {
1096 let repo = setup_test_repo().await;
1097 let tx = create_test_transaction("test-1");
1098
1099 let serialized = repo
1100 .serialize_entity(&tx, |t| &t.id, "transaction")
1101 .expect("Serialization should succeed");
1102 let deserialized: TransactionRepoModel = repo
1103 .deserialize_entity(&serialized, "test-1", "transaction")
1104 .expect("Deserialization should succeed");
1105
1106 assert_eq!(tx.id, deserialized.id);
1107 assert_eq!(tx.relayer_id, deserialized.relayer_id);
1108 assert_eq!(tx.status, deserialized.status);
1109 }
1110
1111 #[tokio::test]
1112 #[ignore = "Requires active Redis instance"]
1113 async fn test_extract_nonce() {
1114 let repo = setup_test_repo().await;
1115 let random_id = Uuid::new_v4().to_string();
1116 let relayer_id = Uuid::new_v4().to_string();
1117 let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1118
1119 let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1120 assert_eq!(nonce, Some(42));
1121 }
1122
1123 #[tokio::test]
1124 #[ignore = "Requires active Redis instance"]
1125 async fn test_create_transaction() {
1126 let repo = setup_test_repo().await;
1127 let random_id = Uuid::new_v4().to_string();
1128 let tx = create_test_transaction(&random_id);
1129
1130 let result = repo.create(tx.clone()).await.unwrap();
1131 assert_eq!(result.id, tx.id);
1132 }
1133
1134 #[tokio::test]
1135 #[ignore = "Requires active Redis instance"]
1136 async fn test_get_transaction() {
1137 let repo = setup_test_repo().await;
1138 let random_id = Uuid::new_v4().to_string();
1139 let tx = create_test_transaction(&random_id);
1140
1141 repo.create(tx.clone()).await.unwrap();
1142 let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1143 assert_eq!(stored.id, tx.id);
1144 assert_eq!(stored.relayer_id, tx.relayer_id);
1145 }
1146
1147 #[tokio::test]
1148 #[ignore = "Requires active Redis instance"]
1149 async fn test_update_transaction() {
1150 let repo = setup_test_repo().await;
1151 let random_id = Uuid::new_v4().to_string();
1152 let mut tx = create_test_transaction(&random_id);
1153
1154 repo.create(tx.clone()).await.unwrap();
1155 tx.status = TransactionStatus::Confirmed;
1156
1157 let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1158 assert!(matches!(updated.status, TransactionStatus::Confirmed));
1159 }
1160
1161 #[tokio::test]
1162 #[ignore = "Requires active Redis instance"]
1163 async fn test_delete_transaction() {
1164 let repo = setup_test_repo().await;
1165 let random_id = Uuid::new_v4().to_string();
1166 let tx = create_test_transaction(&random_id);
1167
1168 repo.create(tx).await.unwrap();
1169 repo.delete_by_id(random_id.to_string()).await.unwrap();
1170
1171 let result = repo.get_by_id(random_id.to_string()).await;
1172 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1173 }
1174
1175 #[tokio::test]
1176 #[ignore = "Requires active Redis instance"]
1177 async fn test_list_all_transactions() {
1178 let repo = setup_test_repo().await;
1179 let random_id = Uuid::new_v4().to_string();
1180 let random_id2 = Uuid::new_v4().to_string();
1181
1182 let tx1 = create_test_transaction(&random_id);
1183 let tx2 = create_test_transaction(&random_id2);
1184
1185 repo.create(tx1).await.unwrap();
1186 repo.create(tx2).await.unwrap();
1187
1188 let transactions = repo.list_all().await.unwrap();
1189 assert!(transactions.len() >= 2);
1190 }
1191
1192 #[tokio::test]
1193 #[ignore = "Requires active Redis instance"]
1194 async fn test_count_transactions() {
1195 let repo = setup_test_repo().await;
1196 let random_id = Uuid::new_v4().to_string();
1197 let tx = create_test_transaction(&random_id);
1198
1199 let count = repo.count().await.unwrap();
1200 repo.create(tx).await.unwrap();
1201 assert!(repo.count().await.unwrap() > count);
1202 }
1203
1204 #[tokio::test]
1205 #[ignore = "Requires active Redis instance"]
1206 async fn test_get_nonexistent_transaction() {
1207 let repo = setup_test_repo().await;
1208 let result = repo.get_by_id("nonexistent".to_string()).await;
1209 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1210 }
1211
1212 #[tokio::test]
1213 #[ignore = "Requires active Redis instance"]
1214 async fn test_duplicate_transaction_creation() {
1215 let repo = setup_test_repo().await;
1216 let random_id = Uuid::new_v4().to_string();
1217
1218 let tx = create_test_transaction(&random_id);
1219
1220 repo.create(tx.clone()).await.unwrap();
1221 let result = repo.create(tx).await;
1222
1223 assert!(matches!(
1224 result,
1225 Err(RepositoryError::ConstraintViolation(_))
1226 ));
1227 }
1228
1229 #[tokio::test]
1230 #[ignore = "Requires active Redis instance"]
1231 async fn test_update_nonexistent_transaction() {
1232 let repo = setup_test_repo().await;
1233 let tx = create_test_transaction("test-1");
1234
1235 let result = repo.update("nonexistent".to_string(), tx).await;
1236 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1237 }
1238
1239 #[tokio::test]
1240 #[ignore = "Requires active Redis instance"]
1241 async fn test_list_paginated() {
1242 let repo = setup_test_repo().await;
1243
1244 for _ in 1..=10 {
1246 let random_id = Uuid::new_v4().to_string();
1247 let tx = create_test_transaction(&random_id);
1248 repo.create(tx).await.unwrap();
1249 }
1250
1251 let query = PaginationQuery {
1253 page: 1,
1254 per_page: 3,
1255 };
1256 let result = repo.list_paginated(query).await.unwrap();
1257 assert_eq!(result.items.len(), 3);
1258 assert!(result.total >= 10);
1259 assert_eq!(result.page, 1);
1260 assert_eq!(result.per_page, 3);
1261
1262 let query = PaginationQuery {
1264 page: 1000,
1265 per_page: 3,
1266 };
1267 let result = repo.list_paginated(query).await.unwrap();
1268 assert_eq!(result.items.len(), 0);
1269 }
1270
1271 #[tokio::test]
1272 #[ignore = "Requires active Redis instance"]
1273 async fn test_find_by_relayer_id() {
1274 let repo = setup_test_repo().await;
1275 let random_id = Uuid::new_v4().to_string();
1276 let random_id2 = Uuid::new_v4().to_string();
1277 let random_id3 = Uuid::new_v4().to_string();
1278
1279 let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
1280 let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
1281 let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
1282
1283 repo.create(tx1).await.unwrap();
1284 repo.create(tx2).await.unwrap();
1285 repo.create(tx3).await.unwrap();
1286
1287 let query = PaginationQuery {
1289 page: 1,
1290 per_page: 10,
1291 };
1292 let result = repo
1293 .find_by_relayer_id("relayer-1", query.clone())
1294 .await
1295 .unwrap();
1296 assert!(result.total >= 2);
1297 assert!(result.items.len() >= 2);
1298 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
1299
1300 let result = repo
1302 .find_by_relayer_id("relayer-2", query.clone())
1303 .await
1304 .unwrap();
1305 assert!(result.total >= 1);
1306 assert!(!result.items.is_empty());
1307 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
1308
1309 let result = repo
1311 .find_by_relayer_id("non-existent", query.clone())
1312 .await
1313 .unwrap();
1314 assert_eq!(result.total, 0);
1315 assert_eq!(result.items.len(), 0);
1316 }
1317
1318 #[tokio::test]
1319 #[ignore = "Requires active Redis instance"]
1320 async fn test_find_by_status() {
1321 let repo = setup_test_repo().await;
1322 let random_id = Uuid::new_v4().to_string();
1323 let random_id2 = Uuid::new_v4().to_string();
1324 let random_id3 = Uuid::new_v4().to_string();
1325 let relayer_id = Uuid::new_v4().to_string();
1326 let tx1 = create_test_transaction_with_status(
1327 &random_id,
1328 &relayer_id,
1329 TransactionStatus::Pending,
1330 );
1331 let tx2 =
1332 create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
1333 let tx3 = create_test_transaction_with_status(
1334 &random_id3,
1335 &relayer_id,
1336 TransactionStatus::Confirmed,
1337 );
1338
1339 repo.create(tx1).await.unwrap();
1340 repo.create(tx2).await.unwrap();
1341 repo.create(tx3).await.unwrap();
1342
1343 let result = repo
1345 .find_by_status(&relayer_id, &[TransactionStatus::Pending])
1346 .await
1347 .unwrap();
1348 assert_eq!(result.len(), 1);
1349 assert_eq!(result[0].status, TransactionStatus::Pending);
1350
1351 let result = repo
1353 .find_by_status(
1354 &relayer_id,
1355 &[TransactionStatus::Pending, TransactionStatus::Sent],
1356 )
1357 .await
1358 .unwrap();
1359 assert_eq!(result.len(), 2);
1360
1361 let result = repo
1363 .find_by_status(&relayer_id, &[TransactionStatus::Failed])
1364 .await
1365 .unwrap();
1366 assert_eq!(result.len(), 0);
1367 }
1368
1369 #[tokio::test]
1370 #[ignore = "Requires active Redis instance"]
1371 async fn test_find_by_nonce() {
1372 let repo = setup_test_repo().await;
1373 let random_id = Uuid::new_v4().to_string();
1374 let random_id2 = Uuid::new_v4().to_string();
1375 let relayer_id = Uuid::new_v4().to_string();
1376
1377 let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1378 let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
1379
1380 repo.create(tx1.clone()).await.unwrap();
1381 repo.create(tx2).await.unwrap();
1382
1383 let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1385 assert!(result.is_some());
1386 assert_eq!(result.unwrap().id, random_id);
1387
1388 let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
1390 assert!(result.is_none());
1391
1392 let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
1394 assert!(result.is_none());
1395 }
1396
1397 #[tokio::test]
1398 #[ignore = "Requires active Redis instance"]
1399 async fn test_update_status() {
1400 let repo = setup_test_repo().await;
1401 let random_id = Uuid::new_v4().to_string();
1402 let tx = create_test_transaction(&random_id);
1403
1404 repo.create(tx).await.unwrap();
1405 let updated = repo
1406 .update_status(random_id.to_string(), TransactionStatus::Confirmed)
1407 .await
1408 .unwrap();
1409 assert_eq!(updated.status, TransactionStatus::Confirmed);
1410 }
1411
1412 #[tokio::test]
1413 #[ignore = "Requires active Redis instance"]
1414 async fn test_partial_update() {
1415 let repo = setup_test_repo().await;
1416 let random_id = Uuid::new_v4().to_string();
1417 let tx = create_test_transaction(&random_id);
1418
1419 repo.create(tx).await.unwrap();
1420
1421 let update = TransactionUpdateRequest {
1422 status: Some(TransactionStatus::Sent),
1423 status_reason: Some("Transaction sent".to_string()),
1424 sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
1425 confirmed_at: None,
1426 network_data: None,
1427 hashes: None,
1428 is_canceled: None,
1429 priced_at: None,
1430 noop_count: None,
1431 delete_at: None,
1432 };
1433
1434 let updated = repo
1435 .partial_update(random_id.to_string(), update)
1436 .await
1437 .unwrap();
1438 assert_eq!(updated.status, TransactionStatus::Sent);
1439 assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
1440 assert_eq!(
1441 updated.sent_at,
1442 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1443 );
1444 }
1445
1446 #[tokio::test]
1447 #[ignore = "Requires active Redis instance"]
1448 async fn test_set_sent_at() {
1449 let repo = setup_test_repo().await;
1450 let random_id = Uuid::new_v4().to_string();
1451 let tx = create_test_transaction(&random_id);
1452
1453 repo.create(tx).await.unwrap();
1454 let updated = repo
1455 .set_sent_at(
1456 random_id.to_string(),
1457 "2025-01-27T16:00:00.000000+00:00".to_string(),
1458 )
1459 .await
1460 .unwrap();
1461 assert_eq!(
1462 updated.sent_at,
1463 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1464 );
1465 }
1466
1467 #[tokio::test]
1468 #[ignore = "Requires active Redis instance"]
1469 async fn test_set_confirmed_at() {
1470 let repo = setup_test_repo().await;
1471 let random_id = Uuid::new_v4().to_string();
1472 let tx = create_test_transaction(&random_id);
1473
1474 repo.create(tx).await.unwrap();
1475 let updated = repo
1476 .set_confirmed_at(
1477 random_id.to_string(),
1478 "2025-01-27T16:00:00.000000+00:00".to_string(),
1479 )
1480 .await
1481 .unwrap();
1482 assert_eq!(
1483 updated.confirmed_at,
1484 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1485 );
1486 }
1487
1488 #[tokio::test]
1489 #[ignore = "Requires active Redis instance"]
1490 async fn test_update_network_data() {
1491 let repo = setup_test_repo().await;
1492 let random_id = Uuid::new_v4().to_string();
1493 let tx = create_test_transaction(&random_id);
1494
1495 repo.create(tx).await.unwrap();
1496
1497 let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
1498 gas_price: Some(2000000000),
1499 gas_limit: Some(42000),
1500 nonce: Some(2),
1501 value: U256::from_str("2000000000000000000").unwrap(),
1502 data: Some("0x1234".to_string()),
1503 from: "0xNewSender".to_string(),
1504 to: Some("0xNewRecipient".to_string()),
1505 chain_id: 1,
1506 signature: None,
1507 hash: Some("0xnewhash".to_string()),
1508 speed: Some(Speed::SafeLow),
1509 max_fee_per_gas: None,
1510 max_priority_fee_per_gas: None,
1511 raw: None,
1512 });
1513
1514 let updated = repo
1515 .update_network_data(random_id.to_string(), new_network_data.clone())
1516 .await
1517 .unwrap();
1518 assert_eq!(
1519 updated
1520 .network_data
1521 .get_evm_transaction_data()
1522 .unwrap()
1523 .hash,
1524 new_network_data.get_evm_transaction_data().unwrap().hash
1525 );
1526 }
1527
1528 #[tokio::test]
1529 #[ignore = "Requires active Redis instance"]
1530 async fn test_debug_implementation() {
1531 let repo = setup_test_repo().await;
1532 let debug_str = format!("{:?}", repo);
1533 assert!(debug_str.contains("RedisTransactionRepository"));
1534 assert!(debug_str.contains("test_prefix"));
1535 }
1536
1537 #[tokio::test]
1538 #[ignore = "Requires active Redis instance"]
1539 async fn test_error_handling_empty_id() {
1540 let repo = setup_test_repo().await;
1541
1542 let result = repo.get_by_id("".to_string()).await;
1543 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1544
1545 let result = repo
1546 .update("".to_string(), create_test_transaction("test"))
1547 .await;
1548 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1549
1550 let result = repo.delete_by_id("".to_string()).await;
1551 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1552 }
1553
1554 #[tokio::test]
1555 #[ignore = "Requires active Redis instance"]
1556 async fn test_pagination_validation() {
1557 let repo = setup_test_repo().await;
1558
1559 let query = PaginationQuery {
1560 page: 1,
1561 per_page: 0,
1562 };
1563 let result = repo.list_paginated(query).await;
1564 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1565 }
1566
1567 #[tokio::test]
1568 #[ignore = "Requires active Redis instance"]
1569 async fn test_index_consistency() {
1570 let repo = setup_test_repo().await;
1571 let random_id = Uuid::new_v4().to_string();
1572 let relayer_id = Uuid::new_v4().to_string();
1573 let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1574
1575 repo.create(tx.clone()).await.unwrap();
1577
1578 let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1580 assert!(found.is_some());
1581
1582 let mut updated_tx = tx.clone();
1584 if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
1585 evm_data.nonce = Some(43);
1586 }
1587
1588 repo.update(random_id.to_string(), updated_tx)
1589 .await
1590 .unwrap();
1591
1592 let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1594 assert!(old_nonce_result.is_none());
1595
1596 let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
1598 assert!(new_nonce_result.is_some());
1599 }
1600
1601 #[tokio::test]
1602 #[ignore = "Requires active Redis instance"]
1603 async fn test_has_entries() {
1604 let repo = setup_test_repo().await;
1605 assert!(!repo.has_entries().await.unwrap());
1606
1607 let tx_id = uuid::Uuid::new_v4().to_string();
1608 let tx = create_test_transaction(&tx_id);
1609 repo.create(tx.clone()).await.unwrap();
1610
1611 assert!(repo.has_entries().await.unwrap());
1612 }
1613
1614 #[tokio::test]
1615 #[ignore = "Requires active Redis instance"]
1616 async fn test_drop_all_entries() {
1617 let repo = setup_test_repo().await;
1618 let tx_id = uuid::Uuid::new_v4().to_string();
1619 let tx = create_test_transaction(&tx_id);
1620 repo.create(tx.clone()).await.unwrap();
1621 assert!(repo.has_entries().await.unwrap());
1622
1623 repo.drop_all_entries().await.unwrap();
1624 assert!(!repo.has_entries().await.unwrap());
1625 }
1626
1627 #[tokio::test]
1629 #[ignore = "Requires active Redis instance"]
1630 async fn test_update_status_sets_delete_at_for_final_statuses() {
1631 let _lock = ENV_MUTEX.lock().await;
1632
1633 use chrono::{DateTime, Duration, Utc};
1634 use std::env;
1635
1636 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
1638
1639 let repo = setup_test_repo().await;
1640
1641 let final_statuses = [
1642 TransactionStatus::Canceled,
1643 TransactionStatus::Confirmed,
1644 TransactionStatus::Failed,
1645 TransactionStatus::Expired,
1646 ];
1647
1648 for (i, status) in final_statuses.iter().enumerate() {
1649 let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
1650 let mut tx = create_test_transaction(&tx_id);
1651
1652 tx.delete_at = None;
1654 tx.status = TransactionStatus::Pending;
1655
1656 repo.create(tx).await.unwrap();
1657
1658 let before_update = Utc::now();
1659
1660 let updated = repo
1662 .update_status(tx_id.clone(), status.clone())
1663 .await
1664 .unwrap();
1665
1666 assert!(
1668 updated.delete_at.is_some(),
1669 "delete_at should be set for status: {:?}",
1670 status
1671 );
1672
1673 let delete_at_str = updated.delete_at.unwrap();
1675 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1676 .expect("delete_at should be valid RFC3339")
1677 .with_timezone(&Utc);
1678
1679 let duration_from_before = delete_at.signed_duration_since(before_update);
1680 let expected_duration = Duration::hours(6);
1681 let tolerance = Duration::minutes(5);
1682
1683 assert!(
1684 duration_from_before >= expected_duration - tolerance &&
1685 duration_from_before <= expected_duration + tolerance,
1686 "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1687 status, duration_from_before
1688 );
1689 }
1690
1691 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1693 }
1694
1695 #[tokio::test]
1696 #[ignore = "Requires active Redis instance"]
1697 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1698 let _lock = ENV_MUTEX.lock().await;
1699
1700 use std::env;
1701
1702 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1703
1704 let repo = setup_test_repo().await;
1705
1706 let non_final_statuses = [
1707 TransactionStatus::Pending,
1708 TransactionStatus::Sent,
1709 TransactionStatus::Submitted,
1710 TransactionStatus::Mined,
1711 ];
1712
1713 for (i, status) in non_final_statuses.iter().enumerate() {
1714 let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
1715 let mut tx = create_test_transaction(&tx_id);
1716 tx.delete_at = None;
1717 tx.status = TransactionStatus::Pending;
1718
1719 repo.create(tx).await.unwrap();
1720
1721 let updated = repo
1723 .update_status(tx_id.clone(), status.clone())
1724 .await
1725 .unwrap();
1726
1727 assert!(
1729 updated.delete_at.is_none(),
1730 "delete_at should NOT be set for status: {:?}",
1731 status
1732 );
1733 }
1734
1735 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1737 }
1738
1739 #[tokio::test]
1740 #[ignore = "Requires active Redis instance"]
1741 async fn test_partial_update_sets_delete_at_for_final_statuses() {
1742 let _lock = ENV_MUTEX.lock().await;
1743
1744 use chrono::{DateTime, Duration, Utc};
1745 use std::env;
1746
1747 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1748
1749 let repo = setup_test_repo().await;
1750 let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
1751 let mut tx = create_test_transaction(&tx_id);
1752 tx.delete_at = None;
1753 tx.status = TransactionStatus::Pending;
1754
1755 repo.create(tx).await.unwrap();
1756
1757 let before_update = Utc::now();
1758
1759 let update = TransactionUpdateRequest {
1761 status: Some(TransactionStatus::Confirmed),
1762 status_reason: Some("Transaction completed".to_string()),
1763 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1764 ..Default::default()
1765 };
1766
1767 let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
1768
1769 assert!(
1771 updated.delete_at.is_some(),
1772 "delete_at should be set when updating to Confirmed status"
1773 );
1774
1775 let delete_at_str = updated.delete_at.unwrap();
1777 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1778 .expect("delete_at should be valid RFC3339")
1779 .with_timezone(&Utc);
1780
1781 let duration_from_before = delete_at.signed_duration_since(before_update);
1782 let expected_duration = Duration::hours(8);
1783 let tolerance = Duration::minutes(5);
1784
1785 assert!(
1786 duration_from_before >= expected_duration - tolerance
1787 && duration_from_before <= expected_duration + tolerance,
1788 "delete_at should be approximately 8 hours from now. Duration: {:?}",
1789 duration_from_before
1790 );
1791
1792 assert_eq!(updated.status, TransactionStatus::Confirmed);
1794 assert_eq!(
1795 updated.status_reason,
1796 Some("Transaction completed".to_string())
1797 );
1798 assert_eq!(
1799 updated.confirmed_at,
1800 Some("2023-01-01T12:05:00Z".to_string())
1801 );
1802
1803 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1805 }
1806
1807 #[tokio::test]
1808 #[ignore = "Requires active Redis instance"]
1809 async fn test_update_status_preserves_existing_delete_at() {
1810 let _lock = ENV_MUTEX.lock().await;
1811
1812 use std::env;
1813
1814 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1815
1816 let repo = setup_test_repo().await;
1817 let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
1818 let mut tx = create_test_transaction(&tx_id);
1819
1820 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1822 tx.delete_at = Some(existing_delete_at.clone());
1823 tx.status = TransactionStatus::Pending;
1824
1825 repo.create(tx).await.unwrap();
1826
1827 let updated = repo
1829 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1830 .await
1831 .unwrap();
1832
1833 assert_eq!(
1835 updated.delete_at,
1836 Some(existing_delete_at),
1837 "Existing delete_at should be preserved when updating to final status"
1838 );
1839
1840 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1842 }
1843 #[tokio::test]
1844 #[ignore = "Requires active Redis instance"]
1845 async fn test_partial_update_without_status_change_preserves_delete_at() {
1846 let _lock = ENV_MUTEX.lock().await;
1847
1848 use std::env;
1849
1850 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1851
1852 let repo = setup_test_repo().await;
1853 let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
1854 let mut tx = create_test_transaction(&tx_id);
1855 tx.delete_at = None;
1856 tx.status = TransactionStatus::Pending;
1857
1858 repo.create(tx).await.unwrap();
1859
1860 let updated1 = repo
1862 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1863 .await
1864 .unwrap();
1865
1866 assert!(updated1.delete_at.is_some());
1867 let original_delete_at = updated1.delete_at.clone();
1868
1869 let update = TransactionUpdateRequest {
1871 status: None, status_reason: Some("Updated reason".to_string()),
1873 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1874 ..Default::default()
1875 };
1876
1877 let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
1878
1879 assert_eq!(
1881 updated2.delete_at, original_delete_at,
1882 "delete_at should be preserved when status is not updated"
1883 );
1884
1885 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1888 assert_eq!(
1889 updated2.confirmed_at,
1890 Some("2023-01-01T12:10:00Z".to_string())
1891 );
1892
1893 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1895 }
1896}