1use chrono::Utc;
7use log::{debug, error, info, warn};
8use solana_sdk::signature::Signature;
9use std::str::FromStr;
10
11use super::SolanaRelayerTransaction;
12use crate::{
13 jobs::{JobProducerTrait, TransactionStatusCheck},
14 models::{
15 produce_transaction_update_notification_payload, RelayerRepoModel, SolanaTransactionStatus,
16 TransactionError, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
17 },
18 repositories::{transaction::TransactionRepository, RelayerRepository, Repository},
19 services::provider::SolanaProviderTrait,
20};
21
/// Base delay, in seconds, applied when re-queueing a Solana transaction status check.
/// The failure path in this file schedules the retry with twice this value.
const SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS: i64 = 10;
24
25impl<P, RR, TR, J> SolanaRelayerTransaction<P, RR, TR, J>
26where
27 P: SolanaProviderTrait,
28 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
29 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
30 J: JobProducerTrait + Send + Sync + 'static,
31{
32 pub async fn handle_transaction_status_impl(
34 &self,
35 tx: TransactionRepoModel,
36 ) -> Result<TransactionRepoModel, TransactionError> {
37 info!("Handling Solana transaction status for: {:?}", tx.id);
38
39 if matches!(
41 tx.status,
42 TransactionStatus::Confirmed | TransactionStatus::Failed | TransactionStatus::Expired
43 ) {
44 info!(
45 "Transaction {} already in final state: {:?}",
46 tx.id, tx.status
47 );
48 return Ok(tx);
49 }
50
51 match self.check_and_update_status(tx.clone()).await {
53 Ok(updated_tx) => Ok(updated_tx),
54 Err(error) => {
55 match error {
57 TransactionError::ValidationError(_) => {
58 Err(error)
60 }
61 _ => {
62 self.handle_status_check_failure(tx, error).await
64 }
65 }
66 }
67 }
68 }
69
70 async fn handle_status_check_failure(
73 &self,
74 tx: TransactionRepoModel,
75 error: TransactionError,
76 ) -> Result<TransactionRepoModel, TransactionError> {
77 warn!(
78 "Failed to get Solana transaction status for {}: {}. Re-queueing check.",
79 tx.id, error
80 );
81
82 if let Err(requeue_error) = self
83 .schedule_status_check(&tx, Some(2 * SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
84 .await
85 {
86 warn!(
87 "Failed to requeue status check for transaction {}: {}",
88 tx.id, requeue_error
89 );
90 }
91
92 info!(
93 "Transaction {} status check failure handled. Will retry later. Error: {}",
94 tx.id, error
95 );
96
97 Err(error)
99 }
100
101 async fn check_and_update_status(
103 &self,
104 tx: TransactionRepoModel,
105 ) -> Result<TransactionRepoModel, TransactionError> {
106 let solana_data = tx.network_data.get_solana_transaction_data()?;
108 let signature_str = solana_data.signature.as_ref().ok_or_else(|| {
109 TransactionError::ValidationError("Transaction signature is missing".to_string())
110 })?;
111
112 let signature = Signature::from_str(signature_str).map_err(|e| {
113 TransactionError::ValidationError(format!("Invalid signature format: {}", e))
114 })?;
115
116 let solana_status = self
118 .provider()
119 .get_transaction_status(&signature)
120 .await
121 .map_err(|e| {
122 TransactionError::UnexpectedError(format!(
123 "Failed to get Solana transaction status for tx {} (signature {}): {}",
124 tx.id, signature_str, e
125 ))
126 })?;
127
128 println!("solana_status: {:?}", solana_status);
129
130 match solana_status {
132 SolanaTransactionStatus::Processed => self.handle_processed_status(tx).await,
133 SolanaTransactionStatus::Confirmed => self.handle_confirmed_status(tx).await,
134 SolanaTransactionStatus::Finalized => self.handle_finalized_status(tx).await,
135 SolanaTransactionStatus::Failed => self.handle_failed_status(tx).await,
136 }
137 }
138
139 async fn update_transaction_status_if_needed(
141 &self,
142 tx: TransactionRepoModel,
143 new_status: TransactionStatus,
144 ) -> Result<TransactionRepoModel, TransactionError> {
145 if tx.status != new_status {
146 let update_request = TransactionUpdateRequest {
147 status: Some(new_status.clone()),
148 confirmed_at: if matches!(new_status, TransactionStatus::Confirmed) {
149 Some(Utc::now().to_rfc3339())
150 } else {
151 None
152 },
153 ..Default::default()
154 };
155 return self
156 .finalize_transaction_state(tx.id.clone(), update_request)
157 .await;
158 }
159 Ok(tx)
160 }
161
162 async fn schedule_status_check(
164 &self,
165 tx: &TransactionRepoModel,
166 delay_seconds: Option<i64>,
167 ) -> Result<(), TransactionError> {
168 let delay = delay_seconds.map(|seconds| Utc::now().timestamp() + seconds);
169 self.job_producer()
170 .produce_check_transaction_status_job(
171 TransactionStatusCheck::new(tx.id.clone(), tx.relayer_id.clone()),
172 delay,
173 )
174 .await
175 .map_err(|e| {
176 TransactionError::UnexpectedError(format!("Failed to schedule status check: {}", e))
177 })
178 }
179
180 async fn handle_processed_status(
182 &self,
183 tx: TransactionRepoModel,
184 ) -> Result<TransactionRepoModel, TransactionError> {
185 info!(
186 "Transaction {} is processed but waiting for supermajority confirmation",
187 tx.id
188 );
189
190 self.schedule_status_check(&tx, Some(SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
192 .await?;
193
194 Ok(tx)
196 }
197
198 async fn handle_confirmed_status(
202 &self,
203 tx: TransactionRepoModel,
204 ) -> Result<TransactionRepoModel, TransactionError> {
205 debug!("Transaction {} is confirmed by supermajority", tx.id);
206
207 let updated_tx = self
209 .update_transaction_status_if_needed(tx, TransactionStatus::Mined)
210 .await?;
211
212 self.schedule_status_check(&updated_tx, Some(SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
214 .await?;
215
216 Ok(updated_tx)
217 }
218
219 async fn handle_finalized_status(
223 &self,
224 tx: TransactionRepoModel,
225 ) -> Result<TransactionRepoModel, TransactionError> {
226 info!("Transaction {} is finalized and irreversible", tx.id);
227
228 self.update_transaction_status_if_needed(tx, TransactionStatus::Confirmed)
230 .await
231 }
232
233 async fn handle_failed_status(
235 &self,
236 tx: TransactionRepoModel,
237 ) -> Result<TransactionRepoModel, TransactionError> {
238 warn!("Transaction {} failed on-chain", tx.id);
239
240 self.update_transaction_status_if_needed(tx, TransactionStatus::Failed)
242 .await
243 }
244
245 async fn finalize_transaction_state(
247 &self,
248 tx_id: String,
249 update_req: TransactionUpdateRequest,
250 ) -> Result<TransactionRepoModel, TransactionError> {
251 let updated_tx = self
253 .transaction_repository()
254 .partial_update(tx_id, update_req)
255 .await
256 .map_err(|e| TransactionError::UnexpectedError(e.to_string()))?;
257
258 self.send_transaction_update_notification(&updated_tx)
260 .await?;
261
262 Ok(updated_tx)
263 }
264
265 async fn send_transaction_update_notification(
267 &self,
268 tx: &TransactionRepoModel,
269 ) -> Result<(), TransactionError> {
270 if let Some(notification_id) = &self.relayer().notification_id {
271 info!("Sending webhook notification for transaction: {}", tx.id);
272
273 let notification_payload =
274 produce_transaction_update_notification_payload(notification_id, tx);
275
276 if let Err(e) = self
277 .job_producer()
278 .produce_send_notification_job(notification_payload, None)
279 .await
280 {
281 error!("Failed to produce notification job: {}", e);
282 }
283 }
284
285 Ok(())
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292 use crate::{
293 jobs::MockJobProducerTrait,
294 models::{NetworkTransactionData, SolanaTransactionData},
295 repositories::{MockRelayerRepository, MockTransactionRepository},
296 services::{MockSolanaProviderTrait, SolanaProviderError},
297 utils::mocks::mockutils::{create_mock_solana_relayer, create_mock_solana_transaction},
298 };
299 use eyre::Result;
300 use mockall::predicate::*;
301 use std::sync::Arc;
302
303 fn create_tx_with_signature(
305 status: TransactionStatus,
306 signature: Option<&str>,
307 ) -> TransactionRepoModel {
308 let mut tx = create_mock_solana_transaction();
309 tx.status = status;
310 if let Some(sig) = signature {
311 tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
312 transaction: "test".to_string(),
313 signature: Some(sig.to_string()),
314 });
315 }
316 tx
317 }
318
319 #[tokio::test]
320 async fn test_handle_status_already_final() {
321 let provider = Arc::new(MockSolanaProviderTrait::new());
322 let relayer_repo = Arc::new(MockRelayerRepository::new());
323 let tx_repo = Arc::new(MockTransactionRepository::new());
324 let job_producer = Arc::new(MockJobProducerTrait::new());
325 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
326
327 let handler =
328 SolanaRelayerTransaction::new(relayer, relayer_repo, provider, tx_repo, job_producer)
329 .unwrap();
330
331 let tx_confirmed = create_tx_with_signature(TransactionStatus::Confirmed, None);
333 let result = handler
334 .handle_transaction_status_impl(tx_confirmed.clone())
335 .await;
336 assert!(result.is_ok());
337 assert_eq!(result.unwrap().id, tx_confirmed.id);
338
339 let tx_failed = create_tx_with_signature(TransactionStatus::Failed, None);
341 let result = handler
342 .handle_transaction_status_impl(tx_failed.clone())
343 .await;
344 assert!(result.is_ok());
345 assert_eq!(result.unwrap().id, tx_failed.id);
346
347 let tx_expired = create_tx_with_signature(TransactionStatus::Expired, None);
349 let result = handler
350 .handle_transaction_status_impl(tx_expired.clone())
351 .await;
352 assert!(result.is_ok());
353 assert_eq!(result.unwrap().id, tx_expired.id);
354 }
355
356 #[tokio::test]
357 async fn test_handle_status_processed() -> Result<()> {
358 let mut provider = MockSolanaProviderTrait::new();
359 let relayer_repo = Arc::new(MockRelayerRepository::new());
360 let tx_repo = Arc::new(MockTransactionRepository::new());
361 let mut job_producer = MockJobProducerTrait::new();
362
363 let signature_str =
364 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
365 let tx = create_tx_with_signature(TransactionStatus::Pending, Some(signature_str));
366
367 provider
368 .expect_get_transaction_status()
369 .with(eq(Signature::from_str(signature_str)?))
370 .times(1)
371 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
372
373 job_producer
374 .expect_produce_check_transaction_status_job()
375 .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
376 .times(1)
377 .returning(|_, _| Box::pin(async { Ok(()) }));
378
379 let handler = SolanaRelayerTransaction::new(
380 create_mock_solana_relayer("test-relayer".to_string(), false),
381 relayer_repo,
382 Arc::new(provider),
383 tx_repo,
384 Arc::new(job_producer),
385 )?;
386
387 let result = handler.handle_transaction_status_impl(tx.clone()).await;
388
389 assert!(result.is_ok());
390 let updated_tx = result.unwrap();
391 assert_eq!(updated_tx.id, tx.id);
392 assert_eq!(updated_tx.status, TransactionStatus::Pending); Ok(())
394 }
395
396 #[tokio::test]
397 async fn test_handle_status_confirmed() -> Result<()> {
398 let mut provider = MockSolanaProviderTrait::new();
399 let relayer_repo = Arc::new(MockRelayerRepository::new());
400 let mut tx_repo = MockTransactionRepository::new();
401 let mut job_producer = MockJobProducerTrait::new();
402
403 let signature_str =
404 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
405 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
406
407 provider
408 .expect_get_transaction_status()
409 .with(eq(Signature::from_str(signature_str)?))
410 .times(1)
411 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Confirmed) }));
412
413 job_producer
414 .expect_produce_check_transaction_status_job()
415 .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
416 .times(1)
417 .returning(|_, _| Box::pin(async { Ok(()) }));
418
419 let tx_id = tx.id.clone();
420
421 tx_repo
422 .expect_partial_update()
423 .withf(move |tx_id_param, update_req| {
424 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Mined)
425 })
426 .times(1)
427 .returning(move |_, _| {
428 Ok(create_tx_with_signature(
429 TransactionStatus::Mined,
430 Some(signature_str),
431 ))
432 });
433
434 let handler = SolanaRelayerTransaction::new(
435 create_mock_solana_relayer("test-relayer".to_string(), false),
436 relayer_repo,
437 Arc::new(provider),
438 Arc::new(tx_repo),
439 Arc::new(job_producer),
440 )?;
441
442 let result = handler.handle_transaction_status_impl(tx.clone()).await;
443
444 assert!(result.is_ok());
445 let updated_tx = result.unwrap();
446 assert_eq!(updated_tx.id, tx.id);
447 assert_eq!(updated_tx.status, TransactionStatus::Mined);
448 Ok(())
449 }
450
451 #[tokio::test]
452 async fn test_handle_status_finalized() -> Result<()> {
453 let mut provider = MockSolanaProviderTrait::new();
454 let relayer_repo = Arc::new(MockRelayerRepository::new());
455 let mut tx_repo = MockTransactionRepository::new();
456 let job_producer = MockJobProducerTrait::new();
457
458 let signature_str =
459 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
460 let tx = create_tx_with_signature(TransactionStatus::Mined, Some(signature_str));
461
462 provider
463 .expect_get_transaction_status()
464 .with(eq(Signature::from_str(signature_str)?))
465 .times(1)
466 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Finalized) }));
467
468 let tx_id = tx.id.clone();
469
470 tx_repo
471 .expect_partial_update()
472 .withf(move |tx_id_param, update_req| {
473 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Confirmed)
474 })
475 .times(1)
476 .returning(move |_, _| {
477 Ok(create_tx_with_signature(
478 TransactionStatus::Confirmed,
479 Some(signature_str),
480 ))
481 });
482
483 let handler = SolanaRelayerTransaction::new(
484 create_mock_solana_relayer("test-relayer".to_string(), false),
485 relayer_repo,
486 Arc::new(provider),
487 Arc::new(tx_repo),
488 Arc::new(job_producer),
489 )?;
490
491 let result = handler.handle_transaction_status_impl(tx.clone()).await;
492
493 assert!(result.is_ok());
494 let updated_tx = result.unwrap();
495 assert_eq!(updated_tx.id, tx.id);
496 assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
497 Ok(())
498 }
499
500 #[tokio::test]
501 async fn test_handle_status_provider_error() -> Result<()> {
502 let mut provider = MockSolanaProviderTrait::new();
503 let relayer_repo = Arc::new(MockRelayerRepository::new());
504 let tx_repo = Arc::new(MockTransactionRepository::new());
505 let mut job_producer = MockJobProducerTrait::new();
506
507 let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
508 let tx = create_tx_with_signature(TransactionStatus::Pending, Some(signature_str));
509 let error_message = "Provider is down";
510
511 provider
512 .expect_get_transaction_status()
513 .with(eq(Signature::from_str(signature_str)?))
514 .times(1)
515 .returning(move |_| {
516 Box::pin(async { Err(SolanaProviderError::RpcError(error_message.to_string())) })
517 });
518
519 job_producer
520 .expect_produce_check_transaction_status_job()
521 .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
522 .times(1)
523 .returning(|_, _| Box::pin(async { Ok(()) }));
524
525 let handler = SolanaRelayerTransaction::new(
526 create_mock_solana_relayer("test-relayer".to_string(), false),
527 relayer_repo,
528 Arc::new(provider),
529 tx_repo,
530 Arc::new(job_producer),
531 )?;
532
533 let result = handler.handle_transaction_status_impl(tx.clone()).await;
534
535 assert!(result.is_err());
536 let err = result.unwrap_err();
537 assert!(matches!(err, TransactionError::UnexpectedError(_)));
538 Ok(())
539 }
540
541 #[tokio::test]
542 async fn test_handle_status_failed() -> Result<()> {
543 let mut provider = MockSolanaProviderTrait::new();
544 let relayer_repo = Arc::new(MockRelayerRepository::new());
545 let mut tx_repo = MockTransactionRepository::new();
546 let job_producer = MockJobProducerTrait::new();
547
548 let signature_str =
549 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
550 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
551
552 provider
553 .expect_get_transaction_status()
554 .with(eq(Signature::from_str(signature_str)?))
555 .times(1)
556 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Failed) }));
557
558 let tx_id = tx.id.clone();
559
560 tx_repo
561 .expect_partial_update()
562 .withf(move |tx_id_param, update_req| {
563 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Failed)
564 })
565 .times(1)
566 .returning(move |_, _| {
567 Ok(create_tx_with_signature(
568 TransactionStatus::Failed,
569 Some(signature_str),
570 ))
571 });
572
573 let handler = SolanaRelayerTransaction::new(
574 create_mock_solana_relayer("test-relayer".to_string(), false),
575 relayer_repo,
576 Arc::new(provider),
577 Arc::new(tx_repo),
578 Arc::new(job_producer),
579 )?;
580
581 let result = handler.handle_transaction_status_impl(tx.clone()).await;
582
583 assert!(result.is_ok());
584 let updated_tx = result.unwrap();
585 assert_eq!(updated_tx.id, tx.id);
586 assert_eq!(updated_tx.status, TransactionStatus::Failed);
587 Ok(())
588 }
589}