openzeppelin_relayer/domain/transaction/solana/
status.rs

1//! Solana transaction status handling implementation
2//!
3//! This module provides transaction status checking for Solana transactions,
4//! including status updates, repository management, and webhook notifications.
5
6use chrono::Utc;
7use log::{debug, error, info, warn};
8use solana_sdk::signature::Signature;
9use std::str::FromStr;
10
11use super::SolanaRelayerTransaction;
12use crate::{
13    jobs::{JobProducerTrait, TransactionStatusCheck},
14    models::{
15        produce_transaction_update_notification_payload, RelayerRepoModel, SolanaTransactionStatus,
16        TransactionError, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
17    },
18    repositories::{transaction::TransactionRepository, RelayerRepository, Repository},
19    services::provider::SolanaProviderTrait,
20};
21
22/// Default delay for retrying status checks after failures (in seconds)
23const SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS: i64 = 10;
24
25impl<P, RR, TR, J> SolanaRelayerTransaction<P, RR, TR, J>
26where
27    P: SolanaProviderTrait,
28    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
29    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
30    J: JobProducerTrait + Send + Sync + 'static,
31{
32    /// Main status handling method with error handling and retries
33    pub async fn handle_transaction_status_impl(
34        &self,
35        tx: TransactionRepoModel,
36    ) -> Result<TransactionRepoModel, TransactionError> {
37        info!("Handling Solana transaction status for: {:?}", tx.id);
38
39        // Skip if already in final state
40        if matches!(
41            tx.status,
42            TransactionStatus::Confirmed | TransactionStatus::Failed | TransactionStatus::Expired
43        ) {
44            info!(
45                "Transaction {} already in final state: {:?}",
46                tx.id, tx.status
47            );
48            return Ok(tx);
49        }
50
51        // Call core status checking logic with error handling
52        match self.check_and_update_status(tx.clone()).await {
53            Ok(updated_tx) => Ok(updated_tx),
54            Err(error) => {
55                // Only retry for provider errors, not validation errors
56                match error {
57                    TransactionError::ValidationError(_) => {
58                        // Don't retry validation errors (like missing signature)
59                        Err(error)
60                    }
61                    _ => {
62                        // Handle status check failure - requeue for retry
63                        self.handle_status_check_failure(tx, error).await
64                    }
65                }
66            }
67        }
68    }
69
70    /// Handles status check failures with retry logic.
71    /// This method ensures failed status checks are retried appropriately.
72    async fn handle_status_check_failure(
73        &self,
74        tx: TransactionRepoModel,
75        error: TransactionError,
76    ) -> Result<TransactionRepoModel, TransactionError> {
77        warn!(
78            "Failed to get Solana transaction status for {}: {}. Re-queueing check.",
79            tx.id, error
80        );
81
82        if let Err(requeue_error) = self
83            .schedule_status_check(&tx, Some(2 * SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
84            .await
85        {
86            warn!(
87                "Failed to requeue status check for transaction {}: {}",
88                tx.id, requeue_error
89            );
90        }
91
92        info!(
93            "Transaction {} status check failure handled. Will retry later. Error: {}",
94            tx.id, error
95        );
96
97        // Return the original error even though we scheduled a retry
98        Err(error)
99    }
100
101    /// Core status checking logic
102    async fn check_and_update_status(
103        &self,
104        tx: TransactionRepoModel,
105    ) -> Result<TransactionRepoModel, TransactionError> {
106        // Extract signature from Solana transaction data
107        let solana_data = tx.network_data.get_solana_transaction_data()?;
108        let signature_str = solana_data.signature.as_ref().ok_or_else(|| {
109            TransactionError::ValidationError("Transaction signature is missing".to_string())
110        })?;
111
112        let signature = Signature::from_str(signature_str).map_err(|e| {
113            TransactionError::ValidationError(format!("Invalid signature format: {}", e))
114        })?;
115
116        // Get transaction status from provider
117        let solana_status = self
118            .provider()
119            .get_transaction_status(&signature)
120            .await
121            .map_err(|e| {
122                TransactionError::UnexpectedError(format!(
123                    "Failed to get Solana transaction status for tx {} (signature {}): {}",
124                    tx.id, signature_str, e
125                ))
126            })?;
127
128        println!("solana_status: {:?}", solana_status);
129
130        // Map Solana status to repository status and handle accordingly
131        match solana_status {
132            SolanaTransactionStatus::Processed => self.handle_processed_status(tx).await,
133            SolanaTransactionStatus::Confirmed => self.handle_confirmed_status(tx).await,
134            SolanaTransactionStatus::Finalized => self.handle_finalized_status(tx).await,
135            SolanaTransactionStatus::Failed => self.handle_failed_status(tx).await,
136        }
137    }
138
139    /// Helper method that updates transaction status only if it's different from the current status
140    async fn update_transaction_status_if_needed(
141        &self,
142        tx: TransactionRepoModel,
143        new_status: TransactionStatus,
144    ) -> Result<TransactionRepoModel, TransactionError> {
145        if tx.status != new_status {
146            let update_request = TransactionUpdateRequest {
147                status: Some(new_status.clone()),
148                confirmed_at: if matches!(new_status, TransactionStatus::Confirmed) {
149                    Some(Utc::now().to_rfc3339())
150                } else {
151                    None
152                },
153                ..Default::default()
154            };
155            return self
156                .finalize_transaction_state(tx.id.clone(), update_request)
157                .await;
158        }
159        Ok(tx)
160    }
161
162    /// Helper method to schedule a transaction status check job
163    async fn schedule_status_check(
164        &self,
165        tx: &TransactionRepoModel,
166        delay_seconds: Option<i64>,
167    ) -> Result<(), TransactionError> {
168        let delay = delay_seconds.map(|seconds| Utc::now().timestamp() + seconds);
169        self.job_producer()
170            .produce_check_transaction_status_job(
171                TransactionStatusCheck::new(tx.id.clone(), tx.relayer_id.clone()),
172                delay,
173            )
174            .await
175            .map_err(|e| {
176                TransactionError::UnexpectedError(format!("Failed to schedule status check: {}", e))
177            })
178    }
179
180    /// Handle processed status (transaction processed by leader but not yet confirmed)
181    async fn handle_processed_status(
182        &self,
183        tx: TransactionRepoModel,
184    ) -> Result<TransactionRepoModel, TransactionError> {
185        info!(
186            "Transaction {} is processed but waiting for supermajority confirmation",
187            tx.id
188        );
189
190        // Schedule another status check since transaction is not in final state
191        self.schedule_status_check(&tx, Some(SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
192            .await?;
193
194        // Keep current status - will check again later for confirmation/finalization
195        Ok(tx)
196    }
197
198    /// Handle confirmed status (transaction confirmed by supermajority)
199    /// We are mapping this to mined status because we don't have a separate finalized status
200    /// and we want to keep the status consistent with the other networks
201    async fn handle_confirmed_status(
202        &self,
203        tx: TransactionRepoModel,
204    ) -> Result<TransactionRepoModel, TransactionError> {
205        debug!("Transaction {} is confirmed by supermajority", tx.id);
206
207        // Update status to mined only if not already mined
208        let updated_tx = self
209            .update_transaction_status_if_needed(tx, TransactionStatus::Mined)
210            .await?;
211
212        // Schedule another status check since transaction could progress to finalized
213        self.schedule_status_check(&updated_tx, Some(SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
214            .await?;
215
216        Ok(updated_tx)
217    }
218
219    /// Handle finalized status (transaction is finalized and irreversible)
220    /// We are mapping this to confirmed status because we don't have a separate finalized status
221    /// and we want to keep the status consistent with the other networks
222    async fn handle_finalized_status(
223        &self,
224        tx: TransactionRepoModel,
225    ) -> Result<TransactionRepoModel, TransactionError> {
226        info!("Transaction {} is finalized and irreversible", tx.id);
227
228        // Update status to confirmed only if not already confirmed (final success state)
229        self.update_transaction_status_if_needed(tx, TransactionStatus::Confirmed)
230            .await
231    }
232
233    /// Handle failed status (transaction failed on-chain)
234    async fn handle_failed_status(
235        &self,
236        tx: TransactionRepoModel,
237    ) -> Result<TransactionRepoModel, TransactionError> {
238        warn!("Transaction {} failed on-chain", tx.id);
239
240        // Update status to failed only if not already failed (final failure state)
241        self.update_transaction_status_if_needed(tx, TransactionStatus::Failed)
242            .await
243    }
244
245    /// Helper function to update transaction status, save it, and send notification
246    async fn finalize_transaction_state(
247        &self,
248        tx_id: String,
249        update_req: TransactionUpdateRequest,
250    ) -> Result<TransactionRepoModel, TransactionError> {
251        // Update transaction in repository
252        let updated_tx = self
253            .transaction_repository()
254            .partial_update(tx_id, update_req)
255            .await
256            .map_err(|e| TransactionError::UnexpectedError(e.to_string()))?;
257
258        // Send webhook notification if relayer has notification configured
259        self.send_transaction_update_notification(&updated_tx)
260            .await?;
261
262        Ok(updated_tx)
263    }
264
265    /// Send webhook notification for transaction updates
266    async fn send_transaction_update_notification(
267        &self,
268        tx: &TransactionRepoModel,
269    ) -> Result<(), TransactionError> {
270        if let Some(notification_id) = &self.relayer().notification_id {
271            info!("Sending webhook notification for transaction: {}", tx.id);
272
273            let notification_payload =
274                produce_transaction_update_notification_payload(notification_id, tx);
275
276            if let Err(e) = self
277                .job_producer()
278                .produce_send_notification_job(notification_payload, None)
279                .await
280            {
281                error!("Failed to produce notification job: {}", e);
282            }
283        }
284
285        Ok(())
286    }
287}
288
289#[cfg(test)]
290mod tests {
291    use super::*;
292    use crate::{
293        jobs::MockJobProducerTrait,
294        models::{NetworkTransactionData, SolanaTransactionData},
295        repositories::{MockRelayerRepository, MockTransactionRepository},
296        services::{MockSolanaProviderTrait, SolanaProviderError},
297        utils::mocks::mockutils::{create_mock_solana_relayer, create_mock_solana_transaction},
298    };
299    use eyre::Result;
300    use mockall::predicate::*;
301    use std::sync::Arc;
302
303    // Helper to create a transaction with a specific status and optional signature
304    fn create_tx_with_signature(
305        status: TransactionStatus,
306        signature: Option<&str>,
307    ) -> TransactionRepoModel {
308        let mut tx = create_mock_solana_transaction();
309        tx.status = status;
310        if let Some(sig) = signature {
311            tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
312                transaction: "test".to_string(),
313                signature: Some(sig.to_string()),
314            });
315        }
316        tx
317    }
318
319    #[tokio::test]
320    async fn test_handle_status_already_final() {
321        let provider = Arc::new(MockSolanaProviderTrait::new());
322        let relayer_repo = Arc::new(MockRelayerRepository::new());
323        let tx_repo = Arc::new(MockTransactionRepository::new());
324        let job_producer = Arc::new(MockJobProducerTrait::new());
325        let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
326
327        let handler =
328            SolanaRelayerTransaction::new(relayer, relayer_repo, provider, tx_repo, job_producer)
329                .unwrap();
330
331        // Test with Confirmed status
332        let tx_confirmed = create_tx_with_signature(TransactionStatus::Confirmed, None);
333        let result = handler
334            .handle_transaction_status_impl(tx_confirmed.clone())
335            .await;
336        assert!(result.is_ok());
337        assert_eq!(result.unwrap().id, tx_confirmed.id);
338
339        // Test with Failed status
340        let tx_failed = create_tx_with_signature(TransactionStatus::Failed, None);
341        let result = handler
342            .handle_transaction_status_impl(tx_failed.clone())
343            .await;
344        assert!(result.is_ok());
345        assert_eq!(result.unwrap().id, tx_failed.id);
346
347        // Test with Expired status
348        let tx_expired = create_tx_with_signature(TransactionStatus::Expired, None);
349        let result = handler
350            .handle_transaction_status_impl(tx_expired.clone())
351            .await;
352        assert!(result.is_ok());
353        assert_eq!(result.unwrap().id, tx_expired.id);
354    }
355
356    #[tokio::test]
357    async fn test_handle_status_processed() -> Result<()> {
358        let mut provider = MockSolanaProviderTrait::new();
359        let relayer_repo = Arc::new(MockRelayerRepository::new());
360        let tx_repo = Arc::new(MockTransactionRepository::new());
361        let mut job_producer = MockJobProducerTrait::new();
362
363        let signature_str =
364            "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
365        let tx = create_tx_with_signature(TransactionStatus::Pending, Some(signature_str));
366
367        provider
368            .expect_get_transaction_status()
369            .with(eq(Signature::from_str(signature_str)?))
370            .times(1)
371            .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
372
373        job_producer
374            .expect_produce_check_transaction_status_job()
375            .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
376            .times(1)
377            .returning(|_, _| Box::pin(async { Ok(()) }));
378
379        let handler = SolanaRelayerTransaction::new(
380            create_mock_solana_relayer("test-relayer".to_string(), false),
381            relayer_repo,
382            Arc::new(provider),
383            tx_repo,
384            Arc::new(job_producer),
385        )?;
386
387        let result = handler.handle_transaction_status_impl(tx.clone()).await;
388
389        assert!(result.is_ok());
390        let updated_tx = result.unwrap();
391        assert_eq!(updated_tx.id, tx.id);
392        assert_eq!(updated_tx.status, TransactionStatus::Pending); // Status should not change
393        Ok(())
394    }
395
396    #[tokio::test]
397    async fn test_handle_status_confirmed() -> Result<()> {
398        let mut provider = MockSolanaProviderTrait::new();
399        let relayer_repo = Arc::new(MockRelayerRepository::new());
400        let mut tx_repo = MockTransactionRepository::new();
401        let mut job_producer = MockJobProducerTrait::new();
402
403        let signature_str =
404            "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
405        let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
406
407        provider
408            .expect_get_transaction_status()
409            .with(eq(Signature::from_str(signature_str)?))
410            .times(1)
411            .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Confirmed) }));
412
413        job_producer
414            .expect_produce_check_transaction_status_job()
415            .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
416            .times(1)
417            .returning(|_, _| Box::pin(async { Ok(()) }));
418
419        let tx_id = tx.id.clone();
420
421        tx_repo
422            .expect_partial_update()
423            .withf(move |tx_id_param, update_req| {
424                tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Mined)
425            })
426            .times(1)
427            .returning(move |_, _| {
428                Ok(create_tx_with_signature(
429                    TransactionStatus::Mined,
430                    Some(signature_str),
431                ))
432            });
433
434        let handler = SolanaRelayerTransaction::new(
435            create_mock_solana_relayer("test-relayer".to_string(), false),
436            relayer_repo,
437            Arc::new(provider),
438            Arc::new(tx_repo),
439            Arc::new(job_producer),
440        )?;
441
442        let result = handler.handle_transaction_status_impl(tx.clone()).await;
443
444        assert!(result.is_ok());
445        let updated_tx = result.unwrap();
446        assert_eq!(updated_tx.id, tx.id);
447        assert_eq!(updated_tx.status, TransactionStatus::Mined);
448        Ok(())
449    }
450
451    #[tokio::test]
452    async fn test_handle_status_finalized() -> Result<()> {
453        let mut provider = MockSolanaProviderTrait::new();
454        let relayer_repo = Arc::new(MockRelayerRepository::new());
455        let mut tx_repo = MockTransactionRepository::new();
456        let job_producer = MockJobProducerTrait::new();
457
458        let signature_str =
459            "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
460        let tx = create_tx_with_signature(TransactionStatus::Mined, Some(signature_str));
461
462        provider
463            .expect_get_transaction_status()
464            .with(eq(Signature::from_str(signature_str)?))
465            .times(1)
466            .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Finalized) }));
467
468        let tx_id = tx.id.clone();
469
470        tx_repo
471            .expect_partial_update()
472            .withf(move |tx_id_param, update_req| {
473                tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Confirmed)
474            })
475            .times(1)
476            .returning(move |_, _| {
477                Ok(create_tx_with_signature(
478                    TransactionStatus::Confirmed,
479                    Some(signature_str),
480                ))
481            });
482
483        let handler = SolanaRelayerTransaction::new(
484            create_mock_solana_relayer("test-relayer".to_string(), false),
485            relayer_repo,
486            Arc::new(provider),
487            Arc::new(tx_repo),
488            Arc::new(job_producer),
489        )?;
490
491        let result = handler.handle_transaction_status_impl(tx.clone()).await;
492
493        assert!(result.is_ok());
494        let updated_tx = result.unwrap();
495        assert_eq!(updated_tx.id, tx.id);
496        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
497        Ok(())
498    }
499
500    #[tokio::test]
501    async fn test_handle_status_provider_error() -> Result<()> {
502        let mut provider = MockSolanaProviderTrait::new();
503        let relayer_repo = Arc::new(MockRelayerRepository::new());
504        let tx_repo = Arc::new(MockTransactionRepository::new());
505        let mut job_producer = MockJobProducerTrait::new();
506
507        let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
508        let tx = create_tx_with_signature(TransactionStatus::Pending, Some(signature_str));
509        let error_message = "Provider is down";
510
511        provider
512            .expect_get_transaction_status()
513            .with(eq(Signature::from_str(signature_str)?))
514            .times(1)
515            .returning(move |_| {
516                Box::pin(async { Err(SolanaProviderError::RpcError(error_message.to_string())) })
517            });
518
519        job_producer
520            .expect_produce_check_transaction_status_job()
521            .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
522            .times(1)
523            .returning(|_, _| Box::pin(async { Ok(()) }));
524
525        let handler = SolanaRelayerTransaction::new(
526            create_mock_solana_relayer("test-relayer".to_string(), false),
527            relayer_repo,
528            Arc::new(provider),
529            tx_repo,
530            Arc::new(job_producer),
531        )?;
532
533        let result = handler.handle_transaction_status_impl(tx.clone()).await;
534
535        assert!(result.is_err());
536        let err = result.unwrap_err();
537        assert!(matches!(err, TransactionError::UnexpectedError(_)));
538        Ok(())
539    }
540
541    #[tokio::test]
542    async fn test_handle_status_failed() -> Result<()> {
543        let mut provider = MockSolanaProviderTrait::new();
544        let relayer_repo = Arc::new(MockRelayerRepository::new());
545        let mut tx_repo = MockTransactionRepository::new();
546        let job_producer = MockJobProducerTrait::new();
547
548        let signature_str =
549            "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
550        let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
551
552        provider
553            .expect_get_transaction_status()
554            .with(eq(Signature::from_str(signature_str)?))
555            .times(1)
556            .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Failed) }));
557
558        let tx_id = tx.id.clone();
559
560        tx_repo
561            .expect_partial_update()
562            .withf(move |tx_id_param, update_req| {
563                tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Failed)
564            })
565            .times(1)
566            .returning(move |_, _| {
567                Ok(create_tx_with_signature(
568                    TransactionStatus::Failed,
569                    Some(signature_str),
570                ))
571            });
572
573        let handler = SolanaRelayerTransaction::new(
574            create_mock_solana_relayer("test-relayer".to_string(), false),
575            relayer_repo,
576            Arc::new(provider),
577            Arc::new(tx_repo),
578            Arc::new(job_producer),
579        )?;
580
581        let result = handler.handle_transaction_status_impl(tx.clone()).await;
582
583        assert!(result.is_ok());
584        let updated_tx = result.unwrap();
585        assert_eq!(updated_tx.id, tx.id);
586        assert_eq!(updated_tx.status, TransactionStatus::Failed);
587        Ok(())
588    }
589}