Code Examples
Practical implementation examples for integrating AS2aaS into your applications. These examples demonstrate common integration patterns and best practices.
Complete Workflow Implementation
Node.js Integration
const axios = require('axios');
class AS2Client {
constructor(apiKey, baseURL = 'https://api.as2aas.com/v1') {
this.apiKey = apiKey;
this.baseURL = baseURL;
this.client = axios.create({
baseURL: this.baseURL,
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
}
});
}
async createPartner(partnerData) {
try {
const response = await this.client.post('/partners', partnerData, {
headers: {
'Idempotency-Key': `partner_${Date.now()}`
}
});
return response.data;
} catch (error) {
throw new Error(`Failed to create partner: ${error.response.data.error.message}`);
}
}
async sendMessage(messageData) {
try {
const response = await this.client.post('/messages', messageData, {
headers: {
'Idempotency-Key': `msg_${Date.now()}`
}
});
return response.data;
} catch (error) {
throw new Error(`Failed to send message: ${error.response.data.error.message}`);
}
}
async getMessageStatus(messageId) {
try {
const response = await this.client.get(`/messages/${messageId}`);
return response.data;
} catch (error) {
throw new Error(`Failed to get message status: ${error.response.data.error.message}`);
}
}
}
// Usage example
const client = new AS2Client(process.env.AS2AAS_API_KEY);

/**
 * Example workflow: create a trading partner, send it one X12 invoice, then
 * read the message status back once and log it.
 *
 * Rejects with whatever Error the failing client call throws.
 */
async function processEDIInvoice() {
  // Create trading partner
  const partner = await client.createPartner({
    as2_id: 'ENTERPRISE_CUSTOMER',
    name: 'Enterprise Customer Corp',
    url: 'https://customer.enterprise.com/as2',
    // Placeholder contact address; replace with the partner's real EDI contact.
    email: 'edi@customer.enterprise.com',
    compression: true,
    mdn_mode: 'async'
  });

  // Send EDI invoice
  const message = await client.sendMessage({
    partner_id: partner.id,
    subject: 'Invoice Document',
    payload: {
      content: 'ISA*00*...*IEA*1*000000001~',
      filename: 'invoice_001.edi'
    },
    content_type: 'application/edi-x12'
  });
  console.log(`Message sent: ${message.id}`);

  // Monitor message status (single read; this example does not poll)
  const status = await client.getMessageStatus(message.id);
  console.log(`Message status: ${status.status}`);
}
Python Integration
import requests
import json
import time
from typing import Dict, Any, Optional
class AS2Client:
    """Small AS2aaS REST client built on a shared ``requests.Session``.

    The session carries the bearer token and JSON content type. Write
    operations send an ``Idempotency-Key`` header; pass your own key when
    retrying so the retry carries the same key as the first attempt.
    """

    def __init__(self, api_key: str, base_url: str = 'https://api.as2aas.com/v1',
                 timeout: float = 30.0):
        """
        Args:
            api_key: Sent as ``Authorization: Bearer <api_key>``.
            base_url: API root that request paths are appended to.
            timeout: Seconds passed to every request; without a timeout
                ``requests`` waits indefinitely on an unresponsive server.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })

    @staticmethod
    def _new_idempotency_key(prefix: str) -> str:
        """Return ``<prefix>_<random hex>``; unique per call, unlike a one-second timestamp."""
        import uuid  # local import: the snippet's import header does not include uuid
        return f'{prefix}_{uuid.uuid4().hex}'

    def create_partner(self, partner_data: Dict[str, Any],
                       idempotency_key: Optional[str] = None) -> Dict[str, Any]:
        """Create a new trading partner (POST /partners).

        Args:
            partner_data: JSON body, sent as-is.
            idempotency_key: Key to send; generated when omitted.

        Returns:
            The decoded JSON response.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        response = self.session.post(
            f'{self.base_url}/partners',
            json=partner_data,
            headers={'Idempotency-Key': idempotency_key or self._new_idempotency_key('partner')},
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def send_message(self, message_data: Dict[str, Any],
                     idempotency_key: Optional[str] = None) -> Dict[str, Any]:
        """Send AS2 message to trading partner (POST /messages).

        Args:
            message_data: JSON body, sent as-is.
            idempotency_key: Key to send; generated when omitted.

        Returns:
            The decoded JSON response.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        response = self.session.post(
            f'{self.base_url}/messages',
            json=message_data,
            headers={'Idempotency-Key': idempotency_key or self._new_idempotency_key('msg')},
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_message_status(self, message_id: str) -> Dict[str, Any]:
        """Retrieve message status and details (GET /messages/{message_id}).

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        response = self.session.get(
            f'{self.base_url}/messages/{message_id}',
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def wait_for_delivery(self, message_id: str, timeout: int = 300,
                          poll_interval: float = 5.0) -> Dict[str, Any]:
        """Poll the message until its status is ``delivered`` or ``failed``.

        Args:
            message_id: Message to poll.
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between polls.

        Returns:
            The status payload of the first poll that reports a final status.

        Raises:
            TimeoutError: If no final status is seen within ``timeout`` seconds.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            status = self.get_message_status(message_id)
            if status['status'] in ['delivered', 'failed']:
                return status
            # Never sleep past the deadline.
            time.sleep(max(0.0, min(poll_interval, deadline - time.monotonic())))
        raise TimeoutError(f'Message {message_id} did not reach final status within {timeout} seconds')
# Usage example
import os  # os.environ is read below; the snippet's import header omits it

client = AS2Client(os.environ['AS2AAS_API_KEY'])


def process_pharmaceutical_shipment():
    """Example: EPCIS pharmaceutical shipment processing.

    Creates a partner, sends one EPCIS XML document to it, waits for a final
    delivery status and prints that status.
    """
    # Create pharmaceutical partner
    partner = client.create_partner({
        'as2_id': 'PHARMA_DISTRIBUTOR',
        'name': 'Pharmaceutical Distributor LLC',
        'url': 'https://pharma.distributor.com/as2',
        # Placeholder contact address; replace with the partner's real EDI contact.
        'email': 'edi@pharma.distributor.com',
        'compression': True,
        'mdn_mode': 'async',
        'encryption_cert_id': 'cert_pharma_enc'
    })

    # Send EPCIS shipment document
    epcis_content = """<?xml version="1.0" encoding="UTF-8"?>
<epcis:EPCISDocument xmlns:epcis="urn:epcglobal:epcis:xsd:1">
<EPCISBody>
<EventList>
<ObjectEvent>
<eventTime>2024-01-15T10:30:00Z</eventTime>
<epcList>
<epc>urn:epc:id:sgtin:0614141.812345.6789</epc>
</epcList>
<action>OBSERVE</action>
<bizStep>urn:epcglobal:cbv:bizstep:shipping</bizStep>
</ObjectEvent>
</EventList>
</EPCISBody>
</epcis:EPCISDocument>"""

    message = client.send_message({
        'partner_id': partner['id'],
        'subject': 'DSCSA Shipment Verification',
        'payload': {
            'content': epcis_content,
            'filename': 'shipment_verification.xml'
        },
        'content_type': 'application/xml',
        'headers': {
            'DSCSA-Version': '3.0',
            'Event-Type': 'Shipment'
        }
    })

    # Wait for delivery confirmation
    final_status = client.wait_for_delivery(message['id'])
    print(f"Shipment notification status: {final_status['status']}")
PHP Integration
<?php
/**
 * Minimal AS2aaS REST client built on Guzzle.
 *
 * Requests are sent relative to the base URL with a bearer token; write
 * operations also send an Idempotency-Key header.
 */
class AS2Client
{
    private $apiKey;
    private $baseUrl;
    private $httpClient;

    /**
     * @param string $apiKey  Sent as "Authorization: Bearer <apiKey>".
     * @param string $baseUrl API root, with or without a trailing slash.
     */
    public function __construct(string $apiKey, string $baseUrl = 'https://api.as2aas.com/v1')
    {
        $this->apiKey = $apiKey;
        // Guzzle resolves request URIs against base_uri per RFC 3986: a request
        // path starting with "/" replaces the whole base path (dropping "/v1"),
        // and a base without a trailing slash loses its last segment. So the base
        // always ends in "/" and request paths below never start with "/".
        $this->baseUrl = rtrim($baseUrl, '/') . '/';
        $this->httpClient = new \GuzzleHttp\Client([
            'base_uri' => $this->baseUrl,
            'headers' => [
                'Authorization' => 'Bearer ' . $this->apiKey,
                'Content-Type' => 'application/json'
            ]
        ]);
    }

    /**
     * POST partners.
     *
     * @param array $partnerData JSON body, sent as-is.
     * @return array Decoded JSON response.
     */
    public function createPartner(array $partnerData): array
    {
        $response = $this->httpClient->post('partners', [
            'json' => $partnerData,
            'headers' => [
                'Idempotency-Key' => $this->newIdempotencyKey('partner')
            ]
        ]);
        return $this->decode($response);
    }

    /**
     * POST messages.
     *
     * @param array $messageData JSON body, sent as-is.
     * @return array Decoded JSON response.
     */
    public function sendMessage(array $messageData): array
    {
        $response = $this->httpClient->post('messages', [
            'json' => $messageData,
            'headers' => [
                'Idempotency-Key' => $this->newIdempotencyKey('msg')
            ]
        ]);
        return $this->decode($response);
    }

    /**
     * GET messages/{messageId}.
     *
     * @param string $messageId Identifier placed in the URL path (percent-encoded).
     * @return array Decoded JSON response.
     */
    public function getMessageStatus(string $messageId): array
    {
        $response = $this->httpClient->get('messages/' . rawurlencode($messageId));
        return $this->decode($response);
    }

    /**
     * Build "<prefix>_<32 random hex chars>"; time() alone repeats within a second.
     */
    private function newIdempotencyKey(string $prefix): string
    {
        return $prefix . '_' . bin2hex(random_bytes(16));
    }

    /**
     * Decode a response body into an array.
     *
     * @throws \UnexpectedValueException When the body is not a JSON object or array.
     */
    private function decode($response): array
    {
        // getBody() yields a stream object; cast explicitly before decoding.
        $decoded = json_decode((string) $response->getBody(), true);
        if (!is_array($decoded)) {
            throw new \UnexpectedValueException('AS2aaS response was not a JSON object: ' . json_last_error_msg());
        }
        return $decoded;
    }
}
// Usage example
// getenv() is used instead of $_ENV: $_ENV is only populated when php.ini's
// variables_order contains "E", which the stock php.ini files do not set.
$apiKey = getenv('AS2AAS_API_KEY');
if ($apiKey === false || $apiKey === '') {
    throw new RuntimeException('AS2AAS_API_KEY is not set');
}
$client = new AS2Client($apiKey);

/**
 * Example workflow: create a retail partner and send it one EDI 850
 * purchase order, then print the message id.
 */
function processRetailEDI() {
    global $client;

    // Create retail partner
    $partner = $client->createPartner([
        'as2_id' => 'RETAIL_CHAIN',
        'name' => 'National Retail Chain',
        'url' => 'https://edi.retailchain.com/as2',
        // Placeholder contact address; replace with the partner's real EDI contact.
        'email' => 'edi@retailchain.com',
        'compression' => true,
        'mdn_mode' => 'sync'
    ]);

    // Send EDI 850 (Purchase Order)
    $message = $client->sendMessage([
        'partner_id' => $partner['id'],
        'subject' => 'Purchase Order',
        'payload' => [
            'content' => 'ISA*00*...*850*...*IEA*1*000000001~',
            'filename' => 'purchase_order.edi'
        ],
        'content_type' => 'application/edi-x12',
        'headers' => [
            'Document-Type' => 'PurchaseOrder',
            'PO-Number' => 'PO-2024-001'
        ]
    ]);

    echo "Purchase order sent: {$message['id']}\n";
}
Webhook Implementation
Express.js Webhook Handler
const express = require('express');
const crypto = require('crypto');
const app = express();
// Register express.raw for application/json requests instead of a JSON parser:
// the webhook route below hashes req.body for the signature check and then
// calls JSON.parse on it itself.
app.use(express.raw({ type: 'application/json' }));
/**
 * Check a webhook signature of the form `sha256=<hex HMAC-SHA256 of payload>`.
 *
 * @param {Buffer|string} payload - Exact request body bytes that were signed.
 * @param {string|undefined} signature - Value of the signature header, if any.
 * @param {string} secret - Shared webhook secret used as the HMAC key.
 * @returns {boolean} true only when the signature matches; a missing,
 *   non-string or wrong-length signature yields false instead of throwing.
 */
function verifyWebhookSignature(payload, signature, secret) {
  // Buffer.from(undefined) throws, so reject absent/odd header values up front.
  if (typeof signature !== 'string' || signature.length === 0) {
    return false;
  }

  const hmac = crypto.createHmac('sha256', secret);
  hmac.update(payload, 'utf8');
  const expected = Buffer.from('sha256=' + hmac.digest('hex'), 'utf8');
  const received = Buffer.from(signature, 'utf8');

  // timingSafeEqual throws a RangeError on inputs of different byte length.
  // The expected length is fixed and public, so this early exit leaks nothing.
  if (received.length !== expected.length) {
    return false;
  }
  return crypto.timingSafeEqual(received, expected);
}
/**
 * POST /webhooks/as2 — verify the signature, then dispatch on `event.type`.
 *
 * Responses:
 *   200 {received: true}  event accepted (including unhandled event types)
 *   400                   body passed verification but is not valid JSON
 *   401                   signature missing or invalid, or body was not captured raw
 *   500                   WEBHOOK_SECRET is not configured
 */
app.post('/webhooks/as2', (req, res) => {
  const signature = req.headers['as2aas-signature'];
  const payload = req.body;
  const secret = process.env.WEBHOOK_SECRET;

  // Without a secret nothing can be verified; fail closed and say why in the log.
  if (!secret) {
    console.error('WEBHOOK_SECRET is not configured; rejecting webhook');
    return res.status(500).json({ error: 'Webhook secret not configured' });
  }

  // Verify webhook signature. A non-Buffer body means the raw body parser did
  // not run for this request, so the signed bytes are not available.
  let signatureValid = false;
  if (Buffer.isBuffer(payload) && typeof signature === 'string') {
    try {
      signatureValid = verifyWebhookSignature(payload, signature, secret);
    } catch {
      // Treat any verification error as a failed check rather than a crash.
      signatureValid = false;
    }
  }
  if (!signatureValid) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  let event;
  try {
    event = JSON.parse(payload.toString('utf8'));
  } catch {
    return res.status(400).json({ error: 'Invalid JSON payload' });
  }

  // Process different event types
  switch (event.type) {
    case 'message.delivered':
      handleMessageDelivered(event.data);
      break;
    case 'message.failed':
      handleMessageFailed(event.data);
      break;
    case 'certificate.expires_soon':
      handleCertificateExpiry(event.data);
      break;
    default:
      console.log(`Unhandled event type: ${event.type}`);
  }

  res.status(200).json({ received: true });
});
function handleMessageDelivered(messageData) {
console.log(`Message ${messageData.id} delivered successfully`);
// Update internal systems
updateOrderStatus(messageData.id, 'delivered');
// Notify relevant stakeholders
notifyDeliverySuccess(messageData);
}
function handleMessageFailed(messageData) {
console.error(`Message ${messageData.id} failed: ${messageData.error}`);
// Log failure for investigation
logMessageFailure(messageData);
// Alert operations team
alertOperationsTeam(messageData);
}
function handleCertificateExpiry(certData) {
console.warn(`Certificate ${certData.name} expires in ${certData.days_until_expiry} days`);
// Alert security team
alertSecurityTeam(certData);
}
// Start listening on port 3000 and announce it once the server is up.
function announceWebhookServer() {
  console.log('AS2 webhook server running on port 3000');
}

app.listen(3000, announceWebhookServer);
Enterprise Integration Patterns
High-Availability Setup
import asyncio
import aiohttp
from typing import List, Dict
class EnterpriseAS2Client:
    """Sends messages while rotating through several API keys.

    ``current_key_index`` points at the key used for the next attempt and is
    advanced whenever an attempt is rate limited or fails.
    """

    def __init__(self, api_keys: List[str], base_url: str = 'https://api.as2aas.com/v1'):
        """
        Args:
            api_keys: Keys to rotate through, in order of preference.
            base_url: API root that request paths are appended to.

        Raises:
            ValueError: If ``api_keys`` is empty.
        """
        if not api_keys:
            raise ValueError('at least one API key is required')
        self.api_keys = list(api_keys)
        self.base_url = base_url
        self.current_key_index = 0

    async def send_message_with_failover(self, message_data: Dict) -> Dict:
        """Send message with automatic API key failover.

        Makes at most one attempt per configured key. Every attempt carries
        the same ``Idempotency-Key`` header.

        Args:
            message_data: JSON body. Its ``id`` entry, when present, is used
                to build the idempotency key.

        Returns:
            The decoded JSON body of the first 2xx response.

        Raises:
            RuntimeError: If every key was rate limited (HTTP 429), or a
                response had an unexpected non-error status.
            Exception: The error raised by the final attempt otherwise.
        """
        import uuid  # local import: the snippet's import header does not include uuid

        # Computed once, outside the loop, so a failover retry carries the same key.
        message_ref = message_data.get('id')
        if message_ref is None:
            message_ref = uuid.uuid4().hex
        idempotency_key = f'msg_{message_ref}'

        key_count = len(self.api_keys)
        for attempt in range(key_count):
            is_last_attempt = attempt == key_count - 1
            api_key = self.api_keys[self.current_key_index]
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        f'{self.base_url}/messages',
                        json=message_data,
                        headers={
                            'Authorization': f'Bearer {api_key}',
                            'Content-Type': 'application/json',
                            'Idempotency-Key': idempotency_key
                        }
                    ) as response:
                        # Any 2xx is success; retrying after one would resend the message.
                        if 200 <= response.status < 300:
                            return await response.json()
                        if response.status != 429:
                            response.raise_for_status()
                            # raise_for_status() is silent below 400 (e.g. 3xx).
                            raise RuntimeError(f'Unexpected response status {response.status}')
            except Exception:
                if is_last_attempt:
                    raise
                delay = 2 ** attempt
            else:
                # Reached only for HTTP 429: rate limited - try next key
                if is_last_attempt:
                    raise RuntimeError('All API keys are rate limited (HTTP 429)')
                delay = 1

            self.current_key_index = (self.current_key_index + 1) % key_count
            await asyncio.sleep(delay)

        # Only reachable if api_keys was emptied after construction.
        raise RuntimeError('No API keys configured')
# Usage
import os  # os.environ is read below; the snippet's import header omits it

# Keys are tried in list order, starting with the primary key.
client = EnterpriseAS2Client([
    os.environ['AS2AAS_PRIMARY_KEY'],
    os.environ['AS2AAS_SECONDARY_KEY']
])
Batch Processing
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Sends AS2aaS messages concurrently on a fixed pool of 10 threads.
 *
 * One HttpClient and one ObjectMapper are shared by all requests. Call
 * {@link #close()} (or use try-with-resources) when done: the pool's threads
 * are non-daemon and otherwise keep the JVM alive.
 */
public class AS2BatchProcessor implements AutoCloseable {
    private final String apiKey;
    private final String baseUrl;
    private final ExecutorService executor;
    private final HttpClient httpClient;
    private final ObjectMapper objectMapper;

    /**
     * @param apiKey sent as "Authorization: Bearer &lt;apiKey&gt;" on every request
     */
    public AS2BatchProcessor(String apiKey) {
        this.apiKey = apiKey;
        this.baseUrl = "https://api.as2aas.com/v1";
        this.executor = Executors.newFixedThreadPool(10);
        this.httpClient = HttpClient.newHttpClient();
        this.objectMapper = new ObjectMapper();
    }

    /**
     * POSTs one message to /messages on the pool.
     *
     * @param messageData request body, serialized to JSON
     * @return a future holding the decoded JSON response; it completes
     *         exceptionally with a RuntimeException on a non-2xx status or
     *         any I/O or serialization failure
     */
    public CompletableFuture<Map<String, Object>> sendMessageAsync(Map<String, Object> messageData) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(baseUrl + "/messages"))
                    .header("Authorization", "Bearer " + apiKey)
                    .header("Content-Type", "application/json")
                    // A random UUID per message: a millisecond timestamp repeats
                    // across the pool's threads within a batch.
                    .header("Idempotency-Key", "msg_" + UUID.randomUUID())
                    .POST(HttpRequest.BodyPublishers.ofString(
                        objectMapper.writeValueAsString(messageData)
                    ))
                    .build();

                HttpResponse<String> response =
                    httpClient.send(request, HttpResponse.BodyHandlers.ofString());

                int status = response.statusCode();
                if (status >= 200 && status < 300) {
                    @SuppressWarnings("unchecked")
                    Map<String, Object> parsed = objectMapper.readValue(response.body(), Map.class);
                    return parsed;
                }
                throw new RuntimeException("Failed to send message: " + response.body());
            } catch (RuntimeException e) {
                // Already unchecked; rethrow as-is instead of double-wrapping.
                throw e;
            } catch (InterruptedException e) {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, executor);
    }

    /**
     * Starts sending every message and, once all sends have finished, prints
     * one line per message: its id on success, the failure reason otherwise.
     * Returns immediately; the report is printed asynchronously.
     *
     * @param messages request bodies, one per message
     */
    public void processBatch(List<Map<String, Object>> messages) {
        List<CompletableFuture<Map<String, Object>>> futures = messages.stream()
            .map(this::sendMessageAsync)
            .collect(Collectors.toList());

        // whenComplete runs even when some sends failed; thenRun would be
        // skipped entirely and no failure would ever be reported.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .whenComplete((ignored, batchError) -> {
                System.out.println("Batch processing completed");
                for (CompletableFuture<Map<String, Object>> future : futures) {
                    try {
                        Map<String, Object> result = future.get();
                        System.out.println("Message sent: " + result.get("id"));
                    } catch (Exception e) {
                        // Unwrap ExecutionException to show the real cause.
                        Throwable cause = e.getCause() != null ? e.getCause() : e;
                        System.err.println("Message failed: " + cause.getMessage());
                    }
                }
            });
    }

    /** Stops accepting new sends; sends already submitted still run to completion. */
    @Override
    public void close() {
        executor.shutdown();
    }
}
Industry-Specific Examples
Healthcare Integration
def send_dscsa_verification(client, partner_id, product_sgtin):
    """Send DSCSA Track & Trace verification request.

    Builds an EPCIS document with one ObjectEvent for ``product_sgtin``,
    timestamped with the current UTC time, and sends it through
    ``client.send_message``.

    Args:
        client: Object exposing ``send_message(message_data)``.
        partner_id: Value for the message's ``partner_id`` field.
        product_sgtin: SGTIN placed in the EPC URN, the filename and the
            ``Product-SGTIN`` header.

    Returns:
        Whatever ``client.send_message`` returns.
    """
    # Local imports: the snippet has no import header of its own.
    from datetime import datetime, timezone
    from xml.sax.saxutils import escape

    # Timezone-aware UTC timestamp rendered with a literal "Z" suffix.
    event_time = datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z')
    # Escape &, < and > so the caller-supplied value cannot break the markup.
    sgtin_xml = escape(str(product_sgtin))

    epcis_document = f"""<?xml version="1.0" encoding="UTF-8"?>
<epcis:EPCISDocument xmlns:epcis="urn:epcglobal:epcis:xsd:1">
<EPCISBody>
<EventList>
<ObjectEvent>
<eventTime>{event_time}</eventTime>
<epcList>
<epc>urn:epc:id:sgtin:{sgtin_xml}</epc>
</epcList>
<action>OBSERVE</action>
<bizStep>urn:epcglobal:cbv:bizstep:verification</bizStep>
<disposition>urn:epcglobal:cbv:disp:needs_verification</disposition>
</ObjectEvent>
</EventList>
</EPCISBody>
</epcis:EPCISDocument>"""

    return client.send_message({
        'partner_id': partner_id,
        'subject': 'DSCSA Verification Request',
        'payload': {
            'content': epcis_document,
            'filename': f'verification_{product_sgtin}.xml'
        },
        'content_type': 'application/xml',
        'headers': {
            'DSCSA-Version': '3.0',
            'Event-Type': 'VerificationRequest',
            'Product-SGTIN': product_sgtin
        }
    })
Manufacturing Integration
async function sendProductionSchedule(client, partnerId, scheduleData) {
// Convert internal schedule format to EDI 830
const ediContent = convertToEDI830(scheduleData);
const message = await client.sendMessage({
partner_id: partnerId,
subject: 'Production Schedule Update',
payload: {
content: ediContent,
filename: `schedule_${scheduleData.week}.edi`
},
content_type: 'application/edi-x12',
headers: {
'Document-Type': '830',
'Schedule-Week': scheduleData.week,
'Plant-Code': scheduleData.plantCode
}
});
return message;
}
/**
 * Placeholder converter: returns a fixed EDI 830 stub and ignores its input.
 * Replace with the mapping for your ERP system.
 *
 * @param {object} scheduleData - Internal schedule (unused by this stub).
 * @returns {string} The stub EDI content.
 */
function convertToEDI830(scheduleData) {
  // Implementation specific to your ERP system
  const stubDocument = 'ISA*00*...*830*...';
  return stubDocument;
}
Error Handling Patterns
Comprehensive Error Handling
class AS2ErrorHandler {
static async handleAPIError(error, operation, retryCallback = null) {
const errorData = error.response?.data?.error;
switch (errorData?.code) {
case 'rate_limit_exceeded':
const retryAfter = error.response.headers['retry-after'] || 60;
console.log(`Rate limited. Retrying after ${retryAfter} seconds`);
if (retryCallback) {
setTimeout(retryCallback, retryAfter * 1000);
}
break;
case 'invalid_partner':
console.error(`Partner validation failed for operation: ${operation}`);
// Handle partner configuration issues
break;
case 'certificate_expired':
console.error(`Certificate expired during operation: ${operation}`);
// Alert certificate management team
break;
case 'transmission_timeout':
console.error(`Partner endpoint timeout during operation: ${operation}`);
// Implement retry with exponential backoff
break;
default:
console.error(`Unexpected error during ${operation}:`, errorData);
// Log for investigation
}
}
}
Testing and Development
Test Environment Setup
#!/bin/bash
# Smoke test against the AS2aaS API: create a test partner, send it one
# plain-text message, then print the message status after a short wait.
# Requires curl and jq.

# Abort on the first failed command, unset variable or failed pipeline stage.
set -euo pipefail

# Set test environment variables
export AS2AAS_API_KEY="pk_test_your_test_key"
export AS2AAS_BASE_URL="https://api.as2aas.com/v1"

# extract_id RESPONSE ACTION
# Print RESPONSE's .id field. If it is missing or null, print the full
# response to stderr and fail, so a rejected request is never carried
# forward as the literal id "null".
extract_id() {
  local response="$1" action="$2" id
  id=$(jq -er '.id' <<<"$response") || {
    echo "Failed to ${action}: ${response}" >&2
    return 1
  }
  printf '%s\n' "$id"
}

# Create test partner
TEST_PARTNER=$(curl -sS -X POST "$AS2AAS_BASE_URL/partners" \
  -H "Authorization: Bearer $AS2AAS_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: test_partner_$(date +%s)" \
  -d '{
    "as2_id": "TEST_PARTNER",
    "name": "Test Trading Partner",
    "url": "https://demo.as2aas.com/as2/receive",
    "email": "test@example.com",
    "compression": true,
    "mdn_mode": "sync"
  }')
PARTNER_ID=$(extract_id "$TEST_PARTNER" "create test partner")
echo "Created test partner: $PARTNER_ID"

# Send test message. jq builds the body so the partner id is JSON-escaped
# rather than spliced into a hand-written string.
MESSAGE_BODY=$(jq -n --arg partner_id "$PARTNER_ID" '{
  partner_id: $partner_id,
  subject: "Test Message",
  payload: {
    content: "Test message content",
    filename: "test.txt"
  },
  content_type: "text/plain"
}')
TEST_MESSAGE=$(curl -sS -X POST "$AS2AAS_BASE_URL/messages" \
  -H "Authorization: Bearer $AS2AAS_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: test_msg_$(date +%s)" \
  -d "$MESSAGE_BODY")
MESSAGE_ID=$(extract_id "$TEST_MESSAGE" "send test message")
echo "Sent test message: $MESSAGE_ID"

# Monitor message status
sleep 5
MESSAGE_STATUS=$(curl -sS -X GET "$AS2AAS_BASE_URL/messages/$MESSAGE_ID" \
  -H "Authorization: Bearer $AS2AAS_API_KEY")
echo "Message status: $(jq -r '.status' <<<"$MESSAGE_STATUS")"
Production Deployment
Environment Configuration
# docker-compose.yml
version: '3.8'
services:
  as2-integration:
    image: your-app:latest
    environment:
      # ${VAR:?message} makes Compose abort with the message when the host
      # variable is unset or empty, instead of starting with a blank secret.
      - AS2AAS_API_KEY=${AS2AAS_PRODUCTION_KEY:?AS2AAS_PRODUCTION_KEY must be set}
      # The container sees this as AS2AAS_WEBHOOK_SECRET. The Express webhook
      # example in this guide reads WEBHOOK_SECRET, so use one name in both places.
      - AS2AAS_WEBHOOK_SECRET=${WEBHOOK_SECRET:?WEBHOOK_SECRET must be set}
      - LOG_LEVEL=info
    ports:
      - "8080:8080"
    healthcheck:
      # Runs inside the container, so the image must include curl.
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
Monitoring Integration
import logging
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
# Metrics for monitoring
# Incremented once per message in send_monitored_message, labelled with the
# final status (or 'error' when an exception was raised) and the partner id.
message_counter = Counter('as2_messages_total', 'Total AS2 messages', ['status', 'partner'])
# Observed in send_monitored_message with the elapsed seconds for one message.
message_duration = Histogram('as2_message_duration_seconds', 'Message processing duration')
# Declared here; nothing in this snippet sets its value.
active_partners = Gauge('as2_active_partners', 'Number of active trading partners')
def send_monitored_message(client, message_data):
    """Send message with comprehensive monitoring.

    Sends the message, waits for its final status, then records the
    ``message_counter`` and ``message_duration`` metrics.

    Args:
        client: Object exposing ``send_message`` and ``wait_for_delivery``.
        message_data: Message body; its ``partner_id`` (default ``'unknown'``)
            becomes the ``partner`` metric label.

    Returns:
        The value returned by ``client.send_message``.

    Raises:
        Exception: Any error from the client is counted with
            ``status='error'``, logged, and re-raised unchanged.
    """
    import time  # local import: the snippet's import header does not include time

    # monotonic() is unaffected by wall-clock adjustments while waiting.
    start_time = time.monotonic()
    partner_id = message_data.get('partner_id', 'unknown')
    try:
        message = client.send_message(message_data)

        # Wait for final status
        final_status = client.wait_for_delivery(message['id'])
        status = final_status['status']

        # Record metrics
        message_counter.labels(status=status, partner=partner_id).inc()
        message_duration.observe(time.monotonic() - start_time)

        # Only a delivered message is a success; any other final status
        # (such as 'failed') is logged as a warning.
        if status == 'delivered':
            logging.info(f"Message {message['id']} processed successfully")
        else:
            logging.warning(f"Message {message['id']} finished with status {status}")
        return message
    except Exception as e:
        message_counter.labels(status='error', partner=partner_id).inc()
        logging.error(f"Message processing failed: {str(e)}")
        raise