openzeppelin_relayer/domain/transaction/stellar/
submit.rs

1//! This module contains the submission-related functionality for Stellar transactions.
2//! It includes methods for submitting transactions with robust error handling,
3//! ensuring proper transaction state management on failure.
4
5use chrono::Utc;
6use log::{info, warn};
7
8use super::{utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10    constants::{STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS, STELLAR_STATUS_CHECK_JOB_DELAY_SECONDS},
11    jobs::{JobProducerTrait, TransactionStatusCheck},
12    models::{
13        NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
14        TransactionStatus, TransactionUpdateRequest,
15    },
16    repositories::{Repository, TransactionCounterTrait, TransactionRepository},
17    services::{Signer, StellarProviderTrait},
18};
19
20impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
21where
22    R: Repository<RelayerRepoModel, String> + Send + Sync,
23    T: TransactionRepository + Send + Sync,
24    J: JobProducerTrait + Send + Sync,
25    S: Signer + Send + Sync,
26    P: StellarProviderTrait + Send + Sync,
27    C: TransactionCounterTrait + Send + Sync,
28{
29    /// Main submission method with robust error handling.
30    /// Unlike prepare, submit doesn't claim lanes but still needs proper error handling.
31    pub async fn submit_transaction_impl(
32        &self,
33        tx: TransactionRepoModel,
34    ) -> Result<TransactionRepoModel, TransactionError> {
35        info!("Submitting Stellar transaction: {:?}", tx.id);
36
37        // Call core submission logic with error handling
38        match self.submit_core(tx.clone()).await {
39            Ok(submitted_tx) => Ok(submitted_tx),
40            Err(error) => {
41                // Handle submission failure - mark as failed and send notification
42                self.handle_submit_failure(tx, error).await
43            }
44        }
45    }
46
47    /// Core submission logic - pure business logic without error handling concerns.
48    async fn submit_core(
49        &self,
50        tx: TransactionRepoModel,
51    ) -> Result<TransactionRepoModel, TransactionError> {
52        let stellar_data = tx.network_data.get_stellar_transaction_data()?;
53        let tx_envelope = stellar_data
54            .get_envelope_for_submission()
55            .map_err(TransactionError::from)?;
56
57        let hash = self
58            .provider()
59            .send_transaction(&tx_envelope)
60            .await
61            .map_err(TransactionError::from)?;
62
63        let tx_hash_hex = hex::encode(hash.as_slice());
64        let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
65
66        let mut hashes = tx.hashes.clone();
67        hashes.push(tx_hash_hex);
68
69        let update_req = TransactionUpdateRequest {
70            status: Some(TransactionStatus::Submitted),
71            sent_at: Some(Utc::now().to_rfc3339()),
72            network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
73            hashes: Some(hashes),
74            ..Default::default()
75        };
76
77        let updated_tx = self
78            .transaction_repository()
79            .partial_update(tx.id.clone(), update_req)
80            .await?;
81
82        // Enqueue status check job
83        self.job_producer()
84            .produce_check_transaction_status_job(
85                TransactionStatusCheck::new(updated_tx.id.clone(), updated_tx.relayer_id.clone()),
86                Some(STELLAR_STATUS_CHECK_JOB_DELAY_SECONDS),
87            )
88            .await?;
89
90        // Send notification
91        self.send_transaction_update_notification(&updated_tx)
92            .await?;
93
94        Ok(updated_tx)
95    }
96
97    /// Handles submission failures with comprehensive cleanup and error reporting.
98    /// For bad sequence errors, resets the transaction and re-enqueues it for retry.
99    async fn handle_submit_failure(
100        &self,
101        tx: TransactionRepoModel,
102        error: TransactionError,
103    ) -> Result<TransactionRepoModel, TransactionError> {
104        let error_reason = format!("Submission failed: {}", error);
105        let tx_id = tx.id.clone();
106        warn!("Transaction {} submission failed: {}", tx_id, error_reason);
107
108        if is_bad_sequence_error(&error_reason) {
109            // For bad sequence errors, sync sequence from chain first
110            if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
111                info!(
112                    "Syncing sequence from chain after bad sequence error for transaction {}",
113                    tx_id
114                );
115                match self
116                    .sync_sequence_from_chain(&stellar_data.source_account)
117                    .await
118                {
119                    Ok(()) => {
120                        info!(
121                            "Successfully synced sequence from chain for transaction {}",
122                            tx_id
123                        );
124                    }
125                    Err(sync_error) => {
126                        warn!(
127                            "Failed to sync sequence from chain for transaction {}: {}",
128                            tx_id, sync_error
129                        );
130                    }
131                }
132            }
133
134            // Reset the transaction and re-enqueue it
135            info!(
136                "Bad sequence error detected for transaction {}. Resetting and re-enqueueing.",
137                tx_id
138            );
139
140            // Reset the transaction to pending state
141            match self.reset_transaction_for_retry(tx.clone()).await {
142                Ok(reset_tx) => {
143                    // Re-enqueue the transaction to go through the pipeline again
144                    if let Err(e) = self
145                        .send_transaction_request_job(
146                            &reset_tx,
147                            Some(STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS),
148                        )
149                        .await
150                    {
151                        warn!(
152                            "Failed to re-enqueue transaction {} after reset: {}",
153                            tx_id, e
154                        );
155                    } else {
156                        info!(
157                            "Transaction {} reset and re-enqueued for retry through pipeline",
158                            tx_id
159                        );
160                    }
161
162                    // Return success since we're handling the retry
163                    return Ok(reset_tx);
164                }
165                Err(reset_error) => {
166                    warn!(
167                        "Failed to reset transaction {} for retry: {}",
168                        tx_id, reset_error
169                    );
170                    // Fall through to normal failure handling
171                }
172            }
173        }
174
175        // For non-bad-sequence errors or if reset failed, mark as failed
176        // Step 1: Mark transaction as Failed with detailed reason
177        let update_request = TransactionUpdateRequest {
178            status: Some(TransactionStatus::Failed),
179            status_reason: Some(error_reason.clone()),
180            ..Default::default()
181        };
182        let _failed_tx = match self
183            .finalize_transaction_state(tx_id.clone(), update_request)
184            .await
185        {
186            Ok(updated_tx) => updated_tx,
187            Err(finalize_error) => {
188                warn!(
189                    "Failed to mark transaction {} as failed: {}. Continuing with lane cleanup.",
190                    tx_id, finalize_error
191                );
192                tx
193            }
194        };
195
196        // Attempt to enqueue next pending transaction or release lane
197        if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
198            warn!(
199                "Failed to enqueue next pending transaction after {} submission failure: {}.",
200                tx_id, enqueue_error
201            );
202        }
203
204        info!(
205            "Transaction {} submission failure handled. Error: {}",
206            tx_id, error_reason
207        );
208
209        Err(error)
210    }
211
212    /// Resubmit transaction - delegates to submit_transaction_impl
213    pub async fn resubmit_transaction_impl(
214        &self,
215        tx: TransactionRepoModel,
216    ) -> Result<TransactionRepoModel, TransactionError> {
217        self.submit_transaction_impl(tx).await
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224    use soroban_rs::xdr::{Hash, WriteXdr};
225
226    use crate::domain::transaction::stellar::test_helpers::*;
227
228    mod submit_transaction_tests {
229        use crate::models::RepositoryError;
230
231        use super::*;
232
233        #[tokio::test]
234        async fn submit_transaction_happy_path() {
235            let relayer = create_test_relayer();
236            let mut mocks = default_test_mocks();
237
238            // provider gives a hash
239            mocks
240                .provider
241                .expect_send_transaction()
242                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
243
244            // expect partial update to Submitted
245            mocks
246                .tx_repo
247                .expect_partial_update()
248                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
249                .returning(|id, upd| {
250                    let mut tx = create_test_transaction("relayer-1");
251                    tx.id = id;
252                    tx.status = upd.status.unwrap();
253                    Ok::<_, RepositoryError>(tx)
254                });
255
256            // enqueue status-check & notification
257            mocks
258                .job_producer
259                .expect_produce_check_transaction_status_job()
260                .times(1)
261                .returning(|_, _| Box::pin(async { Ok(()) }));
262            mocks
263                .job_producer
264                .expect_produce_send_notification_job()
265                .times(1)
266                .returning(|_, _| Box::pin(async { Ok(()) }));
267
268            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
269
270            let mut tx = create_test_transaction(&relayer.id);
271            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
272                d.signatures.push(dummy_signature());
273            }
274
275            let res = handler.submit_transaction_impl(tx).await.unwrap();
276            assert_eq!(res.status, TransactionStatus::Submitted);
277        }
278
279        #[tokio::test]
280        async fn submit_transaction_provider_error_marks_failed() {
281            let relayer = create_test_relayer();
282            let mut mocks = default_test_mocks();
283
284            // Provider fails with non-bad-sequence error
285            mocks
286                .provider
287                .expect_send_transaction()
288                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));
289
290            // Mock finalize_transaction_state for failure handling
291            mocks
292                .tx_repo
293                .expect_partial_update()
294                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
295                .returning(|id, upd| {
296                    let mut tx = create_test_transaction("relayer-1");
297                    tx.id = id;
298                    tx.status = upd.status.unwrap();
299                    Ok::<_, RepositoryError>(tx)
300                });
301
302            // Mock notification for failed transaction
303            mocks
304                .job_producer
305                .expect_produce_send_notification_job()
306                .times(1)
307                .returning(|_, _| Box::pin(async { Ok(()) }));
308
309            // Mock find_by_status for enqueue_next_pending_transaction
310            mocks
311                .tx_repo
312                .expect_find_by_status()
313                .returning(|_, _| Ok(vec![])); // No pending transactions
314
315            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
316            let mut tx = create_test_transaction(&relayer.id);
317            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
318                data.signatures.push(dummy_signature());
319                data.sequence_number = Some(42); // Set sequence number
320            }
321
322            let res = handler.submit_transaction_impl(tx).await;
323
324            // Should return error but transaction should be marked as failed
325            assert!(res.is_err());
326            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
327        }
328
329        #[tokio::test]
330        async fn submit_transaction_repository_error_marks_failed() {
331            let relayer = create_test_relayer();
332            let mut mocks = default_test_mocks();
333
334            // Provider succeeds
335            mocks
336                .provider
337                .expect_send_transaction()
338                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
339
340            // Repository fails on first update (submission)
341            mocks
342                .tx_repo
343                .expect_partial_update()
344                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
345                .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
346
347            // Mock finalize_transaction_state for failure handling
348            mocks
349                .tx_repo
350                .expect_partial_update()
351                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
352                .returning(|id, upd| {
353                    let mut tx = create_test_transaction("relayer-1");
354                    tx.id = id;
355                    tx.status = upd.status.unwrap();
356                    Ok::<_, RepositoryError>(tx)
357                });
358
359            // Mock notification for failed transaction
360            mocks
361                .job_producer
362                .expect_produce_send_notification_job()
363                .times(1)
364                .returning(|_, _| Box::pin(async { Ok(()) }));
365
366            // Mock find_by_status for enqueue_next_pending_transaction
367            mocks
368                .tx_repo
369                .expect_find_by_status()
370                .returning(|_, _| Ok(vec![])); // No pending transactions
371
372            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
373            let mut tx = create_test_transaction(&relayer.id);
374            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
375                data.signatures.push(dummy_signature());
376                data.sequence_number = Some(42); // Set sequence number
377            }
378
379            let res = handler.submit_transaction_impl(tx).await;
380
381            // Should return error but transaction should be marked as failed
382            assert!(res.is_err());
383        }
384
385        #[tokio::test]
386        async fn submit_transaction_uses_signed_envelope_xdr() {
387            let relayer = create_test_relayer();
388            let mut mocks = default_test_mocks();
389
390            // Create a transaction with signed_envelope_xdr set
391            let mut tx = create_test_transaction(&relayer.id);
392            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
393                data.signatures.push(dummy_signature());
394                // Build and store the signed envelope XDR
395                let envelope = data.get_envelope_for_submission().unwrap();
396                let xdr = envelope
397                    .to_xdr_base64(soroban_rs::xdr::Limits::none())
398                    .unwrap();
399                data.signed_envelope_xdr = Some(xdr);
400            }
401
402            // Provider should receive the envelope decoded from signed_envelope_xdr
403            mocks
404                .provider
405                .expect_send_transaction()
406                .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));
407
408            // Update to Submitted
409            mocks
410                .tx_repo
411                .expect_partial_update()
412                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
413                .returning(|id, upd| {
414                    let mut tx = create_test_transaction("relayer-1");
415                    tx.id = id;
416                    tx.status = upd.status.unwrap();
417                    Ok::<_, RepositoryError>(tx)
418                });
419
420            // Job and notification expectations
421            mocks
422                .job_producer
423                .expect_produce_check_transaction_status_job()
424                .times(1)
425                .returning(|_, _| Box::pin(async { Ok(()) }));
426            mocks
427                .job_producer
428                .expect_produce_send_notification_job()
429                .times(1)
430                .returning(|_, _| Box::pin(async { Ok(()) }));
431
432            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
433            let res = handler.submit_transaction_impl(tx).await.unwrap();
434
435            assert_eq!(res.status, TransactionStatus::Submitted);
436        }
437
438        #[tokio::test]
439        async fn resubmit_transaction_delegates_to_submit() {
440            let relayer = create_test_relayer();
441            let mut mocks = default_test_mocks();
442
443            // provider gives a hash
444            mocks
445                .provider
446                .expect_send_transaction()
447                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
448
449            // expect partial update to Submitted
450            mocks
451                .tx_repo
452                .expect_partial_update()
453                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
454                .returning(|id, upd| {
455                    let mut tx = create_test_transaction("relayer-1");
456                    tx.id = id;
457                    tx.status = upd.status.unwrap();
458                    Ok::<_, RepositoryError>(tx)
459                });
460
461            // enqueue status-check & notification
462            mocks
463                .job_producer
464                .expect_produce_check_transaction_status_job()
465                .times(1)
466                .returning(|_, _| Box::pin(async { Ok(()) }));
467            mocks
468                .job_producer
469                .expect_produce_send_notification_job()
470                .times(1)
471                .returning(|_, _| Box::pin(async { Ok(()) }));
472
473            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
474
475            let mut tx = create_test_transaction(&relayer.id);
476            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
477                d.signatures.push(dummy_signature());
478            }
479
480            let res = handler.resubmit_transaction_impl(tx).await.unwrap();
481            assert_eq!(res.status, TransactionStatus::Submitted);
482        }
483
484        #[tokio::test]
485        async fn submit_transaction_failure_enqueues_next_transaction() {
486            let relayer = create_test_relayer();
487            let mut mocks = default_test_mocks();
488
489            // Provider fails with non-bad-sequence error
490            mocks
491                .provider
492                .expect_send_transaction()
493                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));
494
495            // No sync expected for non-bad-sequence errors
496
497            // Mock finalize_transaction_state for failure handling
498            mocks
499                .tx_repo
500                .expect_partial_update()
501                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
502                .returning(|id, upd| {
503                    let mut tx = create_test_transaction("relayer-1");
504                    tx.id = id;
505                    tx.status = upd.status.unwrap();
506                    Ok::<_, RepositoryError>(tx)
507                });
508
509            // Mock notification for failed transaction
510            mocks
511                .job_producer
512                .expect_produce_send_notification_job()
513                .times(1)
514                .returning(|_, _| Box::pin(async { Ok(()) }));
515
516            // Mock find_by_status to return a pending transaction
517            let mut pending_tx = create_test_transaction(&relayer.id);
518            pending_tx.id = "next-pending-tx".to_string();
519            pending_tx.status = TransactionStatus::Pending;
520            let captured_pending_tx = pending_tx.clone();
521            mocks
522                .tx_repo
523                .expect_find_by_status()
524                .with(
525                    mockall::predicate::eq(relayer.id.clone()),
526                    mockall::predicate::eq(vec![TransactionStatus::Pending]),
527                )
528                .times(1)
529                .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));
530
531            // Mock produce_transaction_request_job for the next pending transaction
532            mocks
533                .job_producer
534                .expect_produce_transaction_request_job()
535                .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
536                .times(1)
537                .returning(|_, _| Box::pin(async { Ok(()) }));
538
539            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
540            let mut tx = create_test_transaction(&relayer.id);
541            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
542                data.signatures.push(dummy_signature());
543                data.sequence_number = Some(42); // Set sequence number
544            }
545
546            let res = handler.submit_transaction_impl(tx).await;
547
548            // Should return error but next transaction should be enqueued
549            assert!(res.is_err());
550            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
551        }
552
553        #[tokio::test]
554        async fn test_submit_bad_sequence_resets_and_retries() {
555            let relayer = create_test_relayer();
556            let mut mocks = default_test_mocks();
557
558            // Mock provider to return bad sequence error
559            mocks.provider.expect_send_transaction().returning(|_| {
560                Box::pin(async { Err(eyre::eyre!("transaction submission failed: TxBadSeq")) })
561            });
562
563            // Mock get_account for sync_sequence_from_chain
564            mocks.provider.expect_get_account().times(1).returning(|_| {
565                Box::pin(async {
566                    use soroban_rs::xdr::{
567                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
568                        String32, Thresholds, Uint256,
569                    };
570                    use stellar_strkey::ed25519;
571
572                    let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
573                    let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
574
575                    Ok(AccountEntry {
576                        account_id,
577                        balance: 1000000,
578                        seq_num: SequenceNumber(100),
579                        num_sub_entries: 0,
580                        inflation_dest: None,
581                        flags: 0,
582                        home_domain: String32::default(),
583                        thresholds: Thresholds([1, 1, 1, 1]),
584                        signers: Default::default(),
585                        ext: AccountEntryExt::V0,
586                    })
587                })
588            });
589
590            // Mock counter set for sync_sequence_from_chain
591            mocks
592                .counter
593                .expect_set()
594                .times(1)
595                .returning(|_, _, _| Box::pin(async { Ok(()) }));
596
597            // Mock partial_update for reset_transaction_for_retry - should reset to Pending
598            mocks
599                .tx_repo
600                .expect_partial_update()
601                .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
602                .times(1)
603                .returning(|id, upd| {
604                    let mut tx = create_test_transaction("relayer-1");
605                    tx.id = id;
606                    tx.status = upd.status.unwrap();
607                    if let Some(network_data) = upd.network_data {
608                        tx.network_data = network_data;
609                    }
610                    Ok::<_, RepositoryError>(tx)
611                });
612
613            // Mock produce_transaction_request_job for re-enqueue
614            mocks
615                .job_producer
616                .expect_produce_transaction_request_job()
617                .times(1)
618                .returning(|_, _| Box::pin(async { Ok(()) }));
619
620            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
621            let mut tx = create_test_transaction(&relayer.id);
622            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
623                data.signatures.push(dummy_signature());
624                data.sequence_number = Some(42);
625            }
626
627            let result = handler.submit_transaction_impl(tx).await;
628
629            // Should return Ok since we're handling the retry
630            assert!(result.is_ok());
631            let reset_tx = result.unwrap();
632            assert_eq!(reset_tx.status, TransactionStatus::Pending);
633
634            // Verify stellar data was reset
635            if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
636                assert!(data.sequence_number.is_none());
637                assert!(data.signatures.is_empty());
638                assert!(data.hash.is_none());
639                assert!(data.signed_envelope_xdr.is_none());
640            } else {
641                panic!("Expected Stellar transaction data");
642            }
643        }
644    }
645}