Skip to main content

Upload Architecture

Synapse SDK Integration

SDK Initialization

// Synapse SDK initialization in backend/rawchunks/synapseSDK.js
async function createSynapseInstance() {
  if (globalSynapseInstance && paymentsInitialized) {
    return globalSynapseInstance;
  }
  
  const { Synapse, TOKENS } = await initializeSynapseSDK();
  
  try {
    console.log('🚀 Initializing Synapse SDK for Calibration network...');
    
    // Create Synapse instance
    globalSynapseInstance = await Synapse.create({
      privateKey: FILECOIN_PRIVATE_KEY,
      rpcURL: FILECOIN_RPC_URL,
      withCDN: true // Enable CDN for VibesFlow
    });
  
    console.log(`✅ Synapse instance created for wallet: ${FILECOIN_ADDRESS}`);
    
    // Setup payments if not already done
    if (!paymentsInitialized) {
      await setupPayments(globalSynapseInstance);
      paymentsInitialized = true;
    }
    
    return globalSynapseInstance;
    
  } catch (error) {
    console.error('❌ Failed to create Synapse instance:', error);
    throw error;
  }
}

Payment Configuration

// Automated USDFC payment setup
async function setupPayments(synapse) {
  try {
    console.log('💳 Setting up payments and service approvals...');
    
    const { TOKENS, CONTRACT_ADDRESSES } = await initializeSynapseSDK();
    
    // Check current balances
    const filBalance = await synapse.payments.walletBalance(); // FIL balance
    const usdfcBalance = await synapse.payments.walletBalance(TOKENS.USDFC); // USDFC balance
    const contractBalance = await synapse.payments.balance(TOKENS.USDFC); // Contract balance
    
    console.log('📊 Current Balances:', {
      FIL: ethers.formatEther(filBalance),
      USDFC_Wallet: ethers.formatEther(usdfcBalance),
      USDFC_Contract: ethers.formatEther(contractBalance)
    });
    
    // Minimum required amounts (from fs-upload-dapp patterns)
    const minDeposit = ethers.parseEther('50'); // 50 USDFC minimum
    const rateAllowance = ethers.parseEther('10'); // 10 USDFC per epoch
    const lockupAllowance = ethers.parseEther('1000'); // 1000 USDFC lockup
    
    // Deposit USDFC if contract balance is low
    if (contractBalance < minDeposit) {
      const depositAmount = minDeposit - contractBalance;
      console.log(`💰 Depositing ${ethers.formatEther(depositAmount)} USDFC...`);
      
      const depositTx = await synapse.payments.deposit(depositAmount, TOKENS.USDFC);
      console.log(`📝 Deposit transaction: ${depositTx.hash}`);
      await depositTx.wait();
      console.log('✅ Deposit confirmed');
    }
    
    // Approve Pandora service
    const network = synapse.getNetwork();
    const pandoraAddress = CONTRACT_ADDRESSES.PANDORA_SERVICE[network];
    
    const approveTx = await synapse.payments.approveService(
      pandoraAddress,
      rateAllowance,
      lockupAllowance,
      TOKENS.USDFC
    );
    
    console.log(`📝 Service approval transaction: ${approveTx.hash}`);
    await approveTx.wait();
    console.log('✅ Pandora service approved');
    
  } catch (error) {
    console.error('❌ Payment setup failed:', error);
    throw error;
  }
}

Storage Service Management

Per-RTA Storage Services

// Create or get storage service for specific RTA
async function getStorageServiceForRTA(rtaId, synapse, creator = null) {
  try {
    // Check session cache first
    if (sessionStorageServices.has(rtaId)) {
      console.log(`♻️ Reusing cached storage service for RTA: ${rtaId}`);
      return sessionStorageServices.get(rtaId);
    }
    
    // Check if we have existing RTA metadata in DynamoDB
    const existingRTA = await persistenceService.getRTAMetadata(rtaId);
    
    console.log(`🔧 Creating new storage service for RTA: ${rtaId}...`);
    
    // Create storage service with CDN enabled
    const storageService = await synapse.createStorage({
      withCDN: true,
      callbacks: {
        onProviderSelected: (provider) => {
          console.log(`✅ Provider selected for ${rtaId}:`, {
            address: provider.owner,
            pdpUrl: provider.pdpUrl
          });
        },
        onProofSetResolved: (info) => {
          const status = info.isExisting ? 'existing' : 'new';
          console.log(`📋 Proof set ${status} for ${rtaId}: ID ${info.proofSetId}`);
        },
        onProofSetCreationStarted: (transaction, statusUrl) => {
          console.log(`🚀 Proof set creation started for ${rtaId}: ${transaction.hash}`);
        },
        onProofSetCreationProgress: (status) => {
          const elapsed = Math.round(status.elapsedMs / 1000);
          console.log(`⏳ Proof set creation progress [${elapsed}s]: mined=${status.transactionMined}, live=${status.proofSetLive}`);
        }
      }
    });
    
    console.log(`✅ Storage service created for RTA: ${rtaId}`, {
      proofSetId: storageService.proofSetId,
      provider: storageService.storageProvider
    });
    
    // Cache the service for this session
    sessionStorageServices.set(rtaId, storageService);
    
    return storageService;
    
  } catch (error) {
    console.error(`❌ Failed to create storage service for RTA ${rtaId}:`, error);
    throw error;
  }
}

Proof Set Callbacks

// Comprehensive callback handling for storage operations
const storageService = await synapse.createStorage({
  withCDN: true,
  callbacks: {
    onProviderSelected: (provider) => {
      console.log(`✅ Provider selected for ${rtaId}:`, {
        address: provider.owner,
        pdpUrl: provider.pdpUrl
      });
    },
    onProofSetResolved: (info) => {
      const status = info.isExisting ? 'existing' : 'new';
      console.log(`📋 Proof set ${status} for ${rtaId}: ID ${info.proofSetId}`);
    },
    onProofSetCreationStarted: (transaction, statusUrl) => {
      console.log(`🚀 Proof set creation started for ${rtaId}: ${transaction.hash}`);
    },
    onProofSetCreationProgress: (status) => {
      const elapsed = Math.round(status.elapsedMs / 1000);
      console.log(`⏳ Progress [${elapsed}s]: mined=${status.transactionMined}, live=${status.proofSetLive}`);
    }
  }
});

Chunk Upload Process

Upload Implementation

// Complete chunk upload with status tracking
async function uploadChunkToFilecoin(chunkData, rtaId, chunkId, metadata) {
  console.log(`🚀 Starting Synapse upload for chunk ${chunkId} (${(chunkData.length / 1024).toFixed(1)}KB)`);
  
  try {
    // Initialize Synapse instance
    const synapse = await createSynapseInstance();
    
    // Get wallet and service status for frontend logging
    const { TOKENS } = await initializeSynapseSDK();
    const filBalance = await synapse.payments.walletBalance();
    const usdfcBalance = await synapse.payments.walletBalance(TOKENS.USDFC);
    const contractBalance = await synapse.payments.balance(TOKENS.USDFC);
    
    const walletStatus = {
      FIL: ethers.formatEther(filBalance),
      USDFC_Wallet: ethers.formatEther(usdfcBalance),
      USDFC_Contract: ethers.formatEther(contractBalance),
      sufficientFunds: contractBalance > ethers.parseEther('10')
    };
    
    // Check Pandora service approval
    const network = synapse.getNetwork();
    const { CONTRACT_ADDRESSES } = await initializeSynapseSDK();
    const pandoraAddress = CONTRACT_ADDRESSES.PANDORA_SERVICE[network];
    const serviceApproval = await synapse.payments.serviceApproval(pandoraAddress, TOKENS.USDFC);
    
    const pandoraStatus = {
      address: pandoraAddress,
      isApproved: serviceApproval.isApproved,
      rateAllowance: ethers.formatEther(serviceApproval.rateAllowance),
      lockupAllowance: ethers.formatEther(serviceApproval.lockupAllowance)
    };
    
    // Get or create storage service for this RTA
    const storageService = await getStorageServiceForRTA(rtaId, synapse, metadata.creator);
    
    // Run preflight check
    console.log(`🔍 Running preflight check for chunk ${chunkId}...`);
    const preflight = await storageService.preflightUpload(chunkData.length);
    
    if (!preflight.allowanceCheck.sufficient) {
      throw new Error(`Insufficient allowance for upload: ${preflight.allowanceCheck.message}`);
    }
    
    console.log(`💰 Estimated cost: ${ethers.formatEther(preflight.estimatedCost.perMonth)} USDFC/month`);
    
    // Upload chunk with callback handling
    let finalRootId = null;
    let uploadComplete = false;
    let rootConfirmed = false;
    
    const uploadResult = await storageService.upload(chunkData, {
      onUploadComplete: (commp) => {
        console.log(`✅ Upload complete for ${chunkId}: ${commp.toString()}`);
        uploadComplete = true;
      },
      onRootAdded: (transaction) => {
        if (transaction) {
          console.log(`📝 Root added transaction for ${chunkId}: ${transaction.hash}`);
        }
      },
      onRootConfirmed: (rootIds) => {
        console.log(`🎯 Root confirmed for ${chunkId}: IDs ${rootIds.join(', ')}`);
        if (rootIds && rootIds.length > 0) {
          finalRootId = rootIds[0];
          rootConfirmed = true;
        }
      }
    });
    
    // Wait for callbacks to complete
    await new Promise(resolve => setTimeout(resolve, 1000));
    
    const effectiveRootId = finalRootId || uploadResult.rootId;
    
    return {
      success: true,
      cid: uploadResult.commp.toString(),
      size: uploadResult.size,
      rootId: effectiveRootId,
      chunkId: chunkId,
      rtaId: rtaId,
      provider: 'synapse-filecoin',
      confirmed: rootConfirmed,
      // Status for frontend
      uploadId: `${rtaId}_${chunkId}`,
      walletStatus: walletStatus,
      pandoraStatus: pandoraStatus,
      proofSetId: storageService.proofSetId
    };
    
  } catch (error) {
    console.error(`❌ Synapse upload failed for chunk ${chunkId}:`, error.message);
    
    // Fallback to Pinata
    console.log(`🔄 Falling back to Pinata for chunk ${chunkId}...`);
    return await uploadToPinataFallback(chunkData, chunkId, metadata);
  }
}

Upload Status Tracking

// Track upload progress and root confirmations
const uploadResult = await storageService.upload(chunkData, {
  onUploadComplete: (commp) => {
    console.log(`✅ Upload complete for ${chunkId}: ${commp.toString()}`);
    uploadComplete = true;
  },
  onRootAdded: (transaction) => {
    if (transaction) {
      console.log(`📝 Root added transaction for ${chunkId}: ${transaction.hash}`);
    } else {
      console.log(`📝 Root added for ${chunkId} (legacy server)`);
    }
    rootAdded = true;
  },
  onRootConfirmed: (rootIds) => {
    console.log(`🎯 Root confirmed for ${chunkId}: IDs ${rootIds.join(', ')}`);
    if (rootIds && rootIds.length > 0) {
      finalRootId = rootIds[0]; // Store the confirmed root ID
      rootConfirmed = true;
    }
  }
});

Fallback System

Pinata IPFS Fallback

// Pinata fallback when Synapse fails
async function uploadToPinataFallback(chunkData, chunkId, metadata) {
  try {
    console.log(`📌 FALLBACK: Uploading chunk ${chunkId} to Pinata...`);
    
    if (!PINATA_JWT) {
      throw new Error('Pinata JWT not configured for fallback');
    }
    
    const formData = new FormData();
    formData.append('file', Buffer.from(chunkData), {
      filename: `${chunkId}.webm`,
      contentType: 'audio/webm'
    });
    
    const pinataMetadata = {
      name: `VibesFlow-Chunk-${chunkId}`,
      keyvalues: {
        rtaId: metadata.rtaId || 'unknown',
        chunkId: chunkId,
        type: 'audio-chunk',
        creator: metadata.creator || 'unknown',
        isFinal: metadata.isFinal ? 'true' : 'false'
      }
    };
    
    formData.append('pinataMetadata', JSON.stringify(pinataMetadata));
    formData.append('pinataOptions', JSON.stringify({ cidVersion: 1 }));
    
    const response = await fetch('https://api.pinata.cloud/pinning/pinFileToIPFS', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${PINATA_JWT}`,
        ...formData.getHeaders()
      },
      body: formData
    });
    
    if (!response.ok) {
      throw new Error(`Pinata fallback failed: ${response.statusText}`);
    }
    
    const result = await response.json();
    
    console.log(`✅ FALLBACK: Chunk ${chunkId} uploaded to Pinata: ${result.IpfsHash}`);
    
    return {
      success: true,
      cid: result.IpfsHash,
      size: chunkData.length,
      chunkId: chunkId,
      provider: 'pinata-fallback'
    };
    
  } catch (error) {
    console.error(`❌ Pinata fallback failed for chunk ${chunkId}:`, error);
    throw error;
  }
}

Metadata Compilation

RTA Metadata Assembly

// Compile final RTA metadata for FilCDN retrieval
async function compileRTAMetadata(rtaId, storageService = null) {
  try {
    console.log(`📋 Compiling metadata for RTA: ${rtaId}...`);
    
    // Get RTA data from DynamoDB
    const rtaData = await persistenceService.getRTAMetadata(rtaId);
    if (!rtaData) {
      console.error(`❌ No RTA data found for: ${rtaId}`);
      return null;
    }
  
    // Sort chunks by sequence
    const sortedChunks = (rtaData.chunks_detail || []).sort((a, b) => {
      const seqA = parseInt(a.chunk_id.split('_chunk_')[1]?.split('_')[0]) || 0;
      const seqB = parseInt(b.chunk_id.split('_chunk_')[1]?.split('_')[0]) || 0;
      return seqA - seqB;
    });
  
    // Calculate total duration
    const totalDuration = sortedChunks.reduce((sum, chunk) => sum + (chunk.duration || 60), 0);
    const formattedDuration = formatDuration(totalDuration);
  
    // Create FilCDN-compatible metadata structure
    const metadata = {
      rta_id: rtaId,
      creator: rtaData.creator,
      rta_duration: formattedDuration,
      chunks: sortedChunks.length,
      is_complete: true,
      
      // FilCDN integration
      filcdn_base: `https://${FILECOIN_ADDRESS}.calibration.filcdn.io/`,
      filcdn_wallet: FILECOIN_ADDRESS,
      network: 'calibration',
      
      // Synapse integration details
      proof_set_id: rtaData.proof_set_id,
      storage_provider: rtaData.storage_provider,
      
      // URLs for first and last chunks
      first_chunk_url: sortedChunks.length > 0 ? 
        `https://${FILECOIN_ADDRESS}.calibration.filcdn.io/${sortedChunks[0].cid}` : null,
      last_chunk_url: sortedChunks.length > 0 ? 
        `https://${FILECOIN_ADDRESS}.calibration.filcdn.io/${sortedChunks[sortedChunks.length - 1].cid}` : null,
      
      // Metadata
      upload_timestamp: rtaData.upload_timestamp,
      compilation_timestamp: Date.now(),
      total_size_mb: sortedChunks.reduce((sum, chunk) => sum + (chunk.size || 0), 0) / (1024 * 1024),
      
      // Detailed chunk information for playback
      chunks_detail: sortedChunks.map(chunk => ({
        chunk_id: chunk.chunk_id,
        cid: chunk.cid,
        size: chunk.size,
        root_id: chunk.root_id,
        duration: chunk.duration || 60,
        participants: chunk.participants || 1,
        owner: chunk.owner,
        filcdn_url: chunk.filcdn_url,
        sequence: parseInt(chunk.chunk_id.split('_chunk_')[1]?.split('_')[0]) || 0
      }))
    };
  
    console.log(`📊 Metadata compiled:`, {
      chunks: metadata.chunks,
      duration: metadata.rta_duration,
      totalSizeMB: metadata.total_size_mb.toFixed(2),
      proofSetId: metadata.proof_set_id
    });
    
    return metadata;
    
  } catch (error) {
    console.error(`❌ Failed to compile RTA metadata for ${rtaId}:`, error);
    throw error;
  }
}

Backend Processing Flow

Chunk Upload Handler

// Backend endpoint for chunk uploads in backend/rawchunks/app.js
app.post('/upload', async (req, res) => {
  try {
    const { chunkId, rtaId, audioData, metadata, isFinalChunk } = req.body;
    
    // Validate chunk size for Synapse SDK
    const chunkBuffer = Buffer.from(audioData, 'base64');
    if (chunkBuffer.length < 65) {
      return res.status(400).json({
        success: false,
        error: 'Chunk too small for Synapse SDK (minimum 65 bytes)'
      });
    }

    console.log(`📤 Processing chunk upload: ${chunkId} (${(chunkBuffer.length / 1024).toFixed(1)}KB)`);

    // Upload to Filecoin via Synapse SDK
    const uploadResult = await synapseSDK.queueChunkForFilecoin(rtaId, chunkId, chunkBuffer, {
      ...metadata,
      isFinal: isFinalChunk
    });

    // Return detailed status for frontend logging
    res.json({
      success: true,
      chunkId: chunkId,
      cid: uploadResult.cid,
      size: uploadResult.size,
      provider: uploadResult.provider,
      synapseStatus: {
        success: uploadResult.success,
        uploadId: uploadResult.uploadId,
        queuePosition: uploadResult.queuePosition,
        estimatedProcessTime: uploadResult.estimatedProcessTime,
        walletStatus: uploadResult.walletStatus,
        pandoraStatus: uploadResult.pandoraStatus,
        proofSetId: uploadResult.proofSetId,
        proofSetStatus: uploadResult.proofSetStatus,
        storageProvider: uploadResult.storageProvider
      }
    });

  } catch (error) {
    console.error('❌ Chunk upload failed:', error);
    res.status(500).json({
      success: false,
      error: error.message,
      synapseStatus: {
        success: false,
        message: error.message,
        error: error.message
      }
    });
  }
});

Status Polling

// Polling endpoint for upload status
app.get('/filecoin/status/:rtaId', async (req, res) => {
  try {
    const { rtaId } = req.params;
    const status = await synapseSDK.getFilecoinUploadStatus(rtaId);
    
    res.json(status);
  } catch (error) {
    console.error(`❌ Failed to get upload status for ${rtaId}:`, error);
    res.status(500).json({ error: error.message });
  }
});

Auto-Completion System

Fallback Completion

// Auto-completion for incomplete RTAs
setTimeout(async () => {
  try {
    const rtaData = await persistenceService.getRTAMetadata(rtaId);
    if (rtaData && !rtaData.is_complete) {
      const chunks = rtaData.chunks_detail || [];
      const validChunks = chunks.filter(chunk => chunk.cid && chunk.cid.length > 0);
      
      // If RTA has chunks but isn't complete, auto-complete it
      if (validChunks.length > 0) {
        console.log(`🔄 Auto-completion fallback triggered for RTA: ${rtaId} (${validChunks.length} chunks)`);
        const { forceCompleteRTA } = require('./helper');
        await forceCompleteRTA(rtaId);
      }
    }
  } catch (error) {
    console.warn(`⚠️ Auto-completion fallback failed for ${rtaId}:`, error.message);
  }
}, 360000); // 6 minutes delay

Next Steps