openzeppelin_relayer/domain/transaction/stellar/
submit.rs1use chrono::Utc;
6use log::{info, warn};
7
8use super::{utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10 constants::{STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS, STELLAR_STATUS_CHECK_JOB_DELAY_SECONDS},
11 jobs::{JobProducerTrait, TransactionStatusCheck},
12 models::{
13 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
14 TransactionStatus, TransactionUpdateRequest,
15 },
16 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
17 services::{Signer, StellarProviderTrait},
18};
19
20impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
21where
22 R: Repository<RelayerRepoModel, String> + Send + Sync,
23 T: TransactionRepository + Send + Sync,
24 J: JobProducerTrait + Send + Sync,
25 S: Signer + Send + Sync,
26 P: StellarProviderTrait + Send + Sync,
27 C: TransactionCounterTrait + Send + Sync,
28{
29 pub async fn submit_transaction_impl(
32 &self,
33 tx: TransactionRepoModel,
34 ) -> Result<TransactionRepoModel, TransactionError> {
35 info!("Submitting Stellar transaction: {:?}", tx.id);
36
37 match self.submit_core(tx.clone()).await {
39 Ok(submitted_tx) => Ok(submitted_tx),
40 Err(error) => {
41 self.handle_submit_failure(tx, error).await
43 }
44 }
45 }
46
47 async fn submit_core(
49 &self,
50 tx: TransactionRepoModel,
51 ) -> Result<TransactionRepoModel, TransactionError> {
52 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
53 let tx_envelope = stellar_data
54 .get_envelope_for_submission()
55 .map_err(TransactionError::from)?;
56
57 let hash = self
58 .provider()
59 .send_transaction(&tx_envelope)
60 .await
61 .map_err(TransactionError::from)?;
62
63 let tx_hash_hex = hex::encode(hash.as_slice());
64 let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
65
66 let mut hashes = tx.hashes.clone();
67 hashes.push(tx_hash_hex);
68
69 let update_req = TransactionUpdateRequest {
70 status: Some(TransactionStatus::Submitted),
71 sent_at: Some(Utc::now().to_rfc3339()),
72 network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
73 hashes: Some(hashes),
74 ..Default::default()
75 };
76
77 let updated_tx = self
78 .transaction_repository()
79 .partial_update(tx.id.clone(), update_req)
80 .await?;
81
82 self.job_producer()
84 .produce_check_transaction_status_job(
85 TransactionStatusCheck::new(updated_tx.id.clone(), updated_tx.relayer_id.clone()),
86 Some(STELLAR_STATUS_CHECK_JOB_DELAY_SECONDS),
87 )
88 .await?;
89
90 self.send_transaction_update_notification(&updated_tx)
92 .await?;
93
94 Ok(updated_tx)
95 }
96
97 async fn handle_submit_failure(
100 &self,
101 tx: TransactionRepoModel,
102 error: TransactionError,
103 ) -> Result<TransactionRepoModel, TransactionError> {
104 let error_reason = format!("Submission failed: {}", error);
105 let tx_id = tx.id.clone();
106 warn!("Transaction {} submission failed: {}", tx_id, error_reason);
107
108 if is_bad_sequence_error(&error_reason) {
109 if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
111 info!(
112 "Syncing sequence from chain after bad sequence error for transaction {}",
113 tx_id
114 );
115 match self
116 .sync_sequence_from_chain(&stellar_data.source_account)
117 .await
118 {
119 Ok(()) => {
120 info!(
121 "Successfully synced sequence from chain for transaction {}",
122 tx_id
123 );
124 }
125 Err(sync_error) => {
126 warn!(
127 "Failed to sync sequence from chain for transaction {}: {}",
128 tx_id, sync_error
129 );
130 }
131 }
132 }
133
134 info!(
136 "Bad sequence error detected for transaction {}. Resetting and re-enqueueing.",
137 tx_id
138 );
139
140 match self.reset_transaction_for_retry(tx.clone()).await {
142 Ok(reset_tx) => {
143 if let Err(e) = self
145 .send_transaction_request_job(
146 &reset_tx,
147 Some(STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS),
148 )
149 .await
150 {
151 warn!(
152 "Failed to re-enqueue transaction {} after reset: {}",
153 tx_id, e
154 );
155 } else {
156 info!(
157 "Transaction {} reset and re-enqueued for retry through pipeline",
158 tx_id
159 );
160 }
161
162 return Ok(reset_tx);
164 }
165 Err(reset_error) => {
166 warn!(
167 "Failed to reset transaction {} for retry: {}",
168 tx_id, reset_error
169 );
170 }
172 }
173 }
174
175 let update_request = TransactionUpdateRequest {
178 status: Some(TransactionStatus::Failed),
179 status_reason: Some(error_reason.clone()),
180 ..Default::default()
181 };
182 let _failed_tx = match self
183 .finalize_transaction_state(tx_id.clone(), update_request)
184 .await
185 {
186 Ok(updated_tx) => updated_tx,
187 Err(finalize_error) => {
188 warn!(
189 "Failed to mark transaction {} as failed: {}. Continuing with lane cleanup.",
190 tx_id, finalize_error
191 );
192 tx
193 }
194 };
195
196 if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
198 warn!(
199 "Failed to enqueue next pending transaction after {} submission failure: {}.",
200 tx_id, enqueue_error
201 );
202 }
203
204 info!(
205 "Transaction {} submission failure handled. Error: {}",
206 tx_id, error_reason
207 );
208
209 Err(error)
210 }
211
212 pub async fn resubmit_transaction_impl(
214 &self,
215 tx: TransactionRepoModel,
216 ) -> Result<TransactionRepoModel, TransactionError> {
217 self.submit_transaction_impl(tx).await
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use soroban_rs::xdr::{Hash, WriteXdr};
225
226 use crate::domain::transaction::stellar::test_helpers::*;
227
228 mod submit_transaction_tests {
229 use crate::models::RepositoryError;
230
231 use super::*;
232
233 #[tokio::test]
234 async fn submit_transaction_happy_path() {
235 let relayer = create_test_relayer();
236 let mut mocks = default_test_mocks();
237
238 mocks
240 .provider
241 .expect_send_transaction()
242 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
243
244 mocks
246 .tx_repo
247 .expect_partial_update()
248 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
249 .returning(|id, upd| {
250 let mut tx = create_test_transaction("relayer-1");
251 tx.id = id;
252 tx.status = upd.status.unwrap();
253 Ok::<_, RepositoryError>(tx)
254 });
255
256 mocks
258 .job_producer
259 .expect_produce_check_transaction_status_job()
260 .times(1)
261 .returning(|_, _| Box::pin(async { Ok(()) }));
262 mocks
263 .job_producer
264 .expect_produce_send_notification_job()
265 .times(1)
266 .returning(|_, _| Box::pin(async { Ok(()) }));
267
268 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
269
270 let mut tx = create_test_transaction(&relayer.id);
271 if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
272 d.signatures.push(dummy_signature());
273 }
274
275 let res = handler.submit_transaction_impl(tx).await.unwrap();
276 assert_eq!(res.status, TransactionStatus::Submitted);
277 }
278
279 #[tokio::test]
280 async fn submit_transaction_provider_error_marks_failed() {
281 let relayer = create_test_relayer();
282 let mut mocks = default_test_mocks();
283
284 mocks
286 .provider
287 .expect_send_transaction()
288 .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));
289
290 mocks
292 .tx_repo
293 .expect_partial_update()
294 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
295 .returning(|id, upd| {
296 let mut tx = create_test_transaction("relayer-1");
297 tx.id = id;
298 tx.status = upd.status.unwrap();
299 Ok::<_, RepositoryError>(tx)
300 });
301
302 mocks
304 .job_producer
305 .expect_produce_send_notification_job()
306 .times(1)
307 .returning(|_, _| Box::pin(async { Ok(()) }));
308
309 mocks
311 .tx_repo
312 .expect_find_by_status()
313 .returning(|_, _| Ok(vec![])); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
316 let mut tx = create_test_transaction(&relayer.id);
317 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
318 data.signatures.push(dummy_signature());
319 data.sequence_number = Some(42); }
321
322 let res = handler.submit_transaction_impl(tx).await;
323
324 assert!(res.is_err());
326 matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
327 }
328
329 #[tokio::test]
330 async fn submit_transaction_repository_error_marks_failed() {
331 let relayer = create_test_relayer();
332 let mut mocks = default_test_mocks();
333
334 mocks
336 .provider
337 .expect_send_transaction()
338 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
339
340 mocks
342 .tx_repo
343 .expect_partial_update()
344 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
345 .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
346
347 mocks
349 .tx_repo
350 .expect_partial_update()
351 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
352 .returning(|id, upd| {
353 let mut tx = create_test_transaction("relayer-1");
354 tx.id = id;
355 tx.status = upd.status.unwrap();
356 Ok::<_, RepositoryError>(tx)
357 });
358
359 mocks
361 .job_producer
362 .expect_produce_send_notification_job()
363 .times(1)
364 .returning(|_, _| Box::pin(async { Ok(()) }));
365
366 mocks
368 .tx_repo
369 .expect_find_by_status()
370 .returning(|_, _| Ok(vec![])); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
373 let mut tx = create_test_transaction(&relayer.id);
374 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
375 data.signatures.push(dummy_signature());
376 data.sequence_number = Some(42); }
378
379 let res = handler.submit_transaction_impl(tx).await;
380
381 assert!(res.is_err());
383 }
384
385 #[tokio::test]
386 async fn submit_transaction_uses_signed_envelope_xdr() {
387 let relayer = create_test_relayer();
388 let mut mocks = default_test_mocks();
389
390 let mut tx = create_test_transaction(&relayer.id);
392 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
393 data.signatures.push(dummy_signature());
394 let envelope = data.get_envelope_for_submission().unwrap();
396 let xdr = envelope
397 .to_xdr_base64(soroban_rs::xdr::Limits::none())
398 .unwrap();
399 data.signed_envelope_xdr = Some(xdr);
400 }
401
402 mocks
404 .provider
405 .expect_send_transaction()
406 .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));
407
408 mocks
410 .tx_repo
411 .expect_partial_update()
412 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
413 .returning(|id, upd| {
414 let mut tx = create_test_transaction("relayer-1");
415 tx.id = id;
416 tx.status = upd.status.unwrap();
417 Ok::<_, RepositoryError>(tx)
418 });
419
420 mocks
422 .job_producer
423 .expect_produce_check_transaction_status_job()
424 .times(1)
425 .returning(|_, _| Box::pin(async { Ok(()) }));
426 mocks
427 .job_producer
428 .expect_produce_send_notification_job()
429 .times(1)
430 .returning(|_, _| Box::pin(async { Ok(()) }));
431
432 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
433 let res = handler.submit_transaction_impl(tx).await.unwrap();
434
435 assert_eq!(res.status, TransactionStatus::Submitted);
436 }
437
438 #[tokio::test]
439 async fn resubmit_transaction_delegates_to_submit() {
440 let relayer = create_test_relayer();
441 let mut mocks = default_test_mocks();
442
443 mocks
445 .provider
446 .expect_send_transaction()
447 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
448
449 mocks
451 .tx_repo
452 .expect_partial_update()
453 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
454 .returning(|id, upd| {
455 let mut tx = create_test_transaction("relayer-1");
456 tx.id = id;
457 tx.status = upd.status.unwrap();
458 Ok::<_, RepositoryError>(tx)
459 });
460
461 mocks
463 .job_producer
464 .expect_produce_check_transaction_status_job()
465 .times(1)
466 .returning(|_, _| Box::pin(async { Ok(()) }));
467 mocks
468 .job_producer
469 .expect_produce_send_notification_job()
470 .times(1)
471 .returning(|_, _| Box::pin(async { Ok(()) }));
472
473 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
474
475 let mut tx = create_test_transaction(&relayer.id);
476 if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
477 d.signatures.push(dummy_signature());
478 }
479
480 let res = handler.resubmit_transaction_impl(tx).await.unwrap();
481 assert_eq!(res.status, TransactionStatus::Submitted);
482 }
483
484 #[tokio::test]
485 async fn submit_transaction_failure_enqueues_next_transaction() {
486 let relayer = create_test_relayer();
487 let mut mocks = default_test_mocks();
488
489 mocks
491 .provider
492 .expect_send_transaction()
493 .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));
494
495 mocks
499 .tx_repo
500 .expect_partial_update()
501 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
502 .returning(|id, upd| {
503 let mut tx = create_test_transaction("relayer-1");
504 tx.id = id;
505 tx.status = upd.status.unwrap();
506 Ok::<_, RepositoryError>(tx)
507 });
508
509 mocks
511 .job_producer
512 .expect_produce_send_notification_job()
513 .times(1)
514 .returning(|_, _| Box::pin(async { Ok(()) }));
515
516 let mut pending_tx = create_test_transaction(&relayer.id);
518 pending_tx.id = "next-pending-tx".to_string();
519 pending_tx.status = TransactionStatus::Pending;
520 let captured_pending_tx = pending_tx.clone();
521 mocks
522 .tx_repo
523 .expect_find_by_status()
524 .with(
525 mockall::predicate::eq(relayer.id.clone()),
526 mockall::predicate::eq(vec![TransactionStatus::Pending]),
527 )
528 .times(1)
529 .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));
530
531 mocks
533 .job_producer
534 .expect_produce_transaction_request_job()
535 .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
536 .times(1)
537 .returning(|_, _| Box::pin(async { Ok(()) }));
538
539 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
540 let mut tx = create_test_transaction(&relayer.id);
541 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
542 data.signatures.push(dummy_signature());
543 data.sequence_number = Some(42); }
545
546 let res = handler.submit_transaction_impl(tx).await;
547
548 assert!(res.is_err());
550 matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
551 }
552
553 #[tokio::test]
554 async fn test_submit_bad_sequence_resets_and_retries() {
555 let relayer = create_test_relayer();
556 let mut mocks = default_test_mocks();
557
558 mocks.provider.expect_send_transaction().returning(|_| {
560 Box::pin(async { Err(eyre::eyre!("transaction submission failed: TxBadSeq")) })
561 });
562
563 mocks.provider.expect_get_account().times(1).returning(|_| {
565 Box::pin(async {
566 use soroban_rs::xdr::{
567 AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
568 String32, Thresholds, Uint256,
569 };
570 use stellar_strkey::ed25519;
571
572 let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
573 let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
574
575 Ok(AccountEntry {
576 account_id,
577 balance: 1000000,
578 seq_num: SequenceNumber(100),
579 num_sub_entries: 0,
580 inflation_dest: None,
581 flags: 0,
582 home_domain: String32::default(),
583 thresholds: Thresholds([1, 1, 1, 1]),
584 signers: Default::default(),
585 ext: AccountEntryExt::V0,
586 })
587 })
588 });
589
590 mocks
592 .counter
593 .expect_set()
594 .times(1)
595 .returning(|_, _, _| Box::pin(async { Ok(()) }));
596
597 mocks
599 .tx_repo
600 .expect_partial_update()
601 .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
602 .times(1)
603 .returning(|id, upd| {
604 let mut tx = create_test_transaction("relayer-1");
605 tx.id = id;
606 tx.status = upd.status.unwrap();
607 if let Some(network_data) = upd.network_data {
608 tx.network_data = network_data;
609 }
610 Ok::<_, RepositoryError>(tx)
611 });
612
613 mocks
615 .job_producer
616 .expect_produce_transaction_request_job()
617 .times(1)
618 .returning(|_, _| Box::pin(async { Ok(()) }));
619
620 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
621 let mut tx = create_test_transaction(&relayer.id);
622 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
623 data.signatures.push(dummy_signature());
624 data.sequence_number = Some(42);
625 }
626
627 let result = handler.submit_transaction_impl(tx).await;
628
629 assert!(result.is_ok());
631 let reset_tx = result.unwrap();
632 assert_eq!(reset_tx.status, TransactionStatus::Pending);
633
634 if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
636 assert!(data.sequence_number.is_none());
637 assert!(data.signatures.is_empty());
638 assert!(data.hash.is_none());
639 assert!(data.signed_envelope_xdr.is_none());
640 } else {
641 panic!("Expected Stellar transaction data");
642 }
643 }
644 }
645}