openzeppelin_relayer/repositories/transaction/transaction_redis.rs

//! Redis-backed implementation of the TransactionRepository.
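//!
//! Redis key layout (every key is namespaced by the configured `key_prefix`):
//! - `{prefix}:relayer:{relayer_id}:tx:{tx_id}` - serialized transaction data
//! - `{prefix}:relayer:tx_to_relayer:{tx_id}` - reverse lookup from transaction ID to relayer ID
//! - `{prefix}:relayer:{relayer_id}:status:{status}` - set of transaction IDs in a given status
//! - `{prefix}:relayer:{relayer_id}:nonce:{nonce}` - transaction ID occupying a given nonce
//! - `{prefix}:relayer_list` - set of all relayer IDs that have transactions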

use crate::models::{
    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
    TransactionStatus, TransactionUpdateRequest,
};
use crate::repositories::redis_base::RedisRepository;
use crate::repositories::{
    BatchRetrievalResult, PaginatedResult, Repository, TransactionRepository,
};
use async_trait::async_trait;
use log::{debug, error, warn};
use redis::aio::ConnectionManager;
use redis::AsyncCommands;
use std::fmt;
use std::sync::Arc;

const RELAYER_PREFIX: &str = "relayer";
const TX_PREFIX: &str = "tx";
const STATUS_PREFIX: &str = "status";
const NONCE_PREFIX: &str = "nonce";
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
const RELAYER_LIST_KEY: &str = "relayer_list";

#[derive(Clone)]
pub struct RedisTransactionRepository {
    pub client: Arc<ConnectionManager>,
    pub key_prefix: String,
}

impl RedisRepository for RedisTransactionRepository {}

impl RedisTransactionRepository {
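    /// Creates a new `RedisTransactionRepository` backed by a shared
    /// [`ConnectionManager`]. The `key_prefix` namespaces every key written by
    /// this repository and must be non-empty.
    ///
    /// A minimal construction sketch (mirrors the test setup in this module;
    /// assumes a reachable Redis instance and an illustrative prefix):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use redis::{aio::ConnectionManager, Client};
    ///
    /// let client = Client::open("redis://127.0.0.1:6379")?;
    /// let manager = ConnectionManager::new(client).await?;
    /// let repo = RedisTransactionRepository::new(Arc::new(manager), "relayer_service".to_string())?;
    /// ```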
    pub fn new(
        connection_manager: Arc<ConnectionManager>,
        key_prefix: String,
    ) -> Result<Self, RepositoryError> {
        if key_prefix.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Redis key prefix cannot be empty".to_string(),
            ));
        }

        Ok(Self {
            client: connection_manager,
            key_prefix,
        })
    }

    /// Generate key for transaction data: {prefix}:relayer:{relayer_id}:tx:{tx_id}
    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
        )
    }

    /// Generate key for reverse lookup: {prefix}:relayer:tx_to_relayer:{tx_id}
    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
        format!(
            "{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
        )
    }

    /// Generate key for relayer status index: {prefix}:relayer:{relayer_id}:status:{status}
    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
        )
    }

    /// Generate key for relayer nonce index: {prefix}:relayer:{relayer_id}:nonce:{nonce}
    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
        )
    }

    /// Generate key for relayer list: {prefix}:relayer_list (set of all relayer IDs)
    fn relayer_list_key(&self) -> String {
        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
    }

    /// Batch fetch transactions by IDs using reverse lookup
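    ///
    /// Resolves each ID to its relayer via the `tx_to_relayer` reverse-lookup keys,
    /// then fetches the transaction records with a second MGET round-trip. IDs that
    /// cannot be resolved or whose records are missing are reported in `failed_ids`;
    /// deserialization failures are logged and skipped.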
    async fn get_transactions_by_ids(
        &self,
        ids: &[String],
    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
        if ids.is_empty() {
            debug!("No transaction IDs provided for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids: vec![],
            });
        }

        let mut conn = self.client.as_ref().clone();

        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();

        debug!("Fetching relayer IDs for {} transactions", ids.len());

        let relayer_ids: Vec<Option<String>> = conn
            .mget(&reverse_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;

        let mut tx_keys = Vec::new();
        let mut valid_ids = Vec::new();
        let mut failed_ids = Vec::new();
        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
            match relayer_id {
                Some(relayer_id) => {
                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
                    valid_ids.push(ids[i].clone());
                }
                None => {
                    warn!("No relayer found for transaction {}", ids[i]);
                    failed_ids.push(ids[i].clone());
                }
            }
        }

        if tx_keys.is_empty() {
            debug!("No valid transactions found for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids,
            });
        }

        debug!("Batch fetching {} transaction data", tx_keys.len());

        let values: Vec<Option<String>> = conn
            .mget(&tx_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;

        let mut transactions = Vec::new();
        let mut failed_count = 0;
        for (i, value) in values.into_iter().enumerate() {
            match value {
                Some(json) => {
                    match self.deserialize_entity::<TransactionRepoModel>(
                        &json,
                        &valid_ids[i],
                        "transaction",
                    ) {
                        Ok(tx) => transactions.push(tx),
                        Err(e) => {
                            failed_count += 1;
                            error!("Failed to deserialize transaction {}: {}", valid_ids[i], e);
                            // Continue processing other transactions
                        }
                    }
                }
                None => {
                    warn!("Transaction {} not found in batch fetch", valid_ids[i]);
                    failed_ids.push(valid_ids[i].clone());
                }
            }
        }

        if failed_count > 0 {
            warn!(
                "Failed to deserialize {} out of {} transactions in batch",
                failed_count,
                valid_ids.len()
            );
        }

        debug!("Successfully fetched {} transactions", transactions.len());
        Ok(BatchRetrievalResult {
            results: transactions,
            failed_ids,
        })
    }

    /// Extract nonce from EVM transaction data
    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
        match network_data.get_evm_transaction_data() {
            Ok(tx_data) => tx_data.nonce,
            Err(_) => {
                debug!("No EVM transaction data available for nonce extraction");
                None
            }
        }
    }

    /// Update indexes for a transaction atomically in a single Redis pipeline
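    ///
    /// Adds the relayer to the global relayer list, adds the transaction ID to its
    /// current status set, and records its nonce (for EVM transactions). When the
    /// previous version of the transaction is supplied, stale status and nonce
    /// entries are removed in the same pipeline.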
    async fn update_indexes(
        &self,
        tx: &TransactionRepoModel,
        old_tx: Option<&TransactionRepoModel>,
    ) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!("Updating indexes for transaction {}", tx.id);

        // Add relayer to the global relayer list
        let relayer_list_key = self.relayer_list_key();
        pipe.sadd(&relayer_list_key, &tx.relayer_id);

        // Handle status index updates
        let new_status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
        pipe.sadd(&new_status_key, &tx.id);

        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.set(&nonce_key, &tx.id);
            debug!("Added nonce index for tx {} with nonce {}", tx.id, nonce);
        }

        // Remove old indexes if updating
        if let Some(old) = old_tx {
            if old.status != tx.status {
                let old_status_key = self.relayer_status_key(&old.relayer_id, &old.status);
                pipe.srem(&old_status_key, &tx.id);
                debug!(
                    "Removing old status index for tx {} (status: {} -> {})",
                    tx.id, old.status, tx.status
                );
            }

            // Handle nonce index cleanup
            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
                let new_nonce = self.extract_nonce(&tx.network_data);
                if Some(old_nonce) != new_nonce {
                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
                    pipe.del(&old_nonce_key);
                    debug!(
                        "Removing old nonce index for tx {} (nonce: {} -> {:?})",
                        tx.id, old_nonce, new_nonce
                    );
                }
            }
        }

        // Execute all operations in a single pipeline
        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(
                "Index update pipeline failed for transaction {}: {}",
                tx.id, e
            );
            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
        })?;

        debug!("Successfully updated indexes for transaction {}", tx.id);
        Ok(())
    }

    /// Remove all indexes with error recovery
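    ///
    /// Deletes the transaction's status set entry, nonce key, and reverse-lookup
    /// key in one pipeline; the relayer itself stays in the global relayer list.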
    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!("Removing all indexes for transaction {}", tx.id);

        // Remove from status index
        let status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
        pipe.srem(&status_key, &tx.id);

        // Remove nonce index if exists
        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.del(&nonce_key);
            debug!("Removing nonce index for tx {} with nonce {}", tx.id, nonce);
        }

        // Remove reverse lookup
        let reverse_key = self.tx_to_relayer_key(&tx.id);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!("Index removal failed for transaction {}: {}", tx.id, e);
            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
        })?;

        debug!("Successfully removed all indexes for transaction {}", tx.id);
        Ok(())
    }
}

impl fmt::Debug for RedisTransactionRepository {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RedisTransactionRepository")
            .field("client", &"<ConnectionManager>")
            .field("key_prefix", &self.key_prefix)
            .finish()
    }
}

#[async_trait]
impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
    async fn create(
        &self,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if entity.id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let key = self.tx_key(&entity.relayer_id, &entity.id);
        let reverse_key = self.tx_to_relayer_key(&entity.id);
        let mut conn = self.client.as_ref().clone();

        debug!("Creating transaction with ID: {}", entity.id);

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        // Check if transaction already exists by checking reverse lookup
        let existing: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;

        if existing.is_some() {
            return Err(RepositoryError::ConstraintViolation(format!(
                "Transaction with ID {} already exists",
                entity.id
            )));
        }

        // Use atomic pipeline for consistency
        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.set(&key, &value);
        pipe.set(&reverse_key, &entity.relayer_id);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;

        // Update indexes separately to handle partial failures gracefully
        if let Err(e) = self.update_indexes(&entity, None).await {
            error!(
                "Failed to update indexes for new transaction {}: {}",
                entity.id, e
            );
            return Err(e);
        }

        debug!("Successfully created transaction {}", entity.id);
        Ok(entity)
    }

    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let mut conn = self.client.as_ref().clone();

        debug!("Fetching transaction with ID: {}", id);

        let reverse_key = self.tx_to_relayer_key(&id);
        let relayer_id: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;

        let relayer_id = match relayer_id {
            Some(relayer_id) => relayer_id,
            None => {
                debug!("Transaction {} not found (no reverse lookup)", id);
                return Err(RepositoryError::NotFound(format!(
                    "Transaction with ID {} not found",
                    id
                )));
            }
        };

        let key = self.tx_key(&relayer_id, &id);
        let value: Option<String> = conn
            .get(&key)
            .await
            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;

        match value {
            Some(json) => {
                let tx =
                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
                debug!("Successfully fetched transaction {}", id);
                Ok(tx)
            }
            None => {
                debug!("Transaction {} not found", id);
                Err(RepositoryError::NotFound(format!(
                    "Transaction with ID {} not found",
                    id
                )))
            }
        }
    }

    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        debug!("Fetching all transaction IDs");

        // Get all relayer IDs
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;

        debug!("Found {} relayers", relayer_ids.len());

        // Collect all transaction IDs from all relayers
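        // SCAN walks the keyspace incrementally with a cursor, so this avoids a
        // blocking KEYS call even when a relayer has many transactions.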
        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "list_all_scan_keys"))?;

                // Extract transaction IDs from keys
                for key in keys {
                    if let Some(tx_id) = key.split(':').next_back() {
                        all_tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }
        }

        debug!("Found {} transaction IDs", all_tx_ids.len());

        let transactions = self.get_transactions_by_ids(&all_tx_ids).await?;
        Ok(transactions.results)
    }

    async fn list_paginated(
        &self,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        if query.per_page == 0 {
            return Err(RepositoryError::InvalidData(
                "per_page must be greater than 0".to_string(),
            ));
        }

        let mut conn = self.client.as_ref().clone();

        debug!(
            "Fetching paginated transactions (page: {}, per_page: {})",
            query.page, query.per_page
        );

        // Get all relayer IDs
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;

        // Collect all transaction IDs from all relayers
        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "list_paginated_scan_keys"))?;

                // Extract transaction IDs from keys
                for key in keys {
                    if let Some(tx_id) = key.split(':').next_back() {
                        all_tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }
        }

        let total = all_tx_ids.len() as u64;
        let start = ((query.page - 1) * query.per_page) as usize;
        let end = (start + query.per_page as usize).min(all_tx_ids.len());

        if start >= all_tx_ids.len() {
            debug!(
                "Page {} is beyond available data (total: {})",
                query.page, total
            );
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let page_ids = &all_tx_ids[start..end];
        let items = self.get_transactions_by_ids(page_ids).await?;

        debug!(
            "Successfully fetched {} transactions for page {}",
            items.results.len(),
            query.page
        );

        Ok(PaginatedResult {
            items: items.results.clone(),
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    async fn update(
        &self,
        id: String,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!("Updating transaction with ID: {}", id);

        // Get the old transaction for index cleanup
        let old_tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&entity.relayer_id, &id);
        let mut conn = self.client.as_ref().clone();

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        // Update transaction
        let _: () = conn
            .set(&key, value)
            .await
            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;

        // Update indexes
        self.update_indexes(&entity, Some(&old_tx)).await?;

        debug!("Successfully updated transaction {}", id);
        Ok(entity)
    }

    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!("Deleting transaction with ID: {}", id);

        // Get transaction first for index cleanup
        let tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&tx.relayer_id, &id);
        let reverse_key = self.tx_to_relayer_key(&id);
        let mut conn = self.client.as_ref().clone();

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.del(&key);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;

        // Remove indexes (log errors but don't fail the delete)
        if let Err(e) = self.remove_all_indexes(&tx).await {
            error!(
                "Failed to remove indexes for deleted transaction {}: {}",
                id, e
            );
        }

        debug!("Successfully deleted transaction {}", id);
        Ok(())
    }

    async fn count(&self) -> Result<usize, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        debug!("Counting transactions");

        // Get all relayer IDs
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;

        // Count transactions across all relayers
        let mut total_count = 0;
        for relayer_id in relayer_ids {
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "count_scan_keys"))?;

                total_count += keys.len();

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }
        }

        debug!("Transaction count: {}", total_count);
        Ok(total_count)
    }

    async fn has_entries(&self) -> Result<bool, RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let relayer_list_key = self.relayer_list_key();

        debug!("Checking if transaction entries exist");

        let exists: bool = conn
            .exists(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;

        debug!("Transaction entries exist: {}", exists);
        Ok(exists)
    }

    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let relayer_list_key = self.relayer_list_key();

        debug!("Dropping all transaction entries");

        // Get all relayer IDs first
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;

        if relayer_ids.is_empty() {
            debug!("No transaction entries to drop");
            return Ok(());
        }

        // Use pipeline for atomic operations
        let mut pipe = redis::pipe();
        pipe.atomic();

        // Delete all transactions and their indexes for each relayer
        for relayer_id in &relayer_ids {
            // Get all transaction IDs for this relayer
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            let mut tx_ids = Vec::new();

            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;

                // Extract transaction IDs from keys and delete keys
                for key in keys {
                    pipe.del(&key);
                    if let Some(tx_id) = key.split(':').next_back() {
                        tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }

            // Delete reverse lookup keys and indexes
            for tx_id in tx_ids {
                let reverse_key = self.tx_to_relayer_key(&tx_id);
                pipe.del(&reverse_key);

                // Delete status indexes (we can't know the specific status, so we'll clean up known ones)
                for status in &[
                    TransactionStatus::Pending,
                    TransactionStatus::Sent,
                    TransactionStatus::Confirmed,
                    TransactionStatus::Failed,
                    TransactionStatus::Canceled,
                ] {
                    let status_key = self.relayer_status_key(relayer_id, status);
                    pipe.srem(&status_key, &tx_id);
                }
            }
        }

        // Delete the relayer list key
        pipe.del(&relayer_list_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;

        debug!(
            "Dropped all transaction entries for {} relayers",
            relayer_ids.len()
        );
        Ok(())
    }
}

#[async_trait]
impl TransactionRepository for RedisTransactionRepository {
    async fn find_by_relayer_id(
        &self,
        relayer_id: &str,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        // Scan for all transaction keys for this relayer
        let pattern = format!(
            "{}:{}:{}:{}:*",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
        );
        let mut all_tx_ids = Vec::new();
        let mut cursor = 0;

        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .cursor_arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_scan"))?;

            // Extract transaction IDs from keys
            for key in keys {
                if let Some(tx_id) = key.split(':').next_back() {
                    all_tx_ids.push(tx_id.to_string());
                }
            }

            cursor = next_cursor;
            if cursor == 0 {
                break;
            }
        }

        let total = all_tx_ids.len() as u64;
        let start = ((query.page - 1) * query.per_page) as usize;
        let end = (start + query.per_page as usize).min(all_tx_ids.len());

        if start >= all_tx_ids.len() {
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let page_ids = &all_tx_ids[start..end];
        let items = self.get_transactions_by_ids(page_ids).await?;

        Ok(PaginatedResult {
            items: items.results.clone(),
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    async fn find_by_status(
        &self,
        relayer_id: &str,
        statuses: &[TransactionStatus],
    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let mut all_ids = Vec::new();

        // Collect IDs from all status sets
        for status in statuses {
            let status_key = self.relayer_status_key(relayer_id, status);
            let ids: Vec<String> = conn
                .smembers(status_key)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;

            all_ids.extend(ids);
        }

        // Remove duplicates and batch fetch
        all_ids.sort();
        all_ids.dedup();

        let transactions = self.get_transactions_by_ids(&all_ids).await?;
        Ok(transactions.results)
    }

    async fn find_by_nonce(
        &self,
        relayer_id: &str,
        nonce: u64,
    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);

        // Get transaction ID with this nonce for this relayer (should be single value)
        let tx_id: Option<String> = conn
            .get(nonce_key)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;

        match tx_id {
            Some(tx_id) => {
                match self.get_by_id(tx_id.clone()).await {
                    Ok(tx) => Ok(Some(tx)),
                    Err(RepositoryError::NotFound(_)) => {
                        // Transaction was deleted but index wasn't cleaned up
                        warn!(
                            "Stale nonce index found for relayer {} nonce {}",
                            relayer_id, nonce
                        );
                        Ok(None)
                    }
                    Err(e) => Err(e),
                }
            }
            None => Ok(None),
        }
    }

    async fn update_status(
        &self,
        tx_id: String,
        status: TransactionStatus,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            status: Some(status),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn partial_update(
        &self,
        tx_id: String,
        update: TransactionUpdateRequest,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        // Get current transaction
        let mut tx = self.get_by_id(tx_id.clone()).await?;
        let old_tx = tx.clone(); // Keep copy for index updates

        // Apply partial updates using the model's business logic
        tx.apply_partial_update(update);

        // Persist the updated transaction, then refresh its indexes
        let key = self.tx_key(&tx.relayer_id, &tx_id);
        let mut conn = self.client.as_ref().clone();

        let value = self.serialize_entity(&tx, |t| &t.id, "transaction")?;

        let _: () = conn
            .set(&key, value)
            .await
            .map_err(|e| self.map_redis_error(e, "partial_update"))?;

        self.update_indexes(&tx, Some(&old_tx)).await?;
        Ok(tx)
    }

    async fn update_network_data(
        &self,
        tx_id: String,
        network_data: NetworkTransactionData,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            network_data: Some(network_data),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn set_sent_at(
        &self,
        tx_id: String,
        sent_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            sent_at: Some(sent_at),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn set_confirmed_at(
        &self,
        tx_id: String,
        confirmed_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            confirmed_at: Some(confirmed_at),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }
}

#[cfg(test)]
mod tests {
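    // These tests run against a real Redis instance, so they are `#[ignore]`d by
    // default. Run them with `cargo test -- --ignored`; REDIS_TEST_URL can point
    // at a disposable Redis and defaults to redis://127.0.0.1:6379.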
    use super::*;
    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
    use alloy::primitives::U256;
    use lazy_static::lazy_static;
    use redis::Client;
    use std::str::FromStr;
    use tokio;
    use uuid::Uuid;

    use tokio::sync::Mutex;

    // Use a mutex to ensure tests don't run in parallel when modifying env vars
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    // Helper function to create test transactions
    fn create_test_transaction(id: &str) -> TransactionRepoModel {
        TransactionRepoModel {
            id: id.to_string(),
            relayer_id: "relayer-1".to_string(),
            status: TransactionStatus::Pending,
            status_reason: None,
            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            valid_until: None,
            delete_at: None,
            network_type: NetworkType::Evm,
            priced_at: None,
            hashes: vec![],
            network_data: NetworkTransactionData::Evm(EvmTransactionData {
                gas_price: Some(1000000000),
                gas_limit: Some(21000),
                nonce: Some(1),
                value: U256::from_str("1000000000000000000").unwrap(),
                data: Some("0x".to_string()),
                from: "0xSender".to_string(),
                to: Some("0xRecipient".to_string()),
                chain_id: 1,
                signature: None,
                hash: Some(format!("0x{}", id)),
                speed: Some(Speed::Fast),
                max_fee_per_gas: None,
                max_priority_fee_per_gas: None,
                raw: None,
            }),
            noop_count: None,
            is_canceled: Some(false),
        }
    }

    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
        let mut tx = create_test_transaction(id);
        tx.relayer_id = relayer_id.to_string();
        tx
    }

    fn create_test_transaction_with_status(
        id: &str,
        relayer_id: &str,
        status: TransactionStatus,
    ) -> TransactionRepoModel {
        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
        tx.status = status;
        tx
    }

    fn create_test_transaction_with_nonce(
        id: &str,
        nonce: u64,
        relayer_id: &str,
    ) -> TransactionRepoModel {
        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
            evm_data.nonce = Some(nonce);
        }
        tx
    }

    async fn setup_test_repo() -> RedisTransactionRepository {
        // Connect to a test Redis instance; REDIS_TEST_URL overrides the default local URL
        let redis_url = std::env::var("REDIS_TEST_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1039
1040        let client = Client::open(redis_url).expect("Failed to create Redis client");
1041        let connection_manager = ConnectionManager::new(client)
1042            .await
1043            .expect("Failed to create connection manager");
1044
1045        let random_id = Uuid::new_v4().to_string();
1046        let key_prefix = format!("test_prefix:{}", random_id);
1047
1048        RedisTransactionRepository::new(Arc::new(connection_manager), key_prefix)
1049            .expect("Failed to create RedisTransactionRepository")
1050    }
1051
1052    #[tokio::test]
1053    #[ignore = "Requires active Redis instance"]
1054    async fn test_new_repository_creation() {
1055        let repo = setup_test_repo().await;
1056        assert!(repo.key_prefix.contains("test_prefix"));
1057    }
1058
1059    #[tokio::test]
1060    #[ignore = "Requires active Redis instance"]
1061    async fn test_new_repository_empty_prefix_fails() {
1062        let redis_url = std::env::var("REDIS_TEST_URL")
1063            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1064        let client = Client::open(redis_url).expect("Failed to create Redis client");
1065        let connection_manager = ConnectionManager::new(client)
1066            .await
1067            .expect("Failed to create connection manager");
1068
1069        let result = RedisTransactionRepository::new(Arc::new(connection_manager), "".to_string());
1070        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1071    }
1072
1073    #[tokio::test]
1074    #[ignore = "Requires active Redis instance"]
1075    async fn test_key_generation() {
1076        let repo = setup_test_repo().await;
1077
1078        assert!(repo
1079            .tx_key("relayer-1", "test-id")
1080            .contains(":relayer:relayer-1:tx:test-id"));
1081        assert!(repo
1082            .tx_to_relayer_key("test-id")
1083            .contains(":relayer:tx_to_relayer:test-id"));
1084        assert!(repo.relayer_list_key().contains(":relayer_list"));
1085        assert!(repo
1086            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1087            .contains(":relayer:relayer-1:status:Pending"));
1088        assert!(repo
1089            .relayer_nonce_key("relayer-1", 42)
1090            .contains(":relayer:relayer-1:nonce:42"));
1091    }
1092
1093    #[tokio::test]
1094    #[ignore = "Requires active Redis instance"]
1095    async fn test_serialize_deserialize_transaction() {
1096        let repo = setup_test_repo().await;
1097        let tx = create_test_transaction("test-1");
1098
1099        let serialized = repo
1100            .serialize_entity(&tx, |t| &t.id, "transaction")
1101            .expect("Serialization should succeed");
1102        let deserialized: TransactionRepoModel = repo
1103            .deserialize_entity(&serialized, "test-1", "transaction")
1104            .expect("Deserialization should succeed");
1105
1106        assert_eq!(tx.id, deserialized.id);
1107        assert_eq!(tx.relayer_id, deserialized.relayer_id);
1108        assert_eq!(tx.status, deserialized.status);
1109    }
1110
1111    #[tokio::test]
1112    #[ignore = "Requires active Redis instance"]
1113    async fn test_extract_nonce() {
1114        let repo = setup_test_repo().await;
1115        let random_id = Uuid::new_v4().to_string();
1116        let relayer_id = Uuid::new_v4().to_string();
1117        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1118
1119        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1120        assert_eq!(nonce, Some(42));
1121    }
1122
1123    #[tokio::test]
1124    #[ignore = "Requires active Redis instance"]
1125    async fn test_create_transaction() {
1126        let repo = setup_test_repo().await;
1127        let random_id = Uuid::new_v4().to_string();
1128        let tx = create_test_transaction(&random_id);
1129
1130        let result = repo.create(tx.clone()).await.unwrap();
1131        assert_eq!(result.id, tx.id);
1132    }
1133
1134    #[tokio::test]
1135    #[ignore = "Requires active Redis instance"]
1136    async fn test_get_transaction() {
1137        let repo = setup_test_repo().await;
1138        let random_id = Uuid::new_v4().to_string();
1139        let tx = create_test_transaction(&random_id);
1140
1141        repo.create(tx.clone()).await.unwrap();
1142        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1143        assert_eq!(stored.id, tx.id);
1144        assert_eq!(stored.relayer_id, tx.relayer_id);
1145    }
1146
1147    #[tokio::test]
1148    #[ignore = "Requires active Redis instance"]
1149    async fn test_update_transaction() {
1150        let repo = setup_test_repo().await;
1151        let random_id = Uuid::new_v4().to_string();
1152        let mut tx = create_test_transaction(&random_id);
1153
1154        repo.create(tx.clone()).await.unwrap();
1155        tx.status = TransactionStatus::Confirmed;
1156
1157        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1158        assert!(matches!(updated.status, TransactionStatus::Confirmed));
1159    }
1160
1161    #[tokio::test]
1162    #[ignore = "Requires active Redis instance"]
1163    async fn test_delete_transaction() {
1164        let repo = setup_test_repo().await;
1165        let random_id = Uuid::new_v4().to_string();
1166        let tx = create_test_transaction(&random_id);
1167
1168        repo.create(tx).await.unwrap();
1169        repo.delete_by_id(random_id.to_string()).await.unwrap();
1170
1171        let result = repo.get_by_id(random_id.to_string()).await;
1172        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1173    }
1174
1175    #[tokio::test]
1176    #[ignore = "Requires active Redis instance"]
1177    async fn test_list_all_transactions() {
1178        let repo = setup_test_repo().await;
1179        let random_id = Uuid::new_v4().to_string();
1180        let random_id2 = Uuid::new_v4().to_string();
1181
1182        let tx1 = create_test_transaction(&random_id);
1183        let tx2 = create_test_transaction(&random_id2);
1184
1185        repo.create(tx1).await.unwrap();
1186        repo.create(tx2).await.unwrap();
1187
1188        let transactions = repo.list_all().await.unwrap();
1189        assert!(transactions.len() >= 2);
1190    }
1191
1192    #[tokio::test]
1193    #[ignore = "Requires active Redis instance"]
1194    async fn test_count_transactions() {
1195        let repo = setup_test_repo().await;
1196        let random_id = Uuid::new_v4().to_string();
1197        let tx = create_test_transaction(&random_id);
1198
1199        let count = repo.count().await.unwrap();
1200        repo.create(tx).await.unwrap();
1201        assert!(repo.count().await.unwrap() > count);
1202    }
1203
1204    #[tokio::test]
1205    #[ignore = "Requires active Redis instance"]
1206    async fn test_get_nonexistent_transaction() {
1207        let repo = setup_test_repo().await;
1208        let result = repo.get_by_id("nonexistent".to_string()).await;
1209        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1210    }
1211
1212    #[tokio::test]
1213    #[ignore = "Requires active Redis instance"]
1214    async fn test_duplicate_transaction_creation() {
1215        let repo = setup_test_repo().await;
1216        let random_id = Uuid::new_v4().to_string();
1217
1218        let tx = create_test_transaction(&random_id);
1219
1220        repo.create(tx.clone()).await.unwrap();
1221        let result = repo.create(tx).await;
1222
1223        assert!(matches!(
1224            result,
1225            Err(RepositoryError::ConstraintViolation(_))
1226        ));
1227    }
1228
1229    #[tokio::test]
1230    #[ignore = "Requires active Redis instance"]
1231    async fn test_update_nonexistent_transaction() {
1232        let repo = setup_test_repo().await;
1233        let tx = create_test_transaction("test-1");
1234
1235        let result = repo.update("nonexistent".to_string(), tx).await;
1236        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1237    }
1238
1239    #[tokio::test]
1240    #[ignore = "Requires active Redis instance"]
1241    async fn test_list_paginated() {
1242        let repo = setup_test_repo().await;
1243
1244        // Create multiple transactions
1245        for _ in 1..=10 {
1246            let random_id = Uuid::new_v4().to_string();
1247            let tx = create_test_transaction(&random_id);
1248            repo.create(tx).await.unwrap();
1249        }
1250
1251        // Test first page with 3 items per page
1252        let query = PaginationQuery {
1253            page: 1,
1254            per_page: 3,
1255        };
1256        let result = repo.list_paginated(query).await.unwrap();
1257        assert_eq!(result.items.len(), 3);
1258        assert!(result.total >= 10);
1259        assert_eq!(result.page, 1);
1260        assert_eq!(result.per_page, 3);
1261
1262        // Test empty page (beyond total items)
1263        let query = PaginationQuery {
1264            page: 1000,
1265            per_page: 3,
1266        };
1267        let result = repo.list_paginated(query).await.unwrap();
1268        assert_eq!(result.items.len(), 0);
1269    }
1270
1271    #[tokio::test]
1272    #[ignore = "Requires active Redis instance"]
1273    async fn test_find_by_relayer_id() {
1274        let repo = setup_test_repo().await;
1275        let random_id = Uuid::new_v4().to_string();
1276        let random_id2 = Uuid::new_v4().to_string();
1277        let random_id3 = Uuid::new_v4().to_string();
1278
1279        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
1280        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
1281        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
1282
1283        repo.create(tx1).await.unwrap();
1284        repo.create(tx2).await.unwrap();
1285        repo.create(tx3).await.unwrap();
1286
1287        // Test finding transactions for relayer-1
1288        let query = PaginationQuery {
1289            page: 1,
1290            per_page: 10,
1291        };
1292        let result = repo
1293            .find_by_relayer_id("relayer-1", query.clone())
1294            .await
1295            .unwrap();
1296        assert!(result.total >= 2);
1297        assert!(result.items.len() >= 2);
1298        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
1299
1300        // Test finding transactions for relayer-2
1301        let result = repo
1302            .find_by_relayer_id("relayer-2", query.clone())
1303            .await
1304            .unwrap();
1305        assert!(result.total >= 1);
1306        assert!(!result.items.is_empty());
1307        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
1308
1309        // Test finding transactions for non-existent relayer
1310        let result = repo
1311            .find_by_relayer_id("non-existent", query.clone())
1312            .await
1313            .unwrap();
1314        assert_eq!(result.total, 0);
1315        assert_eq!(result.items.len(), 0);
1316    }
1317
1318    #[tokio::test]
1319    #[ignore = "Requires active Redis instance"]
1320    async fn test_find_by_status() {
1321        let repo = setup_test_repo().await;
1322        let random_id = Uuid::new_v4().to_string();
1323        let random_id2 = Uuid::new_v4().to_string();
1324        let random_id3 = Uuid::new_v4().to_string();
1325        let relayer_id = Uuid::new_v4().to_string();
1326        let tx1 = create_test_transaction_with_status(
1327            &random_id,
1328            &relayer_id,
1329            TransactionStatus::Pending,
1330        );
1331        let tx2 =
1332            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
1333        let tx3 = create_test_transaction_with_status(
1334            &random_id3,
1335            &relayer_id,
1336            TransactionStatus::Confirmed,
1337        );
1338
1339        repo.create(tx1).await.unwrap();
1340        repo.create(tx2).await.unwrap();
1341        repo.create(tx3).await.unwrap();
1342
1343        // Test finding pending transactions
1344        let result = repo
1345            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
1346            .await
1347            .unwrap();
1348        assert_eq!(result.len(), 1);
1349        assert_eq!(result[0].status, TransactionStatus::Pending);
1350
1351        // Test finding multiple statuses
1352        let result = repo
1353            .find_by_status(
1354                &relayer_id,
1355                &[TransactionStatus::Pending, TransactionStatus::Sent],
1356            )
1357            .await
1358            .unwrap();
1359        assert_eq!(result.len(), 2);
1360
1361        // Test finding non-existent status
1362        let result = repo
1363            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
1364            .await
1365            .unwrap();
1366        assert_eq!(result.len(), 0);
1367    }
1368
1369    #[tokio::test]
1370    #[ignore = "Requires active Redis instance"]
1371    async fn test_find_by_nonce() {
1372        let repo = setup_test_repo().await;
1373        let random_id = Uuid::new_v4().to_string();
1374        let random_id2 = Uuid::new_v4().to_string();
1375        let relayer_id = Uuid::new_v4().to_string();
1376
1377        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1378        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
1379
1380        repo.create(tx1.clone()).await.unwrap();
1381        repo.create(tx2).await.unwrap();
1382
1383        // Test finding existing nonce
1384        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1385        assert!(result.is_some());
1386        assert_eq!(result.unwrap().id, random_id);
1387
1388        // Test finding non-existent nonce
1389        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
1390        assert!(result.is_none());
1391
1392        // Test finding nonce for non-existent relayer
1393        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
1394        assert!(result.is_none());
1395    }
1396
1397    #[tokio::test]
1398    #[ignore = "Requires active Redis instance"]
1399    async fn test_update_status() {
1400        let repo = setup_test_repo().await;
1401        let random_id = Uuid::new_v4().to_string();
1402        let tx = create_test_transaction(&random_id);
1403
1404        repo.create(tx).await.unwrap();
1405        let updated = repo
1406            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
1407            .await
1408            .unwrap();
1409        assert_eq!(updated.status, TransactionStatus::Confirmed);
1410    }
1411
1412    #[tokio::test]
1413    #[ignore = "Requires active Redis instance"]
1414    async fn test_partial_update() {
1415        let repo = setup_test_repo().await;
1416        let random_id = Uuid::new_v4().to_string();
1417        let tx = create_test_transaction(&random_id);
1418
1419        repo.create(tx).await.unwrap();
1420
1421        let update = TransactionUpdateRequest {
1422            status: Some(TransactionStatus::Sent),
1423            status_reason: Some("Transaction sent".to_string()),
1424            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
1425            confirmed_at: None,
1426            network_data: None,
1427            hashes: None,
1428            is_canceled: None,
1429            priced_at: None,
1430            noop_count: None,
1431            delete_at: None,
1432        };
1433
1434        let updated = repo
1435            .partial_update(random_id.to_string(), update)
1436            .await
1437            .unwrap();
1438        assert_eq!(updated.status, TransactionStatus::Sent);
1439        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
1440        assert_eq!(
1441            updated.sent_at,
1442            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1443        );
1444    }
1445
1446    #[tokio::test]
1447    #[ignore = "Requires active Redis instance"]
1448    async fn test_set_sent_at() {
1449        let repo = setup_test_repo().await;
1450        let random_id = Uuid::new_v4().to_string();
1451        let tx = create_test_transaction(&random_id);
1452
1453        repo.create(tx).await.unwrap();
1454        let updated = repo
1455            .set_sent_at(
1456                random_id.to_string(),
1457                "2025-01-27T16:00:00.000000+00:00".to_string(),
1458            )
1459            .await
1460            .unwrap();
1461        assert_eq!(
1462            updated.sent_at,
1463            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1464        );
1465    }
1466
1467    #[tokio::test]
1468    #[ignore = "Requires active Redis instance"]
1469    async fn test_set_confirmed_at() {
1470        let repo = setup_test_repo().await;
1471        let random_id = Uuid::new_v4().to_string();
1472        let tx = create_test_transaction(&random_id);
1473
1474        repo.create(tx).await.unwrap();
1475        let updated = repo
1476            .set_confirmed_at(
1477                random_id.to_string(),
1478                "2025-01-27T16:00:00.000000+00:00".to_string(),
1479            )
1480            .await
1481            .unwrap();
1482        assert_eq!(
1483            updated.confirmed_at,
1484            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1485        );
1486    }
1487
1488    #[tokio::test]
1489    #[ignore = "Requires active Redis instance"]
1490    async fn test_update_network_data() {
1491        let repo = setup_test_repo().await;
1492        let random_id = Uuid::new_v4().to_string();
1493        let tx = create_test_transaction(&random_id);
1494
1495        repo.create(tx).await.unwrap();
1496
1497        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
1498            gas_price: Some(2000000000),
1499            gas_limit: Some(42000),
1500            nonce: Some(2),
1501            value: U256::from_str("2000000000000000000").unwrap(),
1502            data: Some("0x1234".to_string()),
1503            from: "0xNewSender".to_string(),
1504            to: Some("0xNewRecipient".to_string()),
1505            chain_id: 1,
1506            signature: None,
1507            hash: Some("0xnewhash".to_string()),
1508            speed: Some(Speed::SafeLow),
1509            max_fee_per_gas: None,
1510            max_priority_fee_per_gas: None,
1511            raw: None,
1512        });
1513
1514        let updated = repo
1515            .update_network_data(random_id.to_string(), new_network_data.clone())
1516            .await
1517            .unwrap();
1518        assert_eq!(
1519            updated
1520                .network_data
1521                .get_evm_transaction_data()
1522                .unwrap()
1523                .hash,
1524            new_network_data.get_evm_transaction_data().unwrap().hash
1525        );
1526    }
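
    // Follow-up sketch to the test above: read the transaction back after
    // `update_network_data` and check the stored hash. This assumes `get_by_id`
    // returns the model directly on success (`Result<TransactionRepoModel, _>`),
    // mirroring how the other accessors in this module are used.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_network_data_read_back_sketch() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);
        repo.create(tx.clone()).await.unwrap();

        // Change only the hash on a copy of the original network data.
        let mut new_network_data = tx.network_data.clone();
        if let NetworkTransactionData::Evm(ref mut evm_data) = new_network_data {
            evm_data.hash = Some("0xreadbackhash".to_string());
        }

        repo.update_network_data(random_id.to_string(), new_network_data)
            .await
            .unwrap();

        // Fetch the persisted record and verify the new hash was stored.
        let fetched = repo.get_by_id(random_id.to_string()).await.unwrap();
        assert_eq!(
            fetched
                .network_data
                .get_evm_transaction_data()
                .unwrap()
                .hash,
            Some("0xreadbackhash".to_string())
        );
    }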
1527
1528    #[tokio::test]
1529    #[ignore = "Requires active Redis instance"]
1530    async fn test_debug_implementation() {
1531        let repo = setup_test_repo().await;
1532        let debug_str = format!("{:?}", repo);
1533        assert!(debug_str.contains("RedisTransactionRepository"));
1534        assert!(debug_str.contains("test_prefix"));
1535    }
1536
1537    #[tokio::test]
1538    #[ignore = "Requires active Redis instance"]
1539    async fn test_error_handling_empty_id() {
1540        let repo = setup_test_repo().await;
1541
1542        let result = repo.get_by_id("".to_string()).await;
1543        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1544
1545        let result = repo
1546            .update("".to_string(), create_test_transaction("test"))
1547            .await;
1548        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1549
1550        let result = repo.delete_by_id("".to_string()).await;
1551        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1552    }
1553
1554    #[tokio::test]
1555    #[ignore = "Requires active Redis instance"]
1556    async fn test_pagination_validation() {
1557        let repo = setup_test_repo().await;
1558
1559        let query = PaginationQuery {
1560            page: 1,
1561            per_page: 0,
1562        };
1563        let result = repo.list_paginated(query).await;
1564        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1565    }
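
    // Companion sketch to the validation test above: a well-formed query should
    // not be rejected. Only `Ok(_)` is asserted because this sketch does not rely
    // on the shape of the paginated result.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_pagination_valid_query_sketch() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        repo.create(create_test_transaction(&random_id)).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = repo.list_paginated(query).await;
        assert!(result.is_ok());
    }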
1566
1567    #[tokio::test]
1568    #[ignore = "Requires active Redis instance"]
1569    async fn test_index_consistency() {
1570        let repo = setup_test_repo().await;
1571        let random_id = Uuid::new_v4().to_string();
1572        let relayer_id = Uuid::new_v4().to_string();
1573        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1574
1575        // Create transaction
1576        repo.create(tx.clone()).await.unwrap();
1577
1578        // Verify it can be found by nonce
1579        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1580        assert!(found.is_some());
1581
1582        // Update the transaction with a new nonce
1583        let mut updated_tx = tx.clone();
1584        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
1585            evm_data.nonce = Some(43);
1586        }
1587
1588        repo.update(random_id.to_string(), updated_tx)
1589            .await
1590            .unwrap();
1591
1592        // Verify old nonce index is cleaned up
1593        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1594        assert!(old_nonce_result.is_none());
1595
1596        // Verify new nonce index works
1597        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
1598        assert!(new_nonce_result.is_some());
1599    }
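
    // Related sketch: `test_index_consistency` above covers the nonce index on
    // update; this sketch assumes `delete_by_id` removes the nonce index entry as
    // well. That behavior is not asserted elsewhere in this module, so treat this
    // as an illustrative expectation rather than established coverage.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_nonce_index_cleanup_on_delete_sketch() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction_with_nonce(&random_id, 7, &relayer_id);

        repo.create(tx).await.unwrap();
        assert!(repo.find_by_nonce(&relayer_id, 7).await.unwrap().is_some());

        repo.delete_by_id(random_id.to_string()).await.unwrap();

        // Assumed: deleting the transaction also drops its nonce index entry.
        assert!(repo.find_by_nonce(&relayer_id, 7).await.unwrap().is_none());
    }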
1600
1601    #[tokio::test]
1602    #[ignore = "Requires active Redis instance"]
1603    async fn test_has_entries() {
1604        let repo = setup_test_repo().await;
1605        assert!(!repo.has_entries().await.unwrap());
1606
1607        let tx_id = Uuid::new_v4().to_string();
1608        let tx = create_test_transaction(&tx_id);
1609        repo.create(tx.clone()).await.unwrap();
1610
1611        assert!(repo.has_entries().await.unwrap());
1612    }
1613
1614    #[tokio::test]
1615    #[ignore = "Requires active Redis instance"]
1616    async fn test_drop_all_entries() {
1617        let repo = setup_test_repo().await;
1618        let tx_id = Uuid::new_v4().to_string();
1619        let tx = create_test_transaction(&tx_id);
1620        repo.create(tx.clone()).await.unwrap();
1621        assert!(repo.has_entries().await.unwrap());
1622
1623        repo.drop_all_entries().await.unwrap();
1624        assert!(!repo.has_entries().await.unwrap());
1625    }
1626
1627    // Tests for delete_at field setting on final status updates
1628    #[tokio::test]
1629    #[ignore = "Requires active Redis instance"]
1630    async fn test_update_status_sets_delete_at_for_final_statuses() {
1631        let _lock = ENV_MUTEX.lock().await;
1632
1633        use chrono::{DateTime, Duration, Utc};
1634        use std::env;
1635
1636        // Override the expiration window for this test; ENV_MUTEX serializes access to this shared env var
1637        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
1638
1639        let repo = setup_test_repo().await;
1640
1641        let final_statuses = [
1642            TransactionStatus::Canceled,
1643            TransactionStatus::Confirmed,
1644            TransactionStatus::Failed,
1645            TransactionStatus::Expired,
1646        ];
1647
1648        for (i, status) in final_statuses.iter().enumerate() {
1649            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
1650            let mut tx = create_test_transaction(&tx_id);
1651
1652            // Ensure transaction has no delete_at initially and is in pending state
1653            tx.delete_at = None;
1654            tx.status = TransactionStatus::Pending;
1655
1656            repo.create(tx).await.unwrap();
1657
1658            let before_update = Utc::now();
1659
1660            // Update to final status
1661            let updated = repo
1662                .update_status(tx_id.clone(), status.clone())
1663                .await
1664                .unwrap();
1665
1666            // Should have delete_at set
1667            assert!(
1668                updated.delete_at.is_some(),
1669                "delete_at should be set for status: {:?}",
1670                status
1671            );
1672
1673            // Verify the timestamp is reasonable (approximately 6 hours from now)
1674            let delete_at_str = updated.delete_at.unwrap();
1675            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1676                .expect("delete_at should be valid RFC3339")
1677                .with_timezone(&Utc);
1678
1679            let duration_from_before = delete_at.signed_duration_since(before_update);
1680            let expected_duration = Duration::hours(6);
1681            let tolerance = Duration::minutes(5);
1682
1683            assert!(
1684                duration_from_before >= expected_duration - tolerance
1685                    && duration_from_before <= expected_duration + tolerance,
1686                "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1687                status, duration_from_before
1688            );
1689        }
1690
1691        // Cleanup
1692        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1693    }
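
    // Helper sketch (hypothetical, not called by the tests in this module): the
    // bound the assertions above encode is simply "now" plus the configured
    // expiration window in hours.
    #[allow(dead_code)]
    fn expected_delete_at_sketch(expiration_hours: i64) -> chrono::DateTime<chrono::Utc> {
        chrono::Utc::now() + chrono::Duration::hours(expiration_hours)
    }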
1694
1695    #[tokio::test]
1696    #[ignore = "Requires active Redis instance"]
1697    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1698        let _lock = ENV_MUTEX.lock().await;
1699
1700        use std::env;
1701
1702        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1703
1704        let repo = setup_test_repo().await;
1705
1706        let non_final_statuses = [
1707            TransactionStatus::Pending,
1708            TransactionStatus::Sent,
1709            TransactionStatus::Submitted,
1710            TransactionStatus::Mined,
1711        ];
1712
1713        for (i, status) in non_final_statuses.iter().enumerate() {
1714            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
1715            let mut tx = create_test_transaction(&tx_id);
1716            tx.delete_at = None;
1717            tx.status = TransactionStatus::Pending;
1718
1719            repo.create(tx).await.unwrap();
1720
1721            // Update to non-final status
1722            let updated = repo
1723                .update_status(tx_id.clone(), status.clone())
1724                .await
1725                .unwrap();
1726
1727            // Should NOT have delete_at set
1728            assert!(
1729                updated.delete_at.is_none(),
1730                "delete_at should NOT be set for status: {:?}",
1731                status
1732            );
1733        }
1734
1735        // Cleanup
1736        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1737    }
1738
1739    #[tokio::test]
1740    #[ignore = "Requires active Redis instance"]
1741    async fn test_partial_update_sets_delete_at_for_final_statuses() {
1742        let _lock = ENV_MUTEX.lock().await;
1743
1744        use chrono::{DateTime, Duration, Utc};
1745        use std::env;
1746
1747        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1748
1749        let repo = setup_test_repo().await;
1750        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
1751        let mut tx = create_test_transaction(&tx_id);
1752        tx.delete_at = None;
1753        tx.status = TransactionStatus::Pending;
1754
1755        repo.create(tx).await.unwrap();
1756
1757        let before_update = Utc::now();
1758
1759        // Use partial_update to set status to Confirmed (final status)
1760        let update = TransactionUpdateRequest {
1761            status: Some(TransactionStatus::Confirmed),
1762            status_reason: Some("Transaction completed".to_string()),
1763            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1764            ..Default::default()
1765        };
1766
1767        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
1768
1769        // Should have delete_at set
1770        assert!(
1771            updated.delete_at.is_some(),
1772            "delete_at should be set when updating to Confirmed status"
1773        );
1774
1775        // Verify the timestamp is reasonable (approximately 8 hours from now)
1776        let delete_at_str = updated.delete_at.unwrap();
1777        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1778            .expect("delete_at should be valid RFC3339")
1779            .with_timezone(&Utc);
1780
1781        let duration_from_before = delete_at.signed_duration_since(before_update);
1782        let expected_duration = Duration::hours(8);
1783        let tolerance = Duration::minutes(5);
1784
1785        assert!(
1786            duration_from_before >= expected_duration - tolerance
1787                && duration_from_before <= expected_duration + tolerance,
1788            "delete_at should be approximately 8 hours from now. Duration: {:?}",
1789            duration_from_before
1790        );
1791
1792        // Also verify other fields were updated
1793        assert_eq!(updated.status, TransactionStatus::Confirmed);
1794        assert_eq!(
1795            updated.status_reason,
1796            Some("Transaction completed".to_string())
1797        );
1798        assert_eq!(
1799            updated.confirmed_at,
1800            Some("2023-01-01T12:05:00Z".to_string())
1801        );
1802
1803        // Cleanup
1804        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1805    }
1806
1807    #[tokio::test]
1808    #[ignore = "Requires active Redis instance"]
1809    async fn test_update_status_preserves_existing_delete_at() {
1810        let _lock = ENV_MUTEX.lock().await;
1811
1812        use std::env;
1813
1814        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1815
1816        let repo = setup_test_repo().await;
1817        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
1818        let mut tx = create_test_transaction(&tx_id);
1819
1820        // Set an existing delete_at value
1821        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1822        tx.delete_at = Some(existing_delete_at.clone());
1823        tx.status = TransactionStatus::Pending;
1824
1825        repo.create(tx).await.unwrap();
1826
1827        // Update to final status
1828        let updated = repo
1829            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1830            .await
1831            .unwrap();
1832
1833        // Should preserve the existing delete_at value
1834        assert_eq!(
1835            updated.delete_at,
1836            Some(existing_delete_at),
1837            "Existing delete_at should be preserved when updating to final status"
1838        );
1839
1840        // Cleanup
1841        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1842    }

1843    #[tokio::test]
1844    #[ignore = "Requires active Redis instance"]
1845    async fn test_partial_update_without_status_change_preserves_delete_at() {
1846        let _lock = ENV_MUTEX.lock().await;
1847
1848        use std::env;
1849
1850        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1851
1852        let repo = setup_test_repo().await;
1853        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
1854        let mut tx = create_test_transaction(&tx_id);
1855        tx.delete_at = None;
1856        tx.status = TransactionStatus::Pending;
1857
1858        repo.create(tx).await.unwrap();
1859
1860        // First, update to final status to set delete_at
1861        let updated1 = repo
1862            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1863            .await
1864            .unwrap();
1865
1866        assert!(updated1.delete_at.is_some());
1867        let original_delete_at = updated1.delete_at.clone();
1868
1869        // Now update other fields without changing status
1870        let update = TransactionUpdateRequest {
1871            status: None, // No status change
1872            status_reason: Some("Updated reason".to_string()),
1873            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1874            ..Default::default()
1875        };
1876
1877        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
1878
1879        // delete_at should be preserved
1880        assert_eq!(
1881            updated2.delete_at, original_delete_at,
1882            "delete_at should be preserved when status is not updated"
1883        );
1884
1885        // Other fields should be updated
1886        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
1887        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1888        assert_eq!(
1889            updated2.confirmed_at,
1890            Some("2023-01-01T12:10:00Z".to_string())
1891        );
1892
1893        // Cleanup
1894        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1895    }
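
    // Final sketch: `find_by_nonce` takes a relayer id, so a nonce created for one
    // relayer should not be visible under another. The relayer ids and nonce value
    // here are arbitrary test data; the second assertion is the assumed part.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_nonce_scoped_to_relayer_sketch() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let relayer_a = Uuid::new_v4().to_string();
        let relayer_b = Uuid::new_v4().to_string();
        let tx = create_test_transaction_with_nonce(&random_id, 99, &relayer_a);

        repo.create(tx).await.unwrap();

        // Visible under the relayer it was created for...
        assert!(repo.find_by_nonce(&relayer_a, 99).await.unwrap().is_some());
        // ...and, by assumption, not under an unrelated relayer id.
        assert!(repo.find_by_nonce(&relayer_b, 99).await.unwrap().is_none());
    }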
1896}