Skip to main content

Event-Driven Architecture (EDA) API Design

Event-Driven Architecture (EDA) has emerged as a cornerstone pattern for building scalable, resilient, and loosely coupled systems in modern software development. As organizations increasingly adopt microservices and distributed architectures, understanding how to design and implement EDA APIs becomes crucial for creating systems that can handle real-time data processing, high throughput, and complex business workflows.

In this comprehensive guide, we'll explore the fundamentals of Event-Driven Architecture, dive deep into API design patterns, and provide practical JavaScript examples that you can implement in your projects today.

Event-Driven Architecture

Understanding Event-Driven Architecture

Event-Driven Architecture is a software design pattern where the flow of the program is determined by events such as user actions, sensor outputs, or messages from other programs. In an EDA system, components communicate through the production and consumption of events rather than direct API calls.

Core Components of EDA

Event Producers: Components that generate and publish events when something significant happens in the system. These could be user interfaces, sensors, or business logic components.

Event Consumers: Components that subscribe to and process events. They react to events by performing specific actions, updating data, or triggering additional events.

Event Channels: The infrastructure that routes events from producers to consumers. This includes message brokers, event buses, or streaming platforms.

Event Store: A persistent storage mechanism that maintains the history of events, enabling replay, auditing, and recovery capabilities.

Benefits of Event-Driven Architecture

EDA offers several compelling advantages over traditional request-response architectures.

Loose Coupling is achieved because components don't need to know about each other directly; they only need to understand the events they produce or consume.

Scalability is enhanced as components can be scaled independently based on their event processing requirements.

Resilience is improved through asynchronous processing, where temporary failures in one component don't immediately cascade to others.

Real-time Processing capabilities enable systems to respond to events as they occur, supporting use cases like real-time analytics and notifications.

EDA API Design Patterns

When designing APIs for event-driven systems, several patterns emerge that help structure communication and data flow effectively.

Publisher-Subscriber Pattern

The Publisher-Subscriber (Pub-Sub) pattern is fundamental to EDA. Publishers emit events without knowing who will consume them, while subscribers register interest in specific event types.

// Event Publisher
class OrderService {
constructor(eventBus) {
this.eventBus = eventBus;
}

async createOrder(orderData) {
try {
// Process order creation logic
const order = await this.processOrder(orderData);

// Publish order created event
await this.eventBus.publish('order.created', {
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount,
timestamp: new Date().toISOString()
});

return order;
} catch (error) {
// Publish order creation failed event
await this.eventBus.publish('order.creation.failed', {
customerId: orderData.customerId,
error: error.message,
timestamp: new Date().toISOString()
});
throw error;
}
}

async processOrder(orderData) {
// Simulate order processing
return {
id: generateOrderId(),
customerId: orderData.customerId,
items: orderData.items,
totalAmount: calculateTotal(orderData.items),
status: 'created'
};
}
}

// Event Subscriber
class InventoryService {
constructor(eventBus) {
this.eventBus = eventBus;
this.setupEventHandlers();
}

setupEventHandlers() {
this.eventBus.subscribe('order.created', this.handleOrderCreated.bind(this));
}

async handleOrderCreated(event) {
console.log('Processing inventory update for order:', event.orderId);

try {
// Update inventory for each item
for (const item of event.items) {
await this.updateInventory(item.productId, item.quantity);
}

// Publish inventory updated event
await this.eventBus.publish('inventory.updated', {
orderId: event.orderId,
items: event.items,
timestamp: new Date().toISOString()
});
} catch (error) {
// Publish inventory update failed event
await this.eventBus.publish('inventory.update.failed', {
orderId: event.orderId,
error: error.message,
timestamp: new Date().toISOString()
});
}
}

async updateInventory(productId, quantity) {
// Simulate inventory update
console.log(`Updating inventory for product ${productId}: -${quantity}`);
}
}

Event Sourcing Pattern

Event Sourcing stores the state of a business entity as a sequence of state-changing events. Instead of storing the current state, you store the events that led to that state.

class EventStore {
constructor() {
this.events = new Map(); // In production, use a persistent store
}

async appendEvent(streamId, event) {
if (!this.events.has(streamId)) {
this.events.set(streamId, []);
}

const eventWithMetadata = {
...event,
eventId: generateEventId(),
timestamp: new Date().toISOString(),
version: this.events.get(streamId).length + 1
};

this.events.get(streamId).push(eventWithMetadata);
return eventWithMetadata;
}

async getEvents(streamId, fromVersion = 0) {
const streamEvents = this.events.get(streamId) || [];
return streamEvents.filter(event => event.version > fromVersion);
}

async getSnapshot(streamId) {
const events = await this.getEvents(streamId);
return this.replayEvents(events);
}

replayEvents(events) {
return events.reduce((state, event) => {
switch (event.type) {
case 'account.created':
return {
...state,
accountId: event.data.accountId,
balance: event.data.initialBalance,
status: 'active'
};
case 'account.debited':
return {
...state,
balance: state.balance - event.data.amount
};
case 'account.credited':
return {
...state,
balance: state.balance + event.data.amount
};
default:
return state;
}
}, {});
}
}

class BankAccount {
constructor(accountId, eventStore) {
this.accountId = accountId;
this.eventStore = eventStore;
this.version = 0;
}

static async create(accountId, initialBalance, eventStore) {
const account = new BankAccount(accountId, eventStore);
await account.applyEvent({
type: 'account.created',
data: { accountId, initialBalance }
});
return account;
}

async debit(amount) {
if (amount <= 0) {
throw new Error('Debit amount must be positive');
}

const currentState = await this.eventStore.getSnapshot(this.accountId);
if (currentState.balance < amount) {
throw new Error('Insufficient funds');
}

await this.applyEvent({
type: 'account.debited',
data: { amount }
});
}

async credit(amount) {
if (amount <= 0) {
throw new Error('Credit amount must be positive');
}

await this.applyEvent({
type: 'account.credited',
data: { amount }
});
}

async applyEvent(event) {
await this.eventStore.appendEvent(this.accountId, event);
this.version++;
}

async getBalance() {
const state = await this.eventStore.getSnapshot(this.accountId);
return state.balance;
}
}

CQRS (Command Query Responsibility Segregation)

CQRS separates read and write operations, often used alongside Event Sourcing to optimize for different access patterns.

// Command Side
class CommandHandler {
constructor(eventStore, eventBus) {
this.eventStore = eventStore;
this.eventBus = eventBus;
}

async handleCreateUser(command) {
const { userId, email, name } = command.data;

// Validate command
if (!email || !name) {
throw new Error('Email and name are required');
}

// Create and store event
const event = await this.eventStore.appendEvent(userId, {
type: 'user.created',
data: { userId, email, name }
});

// Publish event for read model updates
await this.eventBus.publish('user.created', event);

return { success: true, userId };
}

async handleUpdateUserProfile(command) {
const { userId, profileData } = command.data;

const event = await this.eventStore.appendEvent(userId, {
type: 'user.profile.updated',
data: { userId, profileData }
});

await this.eventBus.publish('user.profile.updated', event);

return { success: true, userId };
}
}

// Query Side
class ReadModelUpdater {
constructor(database, eventBus) {
this.database = database;
this.eventBus = eventBus;
this.setupEventHandlers();
}

setupEventHandlers() {
this.eventBus.subscribe('user.created', this.handleUserCreated.bind(this));
this.eventBus.subscribe('user.profile.updated', this.handleUserProfileUpdated.bind(this));
}

async handleUserCreated(event) {
const { userId, email, name } = event.data;

await this.database.users.create({
id: userId,
email,
name,
createdAt: event.timestamp,
lastModified: event.timestamp
});

console.log(`Read model updated: User ${userId} created`);
}

async handleUserProfileUpdated(event) {
const { userId, profileData } = event.data;

await this.database.users.update(userId, {
...profileData,
lastModified: event.timestamp
});

console.log(`Read model updated: User ${userId} profile updated`);
}
}

class QueryHandler {
constructor(database) {
this.database = database;
}

async getUserById(userId) {
return await this.database.users.findById(userId);
}

async getUsersByEmail(email) {
return await this.database.users.findByEmail(email);
}

async getUsersCreatedAfter(date) {
return await this.database.users.findCreatedAfter(date);
}
}

Implementing Event-Driven APIs

RESTful Event APIs

While REST is traditionally request-response, you can design RESTful APIs that work well with event-driven systems.

const express = require('express');
const app = express();

class EventDrivenOrderAPI {
constructor(eventBus, queryHandler) {
this.eventBus = eventBus;
this.queryHandler = queryHandler;
this.setupRoutes();
}

setupRoutes() {
// Command endpoint - triggers events
app.post('/api/orders', async (req, res) => {
try {
const command = {
type: 'create.order',
data: req.body,
correlationId: generateCorrelationId()
};

await this.eventBus.publish('command.create.order', command);

// Return accepted status with correlation ID for tracking
res.status(202).json({
message: 'Order creation initiated',
correlationId: command.correlationId
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});

// Query endpoint - reads from read models
app.get('/api/orders/:orderId', async (req, res) => {
try {
const order = await this.queryHandler.getOrderById(req.params.orderId);
if (!order) {
return res.status(404).json({ error: 'Order not found' });
}
res.json(order);
} catch (error) {
res.status(500).json({ error: error.message });
}
});

// Status endpoint for command tracking
app.get('/api/commands/:correlationId/status', async (req, res) => {
try {
const status = await this.queryHandler.getCommandStatus(
req.params.correlationId
);
res.json(status);
} catch (error) {
res.status(500).json({ error: error.message });
}
});

// Webhook endpoint for event notifications
app.post('/api/webhooks/order-events', async (req, res) => {
try {
const event = req.body;
await this.eventBus.publish(`webhook.${event.type}`, event);
res.status(200).json({ received: true });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
}
}

WebSocket Event Streaming

WebSockets enable real-time event streaming to clients, perfect for live updates and notifications.

const WebSocket = require('ws');

class EventStreamingServer {
constructor(eventBus, port = 8080) {
this.eventBus = eventBus;
this.wss = new WebSocket.Server({ port });
this.clients = new Map();
this.setupWebSocketServer();
this.setupEventHandlers();
}

setupWebSocketServer() {
this.wss.on('connection', (ws, req) => {
const clientId = generateClientId();
console.log(`Client ${clientId} connected`);

this.clients.set(clientId, {
ws,
subscriptions: new Set(),
filters: {}
});

ws.on('message', (message) => {
this.handleClientMessage(clientId, message);
});

ws.on('close', () => {
console.log(`Client ${clientId} disconnected`);
this.clients.delete(clientId);
});

ws.on('error', (error) => {
console.error(`WebSocket error for client ${clientId}:`, error);
this.clients.delete(clientId);
});

// Send welcome message
ws.send(JSON.stringify({
type: 'connection.established',
clientId,
timestamp: new Date().toISOString()
}));
});
}

handleClientMessage(clientId, message) {
try {
const data = JSON.parse(message);
const client = this.clients.get(clientId);

switch (data.type) {
case 'subscribe':
client.subscriptions.add(data.eventType);
client.filters[data.eventType] = data.filters || {};
this.sendToClient(clientId, {
type: 'subscription.confirmed',
eventType: data.eventType
});
break;

case 'unsubscribe':
client.subscriptions.delete(data.eventType);
delete client.filters[data.eventType];
this.sendToClient(clientId, {
type: 'subscription.cancelled',
eventType: data.eventType
});
break;

case 'ping':
this.sendToClient(clientId, { type: 'pong' });
break;
}
} catch (error) {
console.error(`Error handling message from client ${clientId}:`, error);
}
}

setupEventHandlers() {
// Subscribe to all events and route to interested clients
this.eventBus.subscribe('*', (event) => {
this.broadcastEvent(event);
});
}

broadcastEvent(event) {
this.clients.forEach((client, clientId) => {
if (this.shouldSendEventToClient(client, event)) {
this.sendToClient(clientId, {
type: 'event',
event: event
});
}
});
}

shouldSendEventToClient(client, event) {
// Check if client is subscribed to this event type
if (!client.subscriptions.has(event.type) && !client.subscriptions.has('*')) {
return false;
}

// Apply filters if any
const filters = client.filters[event.type] || {};
for (const [key, value] of Object.entries(filters)) {
if (event.data[key] !== value) {
return false;
}
}

return true;
}

sendToClient(clientId, data) {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(JSON.stringify(data));
}
}
}

// Usage example
const eventBus = new EventBus();
const streamingServer = new EventStreamingServer(eventBus, 8080);

// Client-side WebSocket handler
class EventStreamClient {
constructor(url) {
this.url = url;
this.ws = null;
this.eventHandlers = new Map();
this.connect();
}

connect() {
this.ws = new WebSocket(this.url);

this.ws.onopen = () => {
console.log('Connected to event stream');
};

this.ws.onmessage = (message) => {
const data = JSON.parse(message.data);
this.handleMessage(data);
};

this.ws.onclose = () => {
console.log('Disconnected from event stream');
// Implement reconnection logic
setTimeout(() => this.connect(), 5000);
};
}

handleMessage(data) {
switch (data.type) {
case 'event':
this.handleEvent(data.event);
break;
case 'subscription.confirmed':
console.log(`Subscribed to ${data.eventType}`);
break;
case 'subscription.cancelled':
console.log(`Unsubscribed from ${data.eventType}`);
break;
}
}

handleEvent(event) {
const handlers = this.eventHandlers.get(event.type) || [];
handlers.forEach(handler => handler(event));
}

subscribe(eventType, handler, filters = {}) {
if (!this.eventHandlers.has(eventType)) {
this.eventHandlers.set(eventType, []);
}
this.eventHandlers.get(eventType).push(handler);

this.ws.send(JSON.stringify({
type: 'subscribe',
eventType,
filters
}));
}

unsubscribe(eventType) {
this.eventHandlers.delete(eventType);
this.ws.send(JSON.stringify({
type: 'unsubscribe',
eventType
}));
}
}

Advanced EDA Patterns and Best Practices

Event Versioning and Schema Evolution

As systems evolve, event schemas need to change while maintaining backward compatibility.

class EventVersioningHandler {
constructor() {
this.versionHandlers = new Map();
this.setupVersionHandlers();
}

setupVersionHandlers() {
// Version 1 to Version 2 migration
this.versionHandlers.set('user.created.v1', (event) => {
return {
...event,
type: 'user.created.v2',
version: 2,
data: {
...event.data,
// Add new required field with default
accountType: 'standard',
// Restructure nested data
profile: {
name: event.data.name,
email: event.data.email
}
}
};
});

// Version 2 to Version 3 migration
this.versionHandlers.set('user.created.v2', (event) => {
return {
...event,
type: 'user.created.v3',
version: 3,
data: {
...event.data,
// Add metadata
metadata: {
source: 'api',
migrated: true
}
}
};
});
}

migrateEvent(event) {
let currentEvent = event;
const handlerKey = `${event.type}.v${event.version || 1}`;

while (this.versionHandlers.has(handlerKey)) {
currentEvent = this.versionHandlers.get(handlerKey)(currentEvent);
}

return currentEvent;
}

handleEvent(event) {
// Always migrate to latest version before processing
const migratedEvent = this.migrateEvent(event);
return this.processLatestVersion(migratedEvent);
}

processLatestVersion(event) {
switch (event.type) {
case 'user.created.v3':
return this.handleUserCreatedV3(event);
// Handle other event types
default:
console.warn(`Unknown event type: ${event.type}`);
}
}

handleUserCreatedV3(event) {
console.log('Processing user created event v3:', event.data);
// Process the latest version of the event
}
}

Saga Pattern for Distributed Transactions

The Saga pattern manages long-running transactions across multiple services.

class OrderSaga {
constructor(eventBus, commandBus) {
this.eventBus = eventBus;
this.commandBus = commandBus;
this.sagaState = new Map();
this.setupEventHandlers();
}

setupEventHandlers() {
this.eventBus.subscribe('order.created', this.handleOrderCreated.bind(this));
this.eventBus.subscribe('payment.processed', this.handlePaymentProcessed.bind(this));
this.eventBus.subscribe('payment.failed', this.handlePaymentFailed.bind(this));
this.eventBus.subscribe('inventory.reserved', this.handleInventoryReserved.bind(this));
this.eventBus.subscribe('inventory.reservation.failed', this.handleInventoryReservationFailed.bind(this));
this.eventBus.subscribe('shipment.created', this.handleShipmentCreated.bind(this));
}

async handleOrderCreated(event) {
const sagaId = event.orderId;

// Initialize saga state
this.sagaState.set(sagaId, {
orderId: event.orderId,
step: 'order_created',
completedSteps: ['order_created'],
compensations: []
});

// Start the saga by processing payment
await this.commandBus.send({
type: 'process.payment',
data: {
orderId: event.orderId,
amount: event.totalAmount,
customerId: event.customerId
}
});
}

async handlePaymentProcessed(event) {
const saga = this.sagaState.get(event.orderId);
if (!saga) return;

saga.step = 'payment_processed';
saga.completedSteps.push('payment_processed');
saga.compensations.push({
type: 'refund.payment',
data: { orderId: event.orderId, amount: event.amount }
});

// Next step: reserve inventory
await this.commandBus.send({
type: 'reserve.inventory',
data: {
orderId: event.orderId,
items: saga.items
}
});
}

async handlePaymentFailed(event) {
const saga = this.sagaState.get(event.orderId);
if (!saga) return;

// Payment failed, cancel the order
await this.commandBus.send({
type: 'cancel.order',
data: { orderId: event.orderId, reason: 'Payment failed' }
});

this.sagaState.delete(event.orderId);
}

async handleInventoryReserved(event) {
const saga = this.sagaState.get(event.orderId);
if (!saga) return;

saga.step = 'inventory_reserved';
saga.completedSteps.push('inventory_reserved');
saga.compensations.push({
type: 'release.inventory',
data: { orderId: event.orderId, items: event.items }
});

// Final step: create shipment
await this.commandBus.send({
type: 'create.shipment',
data: {
orderId: event.orderId,
items: event.items,
shippingAddress: saga.shippingAddress
}
});
}

async handleInventoryReservationFailed(event) {
const saga = this.sagaState.get(event.orderId);
if (!saga) return;

// Inventory reservation failed, compensate previous steps
await this.compensate(saga);
this.sagaState.delete(event.orderId);
}

async handleShipmentCreated(event) {
const saga = this.sagaState.get(event.orderId);
if (!saga) return;

// Saga completed successfully
await this.eventBus.publish('order.fulfillment.completed', {
orderId: event.orderId,
shipmentId: event.shipmentId,
timestamp: new Date().toISOString()
});

this.sagaState.delete(event.orderId);
}

async compensate(saga) {
console.log(`Compensating saga for order ${saga.orderId}`);

// Execute compensations in reverse order
for (let i = saga.compensations.length - 1; i >= 0; i--) {
const compensation = saga.compensations[i];
await this.commandBus.send(compensation);
}

await this.eventBus.publish('order.saga.compensated', {
orderId: saga.orderId,
compensatedSteps: saga.completedSteps,
timestamp: new Date().toISOString()
});
}
}

Error Handling and Resilience

Dead Letter Queues and Retry Mechanisms

class ResilientEventProcessor {
constructor(eventBus) {
this.eventBus = eventBus;
this.deadLetterQueue = [];
this.retryConfig = {
maxRetries: 3,
baseDelay: 1000,
maxDelay: 30000
};
}

async processEventWithRetry(event, handler) {
let attempts = 0;
let lastError;

while (attempts <= this.retryConfig.maxRetries) {
try {
await handler(event);
return; // Success
} catch (error) {
lastError = error;
attempts++;

if (attempts <= this.retryConfig.maxRetries) {
const delay = this.calculateDelay(attempts);
console.log(`Retry attempt ${attempts} for event ${event.type} in ${delay}ms`);
await this.delay(delay);
} else {
// Max retries exceeded, send to dead letter queue
await this.sendToDeadLetterQueue(event, lastError);
}
}
}
}

calculateDelay(attempt) {
const exponentialDelay = this.retryConfig.baseDelay * Math.pow(2, attempt - 1);
return Math.min(exponentialDelay, this.retryConfig.maxDelay);
}

async sendToDeadLetterQueue(event, error) {
const deadLetterEvent = {
originalEvent: event,
error: {
message: error.message,
stack: error.stack
},
failedAt: new Date().toISOString(),
retryCount: this.retryConfig.maxRetries
};

this.deadLetterQueue.push(deadLetterEvent);

await this.eventBus.publish('event.processing.failed', deadLetterEvent);

console.error(`Event sent to dead letter queue:`, deadLetterEvent);
}

async reprocessDeadLetterQueue() {
console.log(`Reprocessing ${this.deadLetterQueue.length} events from dead letter queue`);

const eventsToReprocess = [...this.deadLetterQueue];
this.deadLetterQueue = [];

for (const deadLetterEvent of eventsToReprocess) {
try {
// Attempt to reprocess the original event
await this.eventBus.publish(
deadLetterEvent.originalEvent.type,
deadLetterEvent.originalEvent
);
console.log(`Successfully reprocessed event: ${deadLetterEvent.originalEvent.type}`);
} catch (error) {
// If it fails again, put it back in the dead letter queue
this.deadLetterQueue.push(deadLetterEvent);
console.error(`Failed to reprocess event: ${deadLetterEvent.originalEvent.type}`);
}
}
}

delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

Conclusion

info

Event-Driven Architecture represents a powerful paradigm for building scalable, resilient, and maintainable systems. By designing APIs that embrace asynchronous communication, event sourcing, and loose coupling, developers can create systems that handle complexity while remaining flexible and responsive to change.

The JavaScript examples provided in this guide demonstrate practical implementations of core EDA patterns, from basic pub-sub systems to complex saga orchestrations. These patterns form the foundation for building modern distributed systems that can scale to meet the demands of today's applications.

When implementing EDA in your projects, remember to start simple and evolve your architecture as requirements grow. Focus on clear event schemas, robust error handling, and comprehensive monitoring to ensure your event-driven systems remain maintainable and observable as they scale.

The future of software architecture increasingly favors event-driven approaches, and mastering these patterns will position you to build the next generation of scalable, real-time applications.