1use crate::{
7 domain::transaction::{stellar::fetch_next_sequence_from_chain, Transaction},
8 jobs::{JobProducer, JobProducerTrait, TransactionRequest},
9 models::{
10 produce_transaction_update_notification_payload, NetworkTransactionRequest,
11 RelayerRepoModel, TransactionError, TransactionRepoModel, TransactionStatus,
12 TransactionUpdateRequest,
13 },
14 repositories::{
15 RelayerRepositoryStorage, Repository, TransactionCounterRepositoryStorage,
16 TransactionCounterTrait, TransactionRepository, TransactionRepositoryStorage,
17 },
18 services::{Signer, StellarProvider, StellarProviderTrait, StellarSigner},
19};
20use async_trait::async_trait;
21use eyre::Result;
22use log::info;
23use std::sync::Arc;
24
25use super::lane_gate;
26
27#[allow(dead_code)]
28pub struct StellarRelayerTransaction<R, T, J, S, P, C>
29where
30 R: Repository<RelayerRepoModel, String>,
31 T: TransactionRepository,
32 J: JobProducerTrait,
33 S: Signer,
34 P: StellarProviderTrait,
35 C: TransactionCounterTrait,
36{
37 relayer: RelayerRepoModel,
38 relayer_repository: Arc<R>,
39 transaction_repository: Arc<T>,
40 job_producer: Arc<J>,
41 signer: Arc<S>,
42 provider: P,
43 transaction_counter_service: Arc<C>,
44}
45
46#[allow(dead_code)]
47impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
48where
49 R: Repository<RelayerRepoModel, String>,
50 T: TransactionRepository,
51 J: JobProducerTrait,
52 S: Signer,
53 P: StellarProviderTrait,
54 C: TransactionCounterTrait,
55{
56 #[allow(clippy::too_many_arguments)]
72 pub fn new(
73 relayer: RelayerRepoModel,
74 relayer_repository: Arc<R>,
75 transaction_repository: Arc<T>,
76 job_producer: Arc<J>,
77 signer: Arc<S>,
78 provider: P,
79 transaction_counter_service: Arc<C>,
80 ) -> Result<Self, TransactionError> {
81 Ok(Self {
82 relayer,
83 relayer_repository,
84 transaction_repository,
85 job_producer,
86 signer,
87 provider,
88 transaction_counter_service,
89 })
90 }
91
92 pub fn provider(&self) -> &P {
93 &self.provider
94 }
95
96 pub fn relayer(&self) -> &RelayerRepoModel {
97 &self.relayer
98 }
99
100 pub fn job_producer(&self) -> &J {
101 &self.job_producer
102 }
103
104 pub fn transaction_repository(&self) -> &T {
105 &self.transaction_repository
106 }
107
108 pub fn signer(&self) -> &S {
109 &self.signer
110 }
111
112 pub fn transaction_counter_service(&self) -> &C {
113 &self.transaction_counter_service
114 }
115
116 pub async fn send_transaction_request_job(
118 &self,
119 tx: &TransactionRepoModel,
120 delay_seconds: Option<i64>,
121 ) -> Result<(), TransactionError> {
122 let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
123 self.job_producer()
124 .produce_transaction_request_job(job, delay_seconds)
125 .await?;
126 Ok(())
127 }
128
129 pub(super) async fn send_transaction_update_notification(
131 &self,
132 tx: &TransactionRepoModel,
133 ) -> Result<(), TransactionError> {
134 if let Some(notification_id) = &self.relayer().notification_id {
135 self.job_producer()
136 .produce_send_notification_job(
137 produce_transaction_update_notification_payload(notification_id, tx),
138 None,
139 )
140 .await
141 .map_err(|e| {
142 TransactionError::UnexpectedError(format!("Failed to send notification: {}", e))
143 })?;
144 }
145 Ok(())
146 }
147
148 pub async fn finalize_transaction_state(
150 &self,
151 tx_id: String,
152 update_req: TransactionUpdateRequest,
153 ) -> Result<TransactionRepoModel, TransactionError> {
154 let updated_tx = self
155 .transaction_repository()
156 .partial_update(tx_id, update_req)
157 .await?;
158
159 self.send_transaction_update_notification(&updated_tx)
160 .await?;
161 Ok(updated_tx)
162 }
163
164 pub async fn enqueue_next_pending_transaction(
165 &self,
166 finished_tx_id: &str,
167 ) -> Result<(), TransactionError> {
168 if let Some(next) = self
169 .find_oldest_pending_for_relayer(&self.relayer().id)
170 .await?
171 {
172 info!("Handing over lane from {} to {}", finished_tx_id, next.id);
174 lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
175 self.send_transaction_request_job(&next, None).await?;
176 } else {
177 info!("Releasing relayer lane after {}", finished_tx_id);
178 lane_gate::free(&self.relayer().id, finished_tx_id);
179 }
180 Ok(())
181 }
182
183 async fn find_oldest_pending_for_relayer(
185 &self,
186 relayer_id: &str,
187 ) -> Result<Option<TransactionRepoModel>, TransactionError> {
188 let pending_txs = self
189 .transaction_repository()
190 .find_by_status(relayer_id, &[TransactionStatus::Pending])
191 .await
192 .map_err(TransactionError::from)?;
193
194 Ok(pending_txs.into_iter().next())
195 }
196
197 pub async fn sync_sequence_from_chain(
200 &self,
201 relayer_address: &str,
202 ) -> Result<(), TransactionError> {
203 info!(
204 "Syncing sequence number from chain for address: {}",
205 relayer_address
206 );
207
208 let next_usable_seq = fetch_next_sequence_from_chain(self.provider(), relayer_address)
210 .await
211 .map_err(TransactionError::UnexpectedError)?;
212
213 self.transaction_counter_service()
215 .set(&self.relayer().id, relayer_address, next_usable_seq)
216 .await
217 .map_err(|e| {
218 TransactionError::UnexpectedError(format!(
219 "Failed to update sequence counter: {}",
220 e
221 ))
222 })?;
223
224 info!("Updated local sequence counter to {}", next_usable_seq);
225 Ok(())
226 }
227
228 pub async fn reset_transaction_for_retry(
231 &self,
232 tx: TransactionRepoModel,
233 ) -> Result<TransactionRepoModel, TransactionError> {
234 info!("Resetting transaction {} for retry through pipeline", tx.id);
235
236 let update_req = tx.create_reset_update_request()?;
238
239 let reset_tx = self
241 .transaction_repository()
242 .partial_update(tx.id.clone(), update_req)
243 .await?;
244
245 info!(
246 "Transaction {} reset successfully to pre-prepare state",
247 reset_tx.id
248 );
249 Ok(reset_tx)
250 }
251}
252
253#[async_trait]
254impl<R, T, J, S, P, C> Transaction for StellarRelayerTransaction<R, T, J, S, P, C>
255where
256 R: Repository<RelayerRepoModel, String> + Send + Sync,
257 T: TransactionRepository + Send + Sync,
258 J: JobProducerTrait + Send + Sync,
259 S: Signer + Send + Sync,
260 P: StellarProviderTrait + Send + Sync,
261 C: TransactionCounterTrait + Send + Sync,
262{
263 async fn prepare_transaction(
264 &self,
265 tx: TransactionRepoModel,
266 ) -> Result<TransactionRepoModel, TransactionError> {
267 self.prepare_transaction_impl(tx).await
268 }
269
270 async fn submit_transaction(
271 &self,
272 tx: TransactionRepoModel,
273 ) -> Result<TransactionRepoModel, TransactionError> {
274 self.submit_transaction_impl(tx).await
275 }
276
277 async fn resubmit_transaction(
278 &self,
279 tx: TransactionRepoModel,
280 ) -> Result<TransactionRepoModel, TransactionError> {
281 Ok(tx)
282 }
283
284 async fn handle_transaction_status(
285 &self,
286 tx: TransactionRepoModel,
287 ) -> Result<TransactionRepoModel, TransactionError> {
288 self.handle_transaction_status_impl(tx).await
289 }
290
291 async fn cancel_transaction(
292 &self,
293 tx: TransactionRepoModel,
294 ) -> Result<TransactionRepoModel, TransactionError> {
295 Ok(tx)
296 }
297
298 async fn replace_transaction(
299 &self,
300 _old_tx: TransactionRepoModel,
301 _new_tx_request: NetworkTransactionRequest,
302 ) -> Result<TransactionRepoModel, TransactionError> {
303 Ok(_old_tx)
304 }
305
306 async fn sign_transaction(
307 &self,
308 tx: TransactionRepoModel,
309 ) -> Result<TransactionRepoModel, TransactionError> {
310 Ok(tx)
311 }
312
313 async fn validate_transaction(
314 &self,
315 _tx: TransactionRepoModel,
316 ) -> Result<bool, TransactionError> {
317 Ok(true)
318 }
319}
320
321pub type DefaultStellarTransaction = StellarRelayerTransaction<
322 RelayerRepositoryStorage,
323 TransactionRepositoryStorage,
324 JobProducer,
325 StellarSigner,
326 StellarProvider,
327 TransactionCounterRepositoryStorage,
328>;
329
/// Unit tests for the Stellar transaction handler's helper methods.
///
/// All collaborators are mocks from `test_helpers`; each test sets only the
/// expectations it needs, so an unexpected call on a mock fails the test.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkTransactionData, RepositoryError};
    use std::sync::Arc;

    use crate::domain::transaction::stellar::test_helpers::*;

    /// Builds the on-chain account entry for `TEST_PK` with the given sequence
    /// number, shared by the sequence-sync tests.
    fn account_entry_with_sequence(seq_num: i64) -> soroban_rs::xdr::AccountEntry {
        use soroban_rs::xdr::{
            AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber, String32,
            Thresholds, Uint256,
        };
        use stellar_strkey::ed25519;

        let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
        let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));

        AccountEntry {
            account_id,
            balance: 1000000,
            seq_num: SequenceNumber(seq_num),
            num_sub_entries: 0,
            inflation_dest: None,
            flags: 0,
            home_domain: String32::default(),
            thresholds: Thresholds([1, 1, 1, 1]),
            signers: Default::default(),
            ext: AccountEntryExt::V0,
        }
    }

    #[test]
    fn new_returns_ok() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let result = StellarRelayerTransaction::new(
            relayer,
            Arc::new(mocks.relayer_repo),
            Arc::new(mocks.tx_repo),
            Arc::new(mocks.job_producer),
            Arc::new(mocks.signer),
            mocks.provider,
            Arc::new(mocks.counter),
        );
        assert!(result.is_ok());
    }

    #[test]
    fn accessor_methods_return_correct_references() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);

        assert_eq!(handler.relayer().id, "relayer-1");
        assert_eq!(handler.relayer().address, TEST_PK);

        // The remaining accessors only need to be callable.
        let _ = handler.provider();
        let _ = handler.job_producer();
        let _ = handler.transaction_repository();
        let _ = handler.signer();
        let _ = handler.transaction_counter_service();
    }

    #[tokio::test]
    async fn send_transaction_request_job_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, None).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn send_transaction_request_job_with_delay() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay == &Some(60)
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, Some(60)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn finalize_transaction_state_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // The repository must receive exactly the requested update.
        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, update| {
                tx_id == "tx-1"
                    && update.status == Some(TransactionStatus::Confirmed)
                    && update.status_reason == Some("Transaction confirmed".to_string())
            })
            .times(1)
            .returning(|tx_id, update| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = tx_id;
                tx.status = update.status.unwrap();
                tx.status_reason = update.status_reason;
                tx.confirmed_at = update.confirmed_at;
                Ok::<_, RepositoryError>(tx)
            });

        // A notification job must be produced once after the update.
        mocks
            .job_producer
            .expect_produce_send_notification_job()
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let update_request = TransactionUpdateRequest {
            status: Some(TransactionStatus::Confirmed),
            status_reason: Some("Transaction confirmed".to_string()),
            confirmed_at: Some("2023-01-01T00:00:00Z".to_string()),
            ..Default::default()
        };

        let result = handler
            .finalize_transaction_state("tx-1".to_string(), update_request)
            .await;

        assert!(result.is_ok());
        let updated_tx = result.unwrap();
        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated_tx.status_reason,
            Some("Transaction confirmed".to_string())
        );
    }

    #[tokio::test]
    async fn enqueue_next_pending_transaction_with_pending_tx() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // The repository reports one pending transaction for the relayer.
        mocks
            .tx_repo
            .expect_find_by_status()
            .withf(|relayer_id, statuses| {
                relayer_id == "relayer-1" && statuses == [TransactionStatus::Pending]
            })
            .times(1)
            .returning(|_, _| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = "pending-tx-1".to_string();
                Ok(vec![tx])
            });

        // That pending transaction must be enqueued without delay.
        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn enqueue_next_pending_transaction_no_pending_tx() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // No pending transactions: no job expectation is set on the producer.
        mocks
            .tx_repo
            .expect_find_by_status()
            .times(1)
            .returning(|_, _| Ok(vec![]));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // The chain reports sequence 100 for the relayer account.
        mocks
            .provider
            .expect_get_account()
            .withf(|addr| addr == TEST_PK)
            .times(1)
            .returning(|_| Box::pin(async { Ok(account_entry_with_sequence(100)) }));

        // The counter must be set to the next usable sequence, 101.
        mocks
            .counter
            .expect_set()
            .withf(|relayer_id, addr, seq| {
                relayer_id == "relayer-1" && addr == TEST_PK && *seq == 101
            })
            .times(1)
            .returning(|_, _, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain_provider_error() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // The provider fails, so the counter must never be touched.
        mocks
            .provider
            .expect_get_account()
            .times(1)
            .returning(|_| Box::pin(async { Err(eyre::eyre!("Account not found")) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            TransactionError::UnexpectedError(msg) => {
                assert!(msg.contains("Failed to fetch account from chain"));
            }
            _ => panic!("Expected UnexpectedError"),
        }
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain_counter_error() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .provider
            .expect_get_account()
            .times(1)
            .returning(|_| Box::pin(async { Ok(account_entry_with_sequence(100)) }));

        // The chain lookup succeeds but storing the counter fails.
        mocks.counter.expect_set().times(1).returning(|_, _, _| {
            Box::pin(async {
                Err(RepositoryError::Unknown(
                    "Counter update failed".to_string(),
                ))
            })
        });

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            TransactionError::UnexpectedError(msg) => {
                assert!(msg.contains("Failed to update sequence counter"));
            }
            _ => panic!("Expected UnexpectedError"),
        }
    }

    #[tokio::test]
    async fn test_reset_transaction_for_retry() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // Start from a transaction that already carries prepare/sign artifacts.
        let mut tx = create_test_transaction(&relayer.id);
        if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
            data.sequence_number = Some(42);
            data.signatures.push(dummy_signature());
            data.hash = Some("test-hash".to_string());
            data.signed_envelope_xdr = Some("test-xdr".to_string());
        }

        // The reset request must move the transaction back to Pending.
        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, upd| {
                tx_id == "tx-1"
                    && upd.status == Some(TransactionStatus::Pending)
                    && upd.sent_at.is_none()
                    && upd.confirmed_at.is_none()
            })
            .times(1)
            .returning(|id, upd| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = id;
                tx.status = upd.status.unwrap();
                if let Some(network_data) = upd.network_data {
                    tx.network_data = network_data;
                }
                Ok::<_, RepositoryError>(tx)
            });

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler.reset_transaction_for_retry(tx).await;
        assert!(result.is_ok());

        let reset_tx = result.unwrap();
        assert_eq!(reset_tx.status, TransactionStatus::Pending);

        // The stored network data must no longer carry the earlier artifacts.
        if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
            assert!(data.sequence_number.is_none());
            assert!(data.signatures.is_empty());
            assert!(data.hash.is_none());
            assert!(data.signed_envelope_xdr.is_none());
        } else {
            panic!("Expected Stellar transaction data");
        }
    }
}