Code Examples

Practical implementation examples for integrating AS2aaS into your applications. These examples demonstrate common integration patterns and best practices.

Complete Workflow Implementation

Node.js Integration

const axios = require('axios');

/**
 * Minimal AS2aaS REST client built on a pre-configured axios instance.
 *
 * Every request carries the bearer token and a JSON content type. Write
 * operations also send an `Idempotency-Key` header; pass your own key when
 * you retry a request so the retry reuses the key of the first attempt.
 */
class AS2Client {
  /**
   * @param {string} apiKey  API key sent as a bearer token.
   * @param {string} [baseURL]  API root, defaults to the hosted v1 endpoint.
   */
  constructor(apiKey, baseURL = 'https://api.as2aas.com/v1') {
    this.apiKey = apiKey;
    this.baseURL = baseURL;
    this.client = axios.create({
      baseURL: this.baseURL,
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      }
    });
  }

  /**
   * Build a default idempotency key. The random suffix keeps two calls made
   * in the same millisecond from sharing a key.
   */
  static _newIdempotencyKey(prefix) {
    const suffix = Math.random().toString(36).slice(2, 10);
    return `${prefix}_${Date.now()}_${suffix}`;
  }

  /**
   * Wrap a failed request in an Error with a readable message.
   *
   * Network failures and timeouts have no `error.response`, so the API error
   * message is read defensively and the transport error message is the fallback.
   */
  static _wrapError(prefix, error) {
    const detail =
      error?.response?.data?.error?.message ?? error?.message ?? String(error);
    return new Error(`${prefix}: ${detail}`, { cause: error });
  }

  /**
   * Create a trading partner.
   *
   * @param {object} partnerData  Partner definition sent as the request body.
   * @param {string} [idempotencyKey]  Key to reuse when retrying the same request.
   * @returns {Promise<object>} Parsed response body.
   * @throws {Error} When the request fails for any reason.
   */
  async createPartner(partnerData, idempotencyKey = AS2Client._newIdempotencyKey('partner')) {
    try {
      const response = await this.client.post('/partners', partnerData, {
        headers: {
          'Idempotency-Key': idempotencyKey
        }
      });
      return response.data;
    } catch (error) {
      throw AS2Client._wrapError('Failed to create partner', error);
    }
  }

  /**
   * Send a message to a trading partner.
   *
   * @param {object} messageData  Message definition sent as the request body.
   * @param {string} [idempotencyKey]  Key to reuse when retrying the same request.
   * @returns {Promise<object>} Parsed response body.
   * @throws {Error} When the request fails for any reason.
   */
  async sendMessage(messageData, idempotencyKey = AS2Client._newIdempotencyKey('msg')) {
    try {
      const response = await this.client.post('/messages', messageData, {
        headers: {
          'Idempotency-Key': idempotencyKey
        }
      });
      return response.data;
    } catch (error) {
      throw AS2Client._wrapError('Failed to send message', error);
    }
  }

  /**
   * Fetch the current state of a message.
   *
   * @param {string} messageId  Identifier returned by sendMessage().
   * @returns {Promise<object>} Parsed response body.
   * @throws {Error} When the request fails for any reason.
   */
  async getMessageStatus(messageId) {
    try {
      const response = await this.client.get(`/messages/${messageId}`);
      return response.data;
    } catch (error) {
      throw AS2Client._wrapError('Failed to get message status', error);
    }
  }
}

// Usage example: the API key comes from the environment.
const client = new AS2Client(process.env.AS2AAS_API_KEY);

/**
 * End-to-end walkthrough: register a trading partner, send it an X12 invoice,
 * then read the message status back once and log it.
 */
async function processEDIInvoice() {
  // Step 1: register the trading partner.
  const partnerProfile = {
    as2_id: 'ENTERPRISE_CUSTOMER',
    name: 'Enterprise Customer Corp',
    url: 'https://customer.enterprise.com/as2',
    email: '[email protected]',
    compression: true,
    mdn_mode: 'async'
  };
  const partner = await client.createPartner(partnerProfile);

  // Step 2: send the invoice document to that partner.
  const invoice = {
    partner_id: partner.id,
    subject: 'Invoice Document',
    payload: {
      content: 'ISA*00*...*IEA*1*000000001~',
      filename: 'invoice_001.edi'
    },
    content_type: 'application/edi-x12'
  };
  const message = await client.sendMessage(invoice);
  console.log(`Message sent: ${message.id}`);

  // Step 3: a single status lookup (no polling).
  const status = await client.getMessageStatus(message.id);
  console.log(`Message status: ${status.status}`);
}

Python Integration

import requests
import json
import time
from typing import Dict, Any, Optional

class AS2Client:
    """Synchronous client for the AS2aaS REST API built on a shared ``requests`` session.

    The session carries the bearer token and JSON content type on every call.
    Write operations send an ``Idempotency-Key`` header; pass ``idempotency_key``
    explicitly when retrying so the retry reuses the key of the first attempt.
    """

    # requests applies no timeout by default, so a stalled connection would
    # block the caller indefinitely; every call is bounded by this many seconds.
    REQUEST_TIMEOUT = 30

    # Statuses that stop polling in wait_for_delivery().
    FINAL_STATUSES = ('delivered', 'failed')

    def __init__(self, api_key: str, base_url: str = 'https://api.as2aas.com/v1'):
        """Store the credentials and prepare the authenticated session."""
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })

    @staticmethod
    def _new_idempotency_key(prefix: str) -> str:
        """Return a fresh key such as ``msg_<32 hex chars>``.

        A random UUID is used instead of a timestamp: second-resolution
        timestamps give two different requests in the same second the same key.
        """
        import uuid  # local import: keeps the class independent of the file's import block

        return f'{prefix}_{uuid.uuid4().hex}'

    def _post(self, path: str, payload: Dict[str, Any], idempotency_key: str) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` and return the decoded response body."""
        response = self.session.post(
            f'{self.base_url}{path}',
            json=payload,
            headers={'Idempotency-Key': idempotency_key},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def create_partner(self, partner_data: Dict[str, Any],
                       idempotency_key: Optional[str] = None) -> Dict[str, Any]:
        """Create a new trading partner.

        Args:
            partner_data: Partner definition sent as the JSON request body.
            idempotency_key: Key to reuse when retrying; generated when omitted.

        Returns:
            The decoded JSON response.

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status.
        """
        key = idempotency_key or self._new_idempotency_key('partner')
        return self._post('/partners', partner_data, key)

    def send_message(self, message_data: Dict[str, Any],
                     idempotency_key: Optional[str] = None) -> Dict[str, Any]:
        """Send AS2 message to trading partner.

        Args:
            message_data: Message definition sent as the JSON request body.
            idempotency_key: Key to reuse when retrying; generated when omitted.

        Returns:
            The decoded JSON response.

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status.
        """
        key = idempotency_key or self._new_idempotency_key('msg')
        return self._post('/messages', message_data, key)

    def get_message_status(self, message_id: str) -> Dict[str, Any]:
        """Retrieve message status and details.

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status.
        """
        response = self.session.get(
            f'{self.base_url}/messages/{message_id}',
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def wait_for_delivery(self, message_id: str, timeout: int = 300,
                          poll_interval: float = 5) -> Dict[str, Any]:
        """Poll the message until it reaches a final status.

        Args:
            message_id: Identifier of the message to watch.
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between two status lookups.

        Returns:
            The status payload once its ``status`` is ``delivered`` or ``failed``.

        Raises:
            TimeoutError: If no final status is seen within ``timeout`` seconds.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            status = self.get_message_status(message_id)

            if status['status'] in self.FINAL_STATUSES:
                return status

            time.sleep(poll_interval)

        raise TimeoutError(f'Message {message_id} did not reach final status within {timeout} seconds')

# Usage example
import os  # needed for os.environ below; the snippet's import block does not provide it

# Fails fast with KeyError when the API key is not configured.
client = AS2Client(os.environ['AS2AAS_API_KEY'])

def process_pharmaceutical_shipment():
    """Example: register a distributor, send it an EPCIS shipment event, await the outcome.

    Uses the module-level ``client`` and prints the final delivery status.
    """

    # Step 1: register the pharmaceutical trading partner.
    distributor = {
        'as2_id': 'PHARMA_DISTRIBUTOR',
        'name': 'Pharmaceutical Distributor LLC',
        'url': 'https://pharma.distributor.com/as2',
        'email': '[email protected]',
        'compression': True,
        'mdn_mode': 'async',
        'encryption_cert_id': 'cert_pharma_enc'
    }
    partner = client.create_partner(distributor)

    # Step 2: the EPCIS document describing the shipping event.
    shipment_xml = """<?xml version="1.0" encoding="UTF-8"?>
    <epcis:EPCISDocument xmlns:epcis="urn:epcglobal:epcis:xsd:1">
      <EPCISBody>
        <EventList>
          <ObjectEvent>
            <eventTime>2024-01-15T10:30:00Z</eventTime>
            <epcList>
              <epc>urn:epc:id:sgtin:0614141.812345.6789</epc>
            </epcList>
            <action>OBSERVE</action>
            <bizStep>urn:epcglobal:cbv:bizstep:shipping</bizStep>
          </ObjectEvent>
        </EventList>
      </EPCISBody>
    </epcis:EPCISDocument>"""

    outgoing = {
        'partner_id': partner['id'],
        'subject': 'DSCSA Shipment Verification',
        'payload': {
            'content': shipment_xml,
            'filename': 'shipment_verification.xml'
        },
        'content_type': 'application/xml',
        'headers': {
            'DSCSA-Version': '3.0',
            'Event-Type': 'Shipment'
        }
    }
    message = client.send_message(outgoing)

    # Step 3: block until the message is delivered or has failed.
    outcome = client.wait_for_delivery(message['id'])
    print(f"Shipment notification status: {outcome['status']}")

PHP Integration

<?php

/**
 * Minimal AS2aaS REST client built on Guzzle.
 *
 * Every request carries the bearer token and a JSON content type; write
 * operations also send a random Idempotency-Key header.
 */
class AS2Client 
{
    private $apiKey;
    private $baseUrl;
    private $httpClient;

    /**
     * @param string $apiKey  API key sent as a bearer token.
     * @param string $baseUrl API root, defaults to the hosted v1 endpoint.
     */
    public function __construct(string $apiKey, string $baseUrl = 'https://api.as2aas.com/v1')
    {
        $this->apiKey = $apiKey;
        $this->baseUrl = $baseUrl;
        $this->httpClient = new \GuzzleHttp\Client([
            // Guzzle resolves request URIs against base_uri per RFC 3986: a
            // request path with a leading slash would replace "/v1" entirely.
            // The base therefore ends in "/" and request paths are relative.
            'base_uri' => rtrim($this->baseUrl, '/') . '/',
            'headers' => [
                'Authorization' => 'Bearer ' . $this->apiKey,
                'Content-Type' => 'application/json'
            ]
        ]);
    }

    /**
     * Create a trading partner.
     *
     * @param array $partnerData Partner definition sent as the JSON body.
     * @return array Decoded JSON response.
     */
    public function createPartner(array $partnerData): array
    {
        return $this->postJson('partners', $partnerData, 'partner');
    }

    /**
     * Send a message to a trading partner.
     *
     * @param array $messageData Message definition sent as the JSON body.
     * @return array Decoded JSON response.
     */
    public function sendMessage(array $messageData): array
    {
        return $this->postJson('messages', $messageData, 'msg');
    }

    /**
     * Fetch the current state of a message.
     *
     * @param string $messageId Identifier returned by sendMessage().
     * @return array Decoded JSON response.
     */
    public function getMessageStatus(string $messageId): array
    {
        // Encode the ID so unexpected characters cannot alter the request path.
        $response = $this->httpClient->get('messages/' . rawurlencode($messageId));
        return json_decode((string) $response->getBody(), true);
    }

    /**
     * POST a JSON body with a fresh idempotency key and decode the response.
     *
     * The key is random rather than time() based: second-resolution timestamps
     * give two different requests in the same second the same key.
     */
    private function postJson(string $path, array $body, string $keyPrefix): array
    {
        $response = $this->httpClient->post($path, [
            'json' => $body,
            'headers' => [
                'Idempotency-Key' => $keyPrefix . '_' . bin2hex(random_bytes(16))
            ]
        ]);

        return json_decode((string) $response->getBody(), true);
    }
}

// Usage example: the API key comes from the environment.
$client = new AS2Client($_ENV['AS2AAS_API_KEY']);

/**
 * Register a retail trading partner and send it an EDI 850 purchase order,
 * then print the identifier of the message that was sent.
 */
function processRetailEDI() {
    global $client;

    // Step 1: register the retail partner.
    $partnerProfile = [
        'as2_id' => 'RETAIL_CHAIN',
        'name' => 'National Retail Chain',
        'url' => 'https://edi.retailchain.com/as2',
        'email' => '[email protected]',
        'compression' => true,
        'mdn_mode' => 'sync'
    ];
    $partner = $client->createPartner($partnerProfile);

    // Step 2: send the EDI 850 (Purchase Order) to that partner.
    $purchaseOrder = [
        'partner_id' => $partner['id'],
        'subject' => 'Purchase Order',
        'payload' => [
            'content' => 'ISA*00*...*850*...*IEA*1*000000001~',
            'filename' => 'purchase_order.edi'
        ],
        'content_type' => 'application/edi-x12',
        'headers' => [
            'Document-Type' => 'PurchaseOrder',
            'PO-Number' => 'PO-2024-001'
        ]
    ];
    $message = $client->sendMessage($purchaseOrder);

    echo "Purchase order sent: {$message['id']}\n";
}

Webhook Implementation

Express.js Webhook Handler

const express = require('express');
const crypto = require('crypto');
const app = express();

// Use the raw body parser for application/json requests instead of a JSON
// parser: the webhook route passes req.body to the HMAC signature check and
// only afterwards runs JSON.parse on it, so the body must arrive unparsed.
app.use(express.raw({ type: 'application/json' }));

/**
 * Check a webhook signature of the form `sha256=<hex HMAC-SHA256 of payload>`.
 *
 * Returns false, rather than throwing, for a missing signature header, a
 * missing secret, or a signature of the wrong length, so the caller can
 * answer every bad request with the same rejection.
 *
 * @param {Buffer|string} payload  Raw request body exactly as received.
 * @param {string|undefined} signature  Value of the signature header.
 * @param {string} secret  Shared webhook secret.
 * @returns {boolean} True only when the signature matches the payload.
 */
function verifyWebhookSignature(payload, signature, secret) {
  if (typeof signature !== 'string' || !secret) {
    return false;
  }

  const hmac = crypto.createHmac('sha256', secret);
  hmac.update(payload, 'utf8');
  const expected = Buffer.from('sha256=' + hmac.digest('hex'));
  const received = Buffer.from(signature);

  // timingSafeEqual throws when the buffers differ in length, so compare the
  // lengths first; the expected length is not secret.
  if (received.length !== expected.length) {
    return false;
  }
  return crypto.timingSafeEqual(received, expected);
}

// Webhook endpoint: authenticate the raw body, then route the event by type.
app.post('/webhooks/as2', (req, res) => {
  const payload = req.body;
  const signature = req.headers['as2aas-signature'];
  const secret = process.env.WEBHOOK_SECRET;

  // Reject anything that does not carry a valid signature.
  const signatureIsValid = verifyWebhookSignature(payload, signature, secret);
  if (!signatureIsValid) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  const event = JSON.parse(payload);

  // Event type -> handler; types without an entry are only logged.
  const handlersByType = new Map([
    ['message.delivered', handleMessageDelivered],
    ['message.failed', handleMessageFailed],
    ['certificate.expires_soon', handleCertificateExpiry]
  ]);

  const handler = handlersByType.get(event.type);
  if (handler) {
    handler(event.data);
  } else {
    console.log(`Unhandled event type: ${event.type}`);
  }

  res.status(200).json({ received: true });
});

/**
 * Handle a `message.delivered` webhook event.
 *
 * Logs the delivery, then calls updateOrderStatus() and notifyDeliverySuccess().
 * Neither helper is defined in this example — presumably supplied by the
 * integrating application.
 *
 * @param {object} messageData  Event payload; only `id` is read here.
 */
function handleMessageDelivered(messageData) {
  console.log(`Message ${messageData.id} delivered successfully`);
  
  // Update internal systems
  updateOrderStatus(messageData.id, 'delivered');
  
  // Notify relevant stakeholders
  notifyDeliverySuccess(messageData);
}

/**
 * Handle a `message.failed` webhook event.
 *
 * Logs the failure to stderr, then calls logMessageFailure() and
 * alertOperationsTeam(). Neither helper is defined in this example —
 * presumably supplied by the integrating application.
 *
 * @param {object} messageData  Event payload; `id` and `error` are read here.
 */
function handleMessageFailed(messageData) {
  console.error(`Message ${messageData.id} failed: ${messageData.error}`);
  
  // Log failure for investigation
  logMessageFailure(messageData);
  
  // Alert operations team
  alertOperationsTeam(messageData);
}

/**
 * Handle a `certificate.expires_soon` webhook event.
 *
 * Logs a warning, then calls alertSecurityTeam(), which is not defined in this
 * example — presumably supplied by the integrating application.
 *
 * @param {object} certData  Event payload; `name` and `days_until_expiry` are read here.
 */
function handleCertificateExpiry(certData) {
  console.warn(`Certificate ${certData.name} expires in ${certData.days_until_expiry} days`);
  
  // Alert security team
  alertSecurityTeam(certData);
}

// Start the HTTP server and announce it once the port is bound.
const announceListening = () => {
  console.log('AS2 webhook server running on port 3000');
};
app.listen(3000, announceListening);

Enterprise Integration Patterns

High-Availability Setup

import asyncio
import aiohttp
from typing import List, Dict

class EnterpriseAS2Client:
    """AS2aaS client that rotates through several API keys when a send fails.

    ``current_key_index`` persists between calls, so a key that was rotated
    away from stays skipped until the rotation wraps around.
    """

    def __init__(self, api_keys: List[str], base_url: str = 'https://api.as2aas.com/v1'):
        """Store the key pool.

        Raises:
            ValueError: If ``api_keys`` is empty; there would be nothing to send with.
        """
        if not api_keys:
            raise ValueError('api_keys must contain at least one API key')
        self.api_keys = api_keys
        self.base_url = base_url
        self.current_key_index = 0

    def _rotate_key(self) -> None:
        """Advance to the next API key, wrapping around at the end of the pool."""
        self.current_key_index = (self.current_key_index + 1) % len(self.api_keys)

    async def send_message_with_failover(self, message_data: Dict) -> Dict:
        """Send message with automatic API key failover.

        Each API key is tried at most once. A 429 response or a transport/HTTP
        error moves on to the next key after a short pause.

        Args:
            message_data: Message definition sent as the JSON request body. Its
                ``id`` entry, when present, is used in the idempotency key.

        Returns:
            The decoded JSON body of the 201 response.

        Raises:
            aiohttp.ClientError, asyncio.TimeoutError: The error of the final
                attempt, when every attempt failed that way.
            RuntimeError: If every key was rate limited, or the API answered
                with a non-error status other than 201.
        """
        import uuid  # local import: the snippet's import block does not provide it

        # Computed once per call so every retry carries the same key; a key
        # that changed between attempts would defeat the purpose of the header.
        idempotency_key = f'msg_{message_data.get("id", uuid.uuid4().hex)}'
        total_keys = len(self.api_keys)

        async with aiohttp.ClientSession() as session:
            for attempt in range(total_keys):
                api_key = self.api_keys[self.current_key_index]
                is_last_attempt = attempt == total_keys - 1

                try:
                    async with session.post(
                        f'{self.base_url}/messages',
                        json=message_data,
                        headers={
                            'Authorization': f'Bearer {api_key}',
                            'Content-Type': 'application/json',
                            'Idempotency-Key': idempotency_key
                        }
                    ) as response:
                        if response.status == 201:
                            return await response.json()
                        if response.status != 429:
                            response.raise_for_status()
                            # A non-error status other than 201 is not retried:
                            # the request may already have been accepted.
                            raise RuntimeError(
                                f'Unexpected response status {response.status} while sending message'
                            )
                        # Rate limited - try next key after a fixed pause.
                        pause = 1
                except (aiohttp.ClientError, asyncio.TimeoutError):
                    if is_last_attempt:
                        raise
                    pause = 2 ** attempt

                self._rotate_key()
                if not is_last_attempt:
                    await asyncio.sleep(pause)

        raise RuntimeError('All API keys are rate limited; the message was not sent')

# Usage
import os  # needed for os.environ below; the snippet's import block does not provide it

# Keys are tried in this order; a missing variable fails fast with KeyError.
client = EnterpriseAS2Client([
    os.environ['AS2AAS_PRIMARY_KEY'],
    os.environ['AS2AAS_SECONDARY_KEY']
])

Batch Processing

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Jackson: the class below already uses ObjectMapper for JSON (de)serialization.
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Sends batches of AS2 messages concurrently on a fixed pool of 10 worker threads.
 *
 * <p>Call {@link #shutdown()} when the processor is no longer needed; the
 * worker threads are non-daemon and otherwise keep the JVM alive.
 */
public class AS2BatchProcessor {
    private final String apiKey;
    private final String baseUrl;
    private final ExecutorService executor;
    // Shared across requests instead of being rebuilt for every message.
    private final HttpClient httpClient;
    private final ObjectMapper objectMapper;
    
    /**
     * @param apiKey API key sent as a bearer token with every request
     */
    public AS2BatchProcessor(String apiKey) {
        this.apiKey = apiKey;
        this.baseUrl = "https://api.as2aas.com/v1";
        this.executor = Executors.newFixedThreadPool(10);
        this.httpClient = HttpClient.newHttpClient();
        this.objectMapper = new ObjectMapper();
    }
    
    /**
     * Sends one message on the worker pool.
     *
     * @param messageData message definition, serialized as the JSON request body
     * @return a future holding the decoded 201 response body; it completes
     *         exceptionally with a {@link RuntimeException} when the send fails
     *         or the API answers with any other status
     */
    public CompletableFuture<Map<String, Object>> sendMessageAsync(Map<String, Object> messageData) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(baseUrl + "/messages"))
                    .header("Authorization", "Bearer " + apiKey)
                    .header("Content-Type", "application/json")
                    // A random UUID, not the clock: worker threads running in the
                    // same millisecond would otherwise share one idempotency key.
                    .header("Idempotency-Key", "msg_" + UUID.randomUUID())
                    .POST(HttpRequest.BodyPublishers.ofString(
                        objectMapper.writeValueAsString(messageData)
                    ))
                    .build();
                    
                HttpResponse<String> response =
                    httpClient.send(request, HttpResponse.BodyHandlers.ofString());
                    
                if (response.statusCode() != 201) {
                    throw new RuntimeException("Failed to send message: " + response.body());
                }

                @SuppressWarnings("unchecked")
                Map<String, Object> parsed = objectMapper.readValue(response.body(), Map.class);
                return parsed;
            } catch (InterruptedException e) {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (RuntimeException e) {
                // Already unchecked; re-wrapping would only bury the message.
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, executor);
    }
    
    /**
     * Starts sending every message and, once all sends have finished, prints
     * one line per message to stdout (success) or stderr (failure).
     *
     * <p>Returns immediately; the report is printed asynchronously.
     *
     * @param messages message definitions to send
     */
    public void processBatch(List<Map<String, Object>> messages) {
        List<CompletableFuture<Map<String, Object>>> futures = messages.stream()
            .map(this::sendMessageAsync)
            .collect(Collectors.toList());
            
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            // whenComplete, not thenRun: allOf completes exceptionally as soon as
            // any send fails, and thenRun would then skip the report entirely.
            .whenComplete((ignored, batchError) -> {
                System.out.println("Batch processing completed");
                futures.forEach(future -> {
                    try {
                        Map<String, Object> result = future.get();
                        System.out.println("Message sent: " + result.get("id"));
                    } catch (Exception e) {
                        Throwable cause = e.getCause() != null ? e.getCause() : e;
                        System.err.println("Message failed: " + cause.getMessage());
                    }
                });
            });
    }

    /**
     * Stops accepting new work and lets queued sends finish.
     */
    public void shutdown() {
        executor.shutdown();
    }
}

Industry-Specific Examples

Healthcare Integration

def send_dscsa_verification(client, partner_id, product_sgtin):
    """Send DSCSA Track & Trace verification request.

    Builds an EPCIS document containing one ObjectEvent for ``product_sgtin``,
    stamped with the current UTC time, and sends it through ``client``.

    Args:
        client: Object exposing ``send_message(message_data)``.
        partner_id: Identifier of the receiving trading partner.
        product_sgtin: SGTIN of the product to verify. It is XML-escaped inside
            the document and passed through unchanged in the filename and the
            ``Product-SGTIN`` header.

    Returns:
        Whatever ``client.send_message`` returns.
    """
    # Local imports: the snippet has no import block of its own.
    from datetime import datetime, timezone
    from xml.sax.saxutils import escape

    # Timezone-aware replacement for the deprecated utcnow(); same "...Z" output.
    event_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    # Escape &, < and > so the value cannot break or inject markup.
    safe_sgtin = escape(str(product_sgtin))

    epcis_document = f"""<?xml version="1.0" encoding="UTF-8"?>
    <epcis:EPCISDocument xmlns:epcis="urn:epcglobal:epcis:xsd:1">
      <EPCISBody>
        <EventList>
          <ObjectEvent>
            <eventTime>{event_time}</eventTime>
            <epcList>
              <epc>urn:epc:id:sgtin:{safe_sgtin}</epc>
            </epcList>
            <action>OBSERVE</action>
            <bizStep>urn:epcglobal:cbv:bizstep:verification</bizStep>
            <disposition>urn:epcglobal:cbv:disp:needs_verification</disposition>
          </ObjectEvent>
        </EventList>
      </EPCISBody>
    </epcis:EPCISDocument>"""

    return client.send_message({
        'partner_id': partner_id,
        'subject': 'DSCSA Verification Request',
        'payload': {
            'content': epcis_document,
            'filename': f'verification_{product_sgtin}.xml'
        },
        'content_type': 'application/xml',
        'headers': {
            'DSCSA-Version': '3.0',
            'Event-Type': 'VerificationRequest',
            'Product-SGTIN': product_sgtin
        }
    })

Manufacturing Integration

/**
 * Convert a production schedule to EDI 830 and send it to a trading partner.
 *
 * @param {object} client  Client exposing `sendMessage(messageData)`.
 * @param {string} partnerId  Receiving trading partner.
 * @param {object} scheduleData  Internal schedule; `week` and `plantCode` are read here.
 * @returns {Promise<object>} The result of `client.sendMessage`.
 */
async function sendProductionSchedule(client, partnerId, scheduleData) {
  // The converter runs first, before any schedule field is read.
  const payload = {
    content: convertToEDI830(scheduleData),
    filename: `schedule_${scheduleData.week}.edi`
  };

  const documentHeaders = {
    'Document-Type': '830',
    'Schedule-Week': scheduleData.week,
    'Plant-Code': scheduleData.plantCode
  };

  return await client.sendMessage({
    partner_id: partnerId,
    subject: 'Production Schedule Update',
    payload,
    content_type: 'application/edi-x12',
    headers: documentHeaders
  });
}

/**
 * Placeholder converter: returns a fixed EDI 830 skeleton and ignores its
 * argument. Replace with the mapping for your ERP system.
 */
function convertToEDI830(scheduleData) {
  const ediSkeleton = `ISA*00*...*830*...`;
  return ediSkeleton;
}

Error Handling Patterns

Comprehensive Error Handling

/**
 * Central place for reacting to failed AS2aaS API calls.
 */
class AS2ErrorHandler {
  /**
   * Turn a Retry-After header into a number of seconds.
   *
   * The header may hold either a number of seconds or an HTTP-date. Anything
   * missing or unparseable falls back to `fallbackSeconds` instead of
   * producing NaN, which would make the retry timer fire immediately.
   */
  static _retryDelaySeconds(headerValue, fallbackSeconds = 60) {
    if (headerValue === undefined || headerValue === null || headerValue === '') {
      return fallbackSeconds;
    }

    const seconds = Number(headerValue);
    if (Number.isFinite(seconds) && seconds >= 0) {
      return seconds;
    }

    const retryAt = Date.parse(headerValue);
    if (!Number.isNaN(retryAt)) {
      // A date in the past means "retry now".
      return Math.max(0, Math.ceil((retryAt - Date.now()) / 1000));
    }

    return fallbackSeconds;
  }

  /**
   * Log an API error according to its error code and, for rate limiting,
   * schedule an optional retry.
   *
   * @param {object} error  Error thrown by the HTTP client; the API error is
   *   read from `error.response.data.error`.
   * @param {string} operation  Name of the failed operation, used in log output.
   * @param {Function|null} [retryCallback]  Invoked once after the Retry-After
   *   delay when the error code is `rate_limit_exceeded`.
   * @returns {Promise<void>}
   */
  static async handleAPIError(error, operation, retryCallback = null) {
    const errorData = error?.response?.data?.error;
    
    switch (errorData?.code) {
      // Braces give the const its own scope instead of leaking it across cases.
      case 'rate_limit_exceeded': {
        const retryAfter = AS2ErrorHandler._retryDelaySeconds(
          error.response.headers?.['retry-after']
        );
        console.log(`Rate limited. Retrying after ${retryAfter} seconds`);
        
        if (retryCallback) {
          setTimeout(retryCallback, retryAfter * 1000);
        }
        break;
      }
        
      case 'invalid_partner':
        console.error(`Partner validation failed for operation: ${operation}`);
        // Handle partner configuration issues
        break;
        
      case 'certificate_expired':
        console.error(`Certificate expired during operation: ${operation}`);
        // Alert certificate management team
        break;
        
      case 'transmission_timeout':
        console.error(`Partner endpoint timeout during operation: ${operation}`);
        // Implement retry with exponential backoff
        break;
        
      default:
        console.error(`Unexpected error during ${operation}:`, errorData);
        // Log for investigation
    }
  }
}

Testing and Development

Test Environment Setup

#!/bin/bash
#
# Smoke test: create a test partner, send it a message, then print the
# message status. Requires curl and jq.

# Abort on the first failing command, unset variable or failing pipeline stage,
# so a failed step is never followed by requests built from empty values.
set -euo pipefail

# Set test environment variables
export AS2AAS_API_KEY="pk_test_your_test_key"
export AS2AAS_BASE_URL="https://api.as2aas.com/v1"

# Print the value of a required top-level field of a JSON document.
# Exits non-zero, echoing the document to stderr, when the field is missing or null.
require_field() {
  local json="$1" field="$2" value
  if ! value=$(printf '%s' "$json" | jq -er --arg f "$field" '.[$f]'); then
    echo "Response has no '$field': $json" >&2
    exit 1
  fi
  printf '%s\n' "$value"
}

# Create test partner
# NOTE: the "email" value looks like an obfuscated placeholder; replace it with
# a real contact address before running.
TEST_PARTNER=$(curl -s -X POST "$AS2AAS_BASE_URL/partners" \
  -H "Authorization: Bearer $AS2AAS_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: test_partner_$(date +%s)" \
  -d '{
    "as2_id": "TEST_PARTNER",
    "name": "Test Trading Partner", 
    "url": "https://demo.as2aas.com/as2/receive",
    "email": "[email protected]",
    "compression": true,
    "mdn_mode": "sync"
  }')

PARTNER_ID=$(require_field "$TEST_PARTNER" id)
echo "Created test partner: $PARTNER_ID"

# Send test message
# jq builds the body so the partner id is always JSON-escaped correctly.
MESSAGE_BODY=$(jq -n --arg partner_id "$PARTNER_ID" '{
  partner_id: $partner_id,
  subject: "Test Message",
  payload: {
    content: "Test message content",
    filename: "test.txt"
  },
  content_type: "text/plain"
}')

TEST_MESSAGE=$(curl -s -X POST "$AS2AAS_BASE_URL/messages" \
  -H "Authorization: Bearer $AS2AAS_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: test_msg_$(date +%s)" \
  -d "$MESSAGE_BODY")

MESSAGE_ID=$(require_field "$TEST_MESSAGE" id)
echo "Sent test message: $MESSAGE_ID"

# Monitor message status
sleep 5
MESSAGE_STATUS=$(curl -s -X GET "$AS2AAS_BASE_URL/messages/$MESSAGE_ID" \
  -H "Authorization: Bearer $AS2AAS_API_KEY")

echo "Message status: $(printf '%s' "$MESSAGE_STATUS" | jq -r '.status')"

Production Deployment

Environment Configuration

# docker-compose.yml
# Runs the integration service as a single container with its AS2aaS
# credentials injected as environment variables.
version: '3.8'
services:
  as2-integration:
    # Placeholder image name: replace with the image of your application.
    image: your-app:latest
    environment:
      # ${...} values are taken from the environment Compose is started in.
      - AS2AAS_API_KEY=${AS2AAS_PRODUCTION_KEY}
      # NOTE(review): the Express webhook example reads process.env.WEBHOOK_SECRET,
      # not AS2AAS_WEBHOOK_SECRET; confirm which variable name your app expects.
      - AS2AAS_WEBHOOK_SECRET=${WEBHOOK_SECRET}
      - LOG_LEVEL=info
    ports:
      - "8080:8080"
    # Probes /health on port 8080 every 30s, allowing 10s per probe; the
    # container is reported unhealthy after 3 consecutive failures.
    # Assumes curl is available inside the image (TODO confirm).
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

Monitoring Integration

import logging
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge

# Prometheus metrics exported by the integration.
# Message count, labelled by final status and trading partner.
message_counter = Counter(
    name='as2_messages_total',
    documentation='Total AS2 messages',
    labelnames=['status', 'partner'],
)
# Time taken to process one message, in seconds.
message_duration = Histogram(
    name='as2_message_duration_seconds',
    documentation='Message processing duration',
)
# Current number of active trading partners.
active_partners = Gauge(
    name='as2_active_partners',
    documentation='Number of active trading partners',
)

def send_monitored_message(client, message_data):
    """Send message with comprehensive monitoring.

    Sends the message, waits for its final status, and records the outcome in
    the ``message_counter`` and ``message_duration`` metrics.

    Args:
        client: Object exposing ``send_message`` and ``wait_for_delivery``.
        message_data: Message definition; its ``partner_id`` labels the metrics
            (``'unknown'`` when absent).

    Returns:
        The response of ``client.send_message``.

    Raises:
        Exception: Any error raised while sending or waiting is counted under
            status ``error``, logged, and re-raised unchanged.
    """
    import time  # local import: the snippet's import block does not provide it

    start_time = time.time()
    partner_id = message_data.get('partner_id', 'unknown')

    try:
        message = client.send_message(message_data)

        # Wait for final status
        final_status = client.wait_for_delivery(message['id'])
        outcome = final_status['status']

        # Record metrics
        message_counter.labels(status=outcome, partner=partner_id).inc()
        message_duration.observe(time.time() - start_time)

        # A final status can also be a failure; only report success for a
        # delivered message.
        if outcome == 'delivered':
            logging.info(f"Message {message['id']} processed successfully")
        else:
            logging.warning(f"Message {message['id']} finished with status {outcome}")
        return message

    except Exception as e:
        message_counter.labels(status='error', partner=partner_id).inc()
        logging.error(f"Message processing failed: {str(e)}")
        raise