openzeppelin_relayer/domain/transaction/stellar/
stellar_transaction.rs

1/// This module defines the `StellarRelayerTransaction` struct and its associated
2/// functionality for handling Stellar transactions.
3/// It includes methods for preparing, submitting, handling status, and
4/// managing notifications for transactions. The module leverages various
5/// services and repositories to perform these operations asynchronously.
6use crate::{
7    domain::transaction::{stellar::fetch_next_sequence_from_chain, Transaction},
8    jobs::{JobProducer, JobProducerTrait, TransactionRequest},
9    models::{
10        produce_transaction_update_notification_payload, NetworkTransactionRequest,
11        RelayerRepoModel, TransactionError, TransactionRepoModel, TransactionStatus,
12        TransactionUpdateRequest,
13    },
14    repositories::{
15        RelayerRepositoryStorage, Repository, TransactionCounterRepositoryStorage,
16        TransactionCounterTrait, TransactionRepository, TransactionRepositoryStorage,
17    },
18    services::{Signer, StellarProvider, StellarProviderTrait, StellarSigner},
19};
20use async_trait::async_trait;
21use eyre::Result;
22use log::info;
23use std::sync::Arc;
24
25use super::lane_gate;
26
27#[allow(dead_code)]
28pub struct StellarRelayerTransaction<R, T, J, S, P, C>
29where
30    R: Repository<RelayerRepoModel, String>,
31    T: TransactionRepository,
32    J: JobProducerTrait,
33    S: Signer,
34    P: StellarProviderTrait,
35    C: TransactionCounterTrait,
36{
37    relayer: RelayerRepoModel,
38    relayer_repository: Arc<R>,
39    transaction_repository: Arc<T>,
40    job_producer: Arc<J>,
41    signer: Arc<S>,
42    provider: P,
43    transaction_counter_service: Arc<C>,
44}
45
46#[allow(dead_code)]
47impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
48where
49    R: Repository<RelayerRepoModel, String>,
50    T: TransactionRepository,
51    J: JobProducerTrait,
52    S: Signer,
53    P: StellarProviderTrait,
54    C: TransactionCounterTrait,
55{
56    /// Creates a new `StellarRelayerTransaction`.
57    ///
58    /// # Arguments
59    ///
60    /// * `relayer` - The relayer model.
61    /// * `relayer_repository` - Storage for relayer repository.
62    /// * `transaction_repository` - Storage for transaction repository.
63    /// * `job_producer` - Producer for job queue.
64    /// * `signer` - The Stellar signer.
65    /// * `provider` - The Stellar provider.
66    /// * `transaction_counter_service` - Service for managing transaction counters.
67    ///
68    /// # Returns
69    ///
70    /// A result containing the new `StellarRelayerTransaction` or a `TransactionError`.
71    #[allow(clippy::too_many_arguments)]
72    pub fn new(
73        relayer: RelayerRepoModel,
74        relayer_repository: Arc<R>,
75        transaction_repository: Arc<T>,
76        job_producer: Arc<J>,
77        signer: Arc<S>,
78        provider: P,
79        transaction_counter_service: Arc<C>,
80    ) -> Result<Self, TransactionError> {
81        Ok(Self {
82            relayer,
83            relayer_repository,
84            transaction_repository,
85            job_producer,
86            signer,
87            provider,
88            transaction_counter_service,
89        })
90    }
91
92    pub fn provider(&self) -> &P {
93        &self.provider
94    }
95
96    pub fn relayer(&self) -> &RelayerRepoModel {
97        &self.relayer
98    }
99
100    pub fn job_producer(&self) -> &J {
101        &self.job_producer
102    }
103
104    pub fn transaction_repository(&self) -> &T {
105        &self.transaction_repository
106    }
107
108    pub fn signer(&self) -> &S {
109        &self.signer
110    }
111
112    pub fn transaction_counter_service(&self) -> &C {
113        &self.transaction_counter_service
114    }
115
116    /// Send a transaction-request job for the given transaction.
117    pub async fn send_transaction_request_job(
118        &self,
119        tx: &TransactionRepoModel,
120        delay_seconds: Option<i64>,
121    ) -> Result<(), TransactionError> {
122        let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
123        self.job_producer()
124            .produce_transaction_request_job(job, delay_seconds)
125            .await?;
126        Ok(())
127    }
128
129    /// Sends a transaction update notification if a notification ID is configured.
130    pub(super) async fn send_transaction_update_notification(
131        &self,
132        tx: &TransactionRepoModel,
133    ) -> Result<(), TransactionError> {
134        if let Some(notification_id) = &self.relayer().notification_id {
135            self.job_producer()
136                .produce_send_notification_job(
137                    produce_transaction_update_notification_payload(notification_id, tx),
138                    None,
139                )
140                .await
141                .map_err(|e| {
142                    TransactionError::UnexpectedError(format!("Failed to send notification: {}", e))
143                })?;
144        }
145        Ok(())
146    }
147
148    /// Helper function to update transaction status, save it, and send a notification.
149    pub async fn finalize_transaction_state(
150        &self,
151        tx_id: String,
152        update_req: TransactionUpdateRequest,
153    ) -> Result<TransactionRepoModel, TransactionError> {
154        let updated_tx = self
155            .transaction_repository()
156            .partial_update(tx_id, update_req)
157            .await?;
158
159        self.send_transaction_update_notification(&updated_tx)
160            .await?;
161        Ok(updated_tx)
162    }
163
164    pub async fn enqueue_next_pending_transaction(
165        &self,
166        finished_tx_id: &str,
167    ) -> Result<(), TransactionError> {
168        if let Some(next) = self
169            .find_oldest_pending_for_relayer(&self.relayer().id)
170            .await?
171        {
172            // Atomic hand-over while still owning the lane
173            info!("Handing over lane from {} to {}", finished_tx_id, next.id);
174            lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
175            self.send_transaction_request_job(&next, None).await?;
176        } else {
177            info!("Releasing relayer lane after {}", finished_tx_id);
178            lane_gate::free(&self.relayer().id, finished_tx_id);
179        }
180        Ok(())
181    }
182
183    /// Finds the oldest pending transaction for a relayer.
184    async fn find_oldest_pending_for_relayer(
185        &self,
186        relayer_id: &str,
187    ) -> Result<Option<TransactionRepoModel>, TransactionError> {
188        let pending_txs = self
189            .transaction_repository()
190            .find_by_status(relayer_id, &[TransactionStatus::Pending])
191            .await
192            .map_err(TransactionError::from)?;
193
194        Ok(pending_txs.into_iter().next())
195    }
196
197    /// Syncs the sequence number from the blockchain for the relayer's address.
198    /// This fetches the on-chain sequence number and updates the local counter to the next usable value.
199    pub async fn sync_sequence_from_chain(
200        &self,
201        relayer_address: &str,
202    ) -> Result<(), TransactionError> {
203        info!(
204            "Syncing sequence number from chain for address: {}",
205            relayer_address
206        );
207
208        // Use the shared helper to fetch the next sequence
209        let next_usable_seq = fetch_next_sequence_from_chain(self.provider(), relayer_address)
210            .await
211            .map_err(TransactionError::UnexpectedError)?;
212
213        // Update the local counter to the next usable sequence
214        self.transaction_counter_service()
215            .set(&self.relayer().id, relayer_address, next_usable_seq)
216            .await
217            .map_err(|e| {
218                TransactionError::UnexpectedError(format!(
219                    "Failed to update sequence counter: {}",
220                    e
221                ))
222            })?;
223
224        info!("Updated local sequence counter to {}", next_usable_seq);
225        Ok(())
226    }
227
228    /// Resets a transaction to its pre-prepare state for reprocessing through the pipeline.
229    /// This is used when a transaction fails with a bad sequence error and needs to be retried.
230    pub async fn reset_transaction_for_retry(
231        &self,
232        tx: TransactionRepoModel,
233    ) -> Result<TransactionRepoModel, TransactionError> {
234        info!("Resetting transaction {} for retry through pipeline", tx.id);
235
236        // Use the model's built-in reset method
237        let update_req = tx.create_reset_update_request()?;
238
239        // Update the transaction
240        let reset_tx = self
241            .transaction_repository()
242            .partial_update(tx.id.clone(), update_req)
243            .await?;
244
245        info!(
246            "Transaction {} reset successfully to pre-prepare state",
247            reset_tx.id
248        );
249        Ok(reset_tx)
250    }
251}
252
253#[async_trait]
254impl<R, T, J, S, P, C> Transaction for StellarRelayerTransaction<R, T, J, S, P, C>
255where
256    R: Repository<RelayerRepoModel, String> + Send + Sync,
257    T: TransactionRepository + Send + Sync,
258    J: JobProducerTrait + Send + Sync,
259    S: Signer + Send + Sync,
260    P: StellarProviderTrait + Send + Sync,
261    C: TransactionCounterTrait + Send + Sync,
262{
263    async fn prepare_transaction(
264        &self,
265        tx: TransactionRepoModel,
266    ) -> Result<TransactionRepoModel, TransactionError> {
267        self.prepare_transaction_impl(tx).await
268    }
269
270    async fn submit_transaction(
271        &self,
272        tx: TransactionRepoModel,
273    ) -> Result<TransactionRepoModel, TransactionError> {
274        self.submit_transaction_impl(tx).await
275    }
276
277    async fn resubmit_transaction(
278        &self,
279        tx: TransactionRepoModel,
280    ) -> Result<TransactionRepoModel, TransactionError> {
281        Ok(tx)
282    }
283
284    async fn handle_transaction_status(
285        &self,
286        tx: TransactionRepoModel,
287    ) -> Result<TransactionRepoModel, TransactionError> {
288        self.handle_transaction_status_impl(tx).await
289    }
290
291    async fn cancel_transaction(
292        &self,
293        tx: TransactionRepoModel,
294    ) -> Result<TransactionRepoModel, TransactionError> {
295        Ok(tx)
296    }
297
298    async fn replace_transaction(
299        &self,
300        _old_tx: TransactionRepoModel,
301        _new_tx_request: NetworkTransactionRequest,
302    ) -> Result<TransactionRepoModel, TransactionError> {
303        Ok(_old_tx)
304    }
305
306    async fn sign_transaction(
307        &self,
308        tx: TransactionRepoModel,
309    ) -> Result<TransactionRepoModel, TransactionError> {
310        Ok(tx)
311    }
312
313    async fn validate_transaction(
314        &self,
315        _tx: TransactionRepoModel,
316    ) -> Result<bool, TransactionError> {
317        Ok(true)
318    }
319}
320
321pub type DefaultStellarTransaction = StellarRelayerTransaction<
322    RelayerRepositoryStorage,
323    TransactionRepositoryStorage,
324    JobProducer,
325    StellarSigner,
326    StellarProvider,
327    TransactionCounterRepositoryStorage,
328>;
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333    use crate::models::{NetworkTransactionData, RepositoryError};
334    use std::sync::Arc;
335
336    use crate::domain::transaction::stellar::test_helpers::*;
337
338    #[test]
339    fn new_returns_ok() {
340        let relayer = create_test_relayer();
341        let mocks = default_test_mocks();
342        let result = StellarRelayerTransaction::new(
343            relayer,
344            Arc::new(mocks.relayer_repo),
345            Arc::new(mocks.tx_repo),
346            Arc::new(mocks.job_producer),
347            Arc::new(mocks.signer),
348            mocks.provider,
349            Arc::new(mocks.counter),
350        );
351        assert!(result.is_ok());
352    }
353
354    #[test]
355    fn accessor_methods_return_correct_references() {
356        let relayer = create_test_relayer();
357        let mocks = default_test_mocks();
358        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
359
360        // Test all accessor methods
361        assert_eq!(handler.relayer().id, "relayer-1");
362        assert_eq!(handler.relayer().address, TEST_PK);
363
364        // These should not panic and return valid references
365        let _ = handler.provider();
366        let _ = handler.job_producer();
367        let _ = handler.transaction_repository();
368        let _ = handler.signer();
369        let _ = handler.transaction_counter_service();
370    }
371
372    #[tokio::test]
373    async fn send_transaction_request_job_success() {
374        let relayer = create_test_relayer();
375        let mut mocks = default_test_mocks();
376
377        mocks
378            .job_producer
379            .expect_produce_transaction_request_job()
380            .withf(|job, delay| {
381                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
382            })
383            .times(1)
384            .returning(|_, _| Box::pin(async { Ok(()) }));
385
386        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
387        let tx = create_test_transaction(&relayer.id);
388
389        let result = handler.send_transaction_request_job(&tx, None).await;
390        assert!(result.is_ok());
391    }
392
393    #[tokio::test]
394    async fn send_transaction_request_job_with_delay() {
395        let relayer = create_test_relayer();
396        let mut mocks = default_test_mocks();
397
398        mocks
399            .job_producer
400            .expect_produce_transaction_request_job()
401            .withf(|job, delay| {
402                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay == &Some(60)
403            })
404            .times(1)
405            .returning(|_, _| Box::pin(async { Ok(()) }));
406
407        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
408        let tx = create_test_transaction(&relayer.id);
409
410        let result = handler.send_transaction_request_job(&tx, Some(60)).await;
411        assert!(result.is_ok());
412    }
413
414    #[tokio::test]
415    async fn finalize_transaction_state_success() {
416        let relayer = create_test_relayer();
417        let mut mocks = default_test_mocks();
418
419        // Mock repository update
420        mocks
421            .tx_repo
422            .expect_partial_update()
423            .withf(|tx_id, update| {
424                tx_id == "tx-1"
425                    && update.status == Some(TransactionStatus::Confirmed)
426                    && update.status_reason == Some("Transaction confirmed".to_string())
427            })
428            .times(1)
429            .returning(|tx_id, update| {
430                let mut tx = create_test_transaction("relayer-1");
431                tx.id = tx_id;
432                tx.status = update.status.unwrap();
433                tx.status_reason = update.status_reason;
434                tx.confirmed_at = update.confirmed_at;
435                Ok::<_, RepositoryError>(tx)
436            });
437
438        // Mock notification
439        mocks
440            .job_producer
441            .expect_produce_send_notification_job()
442            .times(1)
443            .returning(|_, _| Box::pin(async { Ok(()) }));
444
445        let handler = make_stellar_tx_handler(relayer, mocks);
446
447        let update_request = TransactionUpdateRequest {
448            status: Some(TransactionStatus::Confirmed),
449            status_reason: Some("Transaction confirmed".to_string()),
450            confirmed_at: Some("2023-01-01T00:00:00Z".to_string()),
451            ..Default::default()
452        };
453
454        let result = handler
455            .finalize_transaction_state("tx-1".to_string(), update_request)
456            .await;
457
458        assert!(result.is_ok());
459        let updated_tx = result.unwrap();
460        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
461        assert_eq!(
462            updated_tx.status_reason,
463            Some("Transaction confirmed".to_string())
464        );
465    }
466
467    #[tokio::test]
468    async fn enqueue_next_pending_transaction_with_pending_tx() {
469        let relayer = create_test_relayer();
470        let mut mocks = default_test_mocks();
471
472        // Mock finding a pending transaction
473        let mut pending_tx = create_test_transaction(&relayer.id);
474        pending_tx.id = "pending-tx-1".to_string();
475
476        mocks
477            .tx_repo
478            .expect_find_by_status()
479            .withf(|relayer_id, statuses| {
480                relayer_id == "relayer-1" && statuses == [TransactionStatus::Pending]
481            })
482            .times(1)
483            .returning(move |_, _| {
484                let mut tx = create_test_transaction("relayer-1");
485                tx.id = "pending-tx-1".to_string();
486                Ok(vec![tx])
487            });
488
489        // Mock job production for the next transaction
490        mocks
491            .job_producer
492            .expect_produce_transaction_request_job()
493            .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
494            .times(1)
495            .returning(|_, _| Box::pin(async { Ok(()) }));
496
497        let handler = make_stellar_tx_handler(relayer, mocks);
498
499        let result = handler
500            .enqueue_next_pending_transaction("finished-tx")
501            .await;
502        assert!(result.is_ok());
503    }
504
505    #[tokio::test]
506    async fn enqueue_next_pending_transaction_no_pending_tx() {
507        let relayer = create_test_relayer();
508        let mut mocks = default_test_mocks();
509
510        // Mock finding no pending transactions
511        mocks
512            .tx_repo
513            .expect_find_by_status()
514            .times(1)
515            .returning(|_, _| Ok(vec![]));
516
517        let handler = make_stellar_tx_handler(relayer, mocks);
518
519        let result = handler
520            .enqueue_next_pending_transaction("finished-tx")
521            .await;
522        assert!(result.is_ok());
523    }
524
525    #[tokio::test]
526    async fn test_sync_sequence_from_chain() {
527        let relayer = create_test_relayer();
528        let mut mocks = default_test_mocks();
529
530        // Mock provider to return account with sequence 100
531        mocks
532            .provider
533            .expect_get_account()
534            .withf(|addr| addr == TEST_PK)
535            .times(1)
536            .returning(|_| {
537                Box::pin(async {
538                    use soroban_rs::xdr::{
539                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
540                        String32, Thresholds, Uint256,
541                    };
542                    use stellar_strkey::ed25519;
543
544                    // Create a dummy public key for account ID
545                    let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
546                    let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
547
548                    Ok(AccountEntry {
549                        account_id,
550                        balance: 1000000,
551                        seq_num: SequenceNumber(100),
552                        num_sub_entries: 0,
553                        inflation_dest: None,
554                        flags: 0,
555                        home_domain: String32::default(),
556                        thresholds: Thresholds([1, 1, 1, 1]),
557                        signers: Default::default(),
558                        ext: AccountEntryExt::V0,
559                    })
560                })
561            });
562
563        // Mock counter set to verify it's called with next usable sequence (101)
564        mocks
565            .counter
566            .expect_set()
567            .withf(|relayer_id, addr, seq| {
568                relayer_id == "relayer-1" && addr == TEST_PK && *seq == 101
569            })
570            .times(1)
571            .returning(|_, _, _| Box::pin(async { Ok(()) }));
572
573        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
574
575        let result = handler.sync_sequence_from_chain(&relayer.address).await;
576        assert!(result.is_ok());
577    }
578
579    #[tokio::test]
580    async fn test_sync_sequence_from_chain_provider_error() {
581        let relayer = create_test_relayer();
582        let mut mocks = default_test_mocks();
583
584        // Mock provider to fail
585        mocks
586            .provider
587            .expect_get_account()
588            .times(1)
589            .returning(|_| Box::pin(async { Err(eyre::eyre!("Account not found")) }));
590
591        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
592
593        let result = handler.sync_sequence_from_chain(&relayer.address).await;
594        assert!(result.is_err());
595        match result.unwrap_err() {
596            TransactionError::UnexpectedError(msg) => {
597                assert!(msg.contains("Failed to fetch account from chain"));
598            }
599            _ => panic!("Expected UnexpectedError"),
600        }
601    }
602
603    #[tokio::test]
604    async fn test_sync_sequence_from_chain_counter_error() {
605        let relayer = create_test_relayer();
606        let mut mocks = default_test_mocks();
607
608        // Mock provider success
609        mocks.provider.expect_get_account().times(1).returning(|_| {
610            Box::pin(async {
611                use soroban_rs::xdr::{
612                    AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber, String32,
613                    Thresholds, Uint256,
614                };
615                use stellar_strkey::ed25519;
616
617                // Create a dummy public key for account ID
618                let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
619                let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
620
621                Ok(AccountEntry {
622                    account_id,
623                    balance: 1000000,
624                    seq_num: SequenceNumber(100),
625                    num_sub_entries: 0,
626                    inflation_dest: None,
627                    flags: 0,
628                    home_domain: String32::default(),
629                    thresholds: Thresholds([1, 1, 1, 1]),
630                    signers: Default::default(),
631                    ext: AccountEntryExt::V0,
632                })
633            })
634        });
635
636        // Mock counter set to fail
637        mocks.counter.expect_set().times(1).returning(|_, _, _| {
638            Box::pin(async {
639                Err(RepositoryError::Unknown(
640                    "Counter update failed".to_string(),
641                ))
642            })
643        });
644
645        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
646
647        let result = handler.sync_sequence_from_chain(&relayer.address).await;
648        assert!(result.is_err());
649        match result.unwrap_err() {
650            TransactionError::UnexpectedError(msg) => {
651                assert!(msg.contains("Failed to update sequence counter"));
652            }
653            _ => panic!("Expected UnexpectedError"),
654        }
655    }
656
657    #[tokio::test]
658    async fn test_reset_transaction_for_retry() {
659        let relayer = create_test_relayer();
660        let mut mocks = default_test_mocks();
661
662        // Create a transaction with stellar data that has been prepared
663        let mut tx = create_test_transaction(&relayer.id);
664        if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
665            data.sequence_number = Some(42);
666            data.signatures.push(dummy_signature());
667            data.hash = Some("test-hash".to_string());
668            data.signed_envelope_xdr = Some("test-xdr".to_string());
669        }
670
671        // Mock partial_update to reset transaction
672        mocks
673            .tx_repo
674            .expect_partial_update()
675            .withf(|tx_id, upd| {
676                tx_id == "tx-1"
677                    && upd.status == Some(TransactionStatus::Pending)
678                    && upd.sent_at.is_none()
679                    && upd.confirmed_at.is_none()
680            })
681            .times(1)
682            .returning(|id, upd| {
683                let mut tx = create_test_transaction("relayer-1");
684                tx.id = id;
685                tx.status = upd.status.unwrap();
686                if let Some(network_data) = upd.network_data {
687                    tx.network_data = network_data;
688                }
689                Ok::<_, RepositoryError>(tx)
690            });
691
692        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
693
694        let result = handler.reset_transaction_for_retry(tx).await;
695        assert!(result.is_ok());
696
697        let reset_tx = result.unwrap();
698        assert_eq!(reset_tx.status, TransactionStatus::Pending);
699
700        // Verify stellar data was reset
701        if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
702            assert!(data.sequence_number.is_none());
703            assert!(data.signatures.is_empty());
704            assert!(data.hash.is_none());
705            assert!(data.signed_envelope_xdr.is_none());
706        } else {
707            panic!("Expected Stellar transaction data");
708        }
709    }
710}