
Adapters
The Message Broker Adapter system allows you to integrate the internal message broker with external messaging systems such as WebSockets, FDC3, or any other communication protocol. Adapters enable bidirectional message flow between your internal application and external systems.
Overview
Adapters implement the IMessageBrokerAdapter<T>
interface and provide:
- Outbound messaging: Messages published to internal channels are automatically sent to registered adapters
- Inbound messaging: Messages received from external systems are forwarded to internal subscribers
- Error handling: Adapter failures are captured and exposed via an error stream
- Connection management: Initialize, connect, and disconnect from external systems
Creating an Adapter
To create an adapter, implement the IMessageBrokerAdapter<T>
interface:
import {
IMessageBrokerAdapter,
IMessage,
} from '@morgan-stanley/message-broker';
import { Observable } from 'rxjs';
export class WebSocketAdapter implements IMessageBrokerAdapter<IMyChannels> {
private socket: WebSocket | null = null;
private messageSubject = new Subject<IMessage>();
async initialize(): Promise<void> {
console.log('WebSocket adapter initialized');
}
async connect(): Promise<void> {
this.socket = new WebSocket('ws://localhost:8080');
this.socket.onmessage = (event) => {
const message: IMessage = JSON.parse(event.data);
this.messageSubject.next(message);
};
return new Promise((resolve, reject) => {
this.socket.onopen = () => resolve();
this.socket.onerror = (error) => reject(error);
});
}
async disconnect(): Promise<void> {
if (this.socket) {
this.socket.close();
this.socket = null;
}
}
async sendMessage(
channelName: keyof IMyChannels,
message: IMessage
): Promise<void> {
if (isConnected()) {
this.socket.send(JSON.stringify({ channelName, message }));
} else {
throw new Error('WebSocket not connected');
}
}
getMessageStream(): Observable<IMessage> {
return this.messageSubject.asObservable();
}
isConnected(): boolean {
return this.socket?.readyState === WebSocket.OPEN;
}
}
Registering Adapters
Register adapters with your message broker instance:
const broker = messagebroker<IMyChannels>();
const wsAdapter = new WebSocketAdapter();
// Initialize and connect the adapter
await wsAdapter.initialize();
await wsAdapter.connect();
// Register with the message broker (returns the generated adapter ID)
const adapterId = broker.registerAdapter(wsAdapter);
console.log('Adapter registered with ID:', adapterId);
Automatic Message Distribution
Once registered, adapters automatically receive published messages:
// This message will be sent to all registered adapters
broker.create('user-events').publish({
userId: '123',
action: 'login',
});
Receiving External Messages
Messages from external systems are automatically forwarded to internal subscribers:
// Subscribe to messages (from both internal and external sources)
broker.get('user-events').subscribe((message) => {
console.log('Received message:', message.data);
// This will receive messages from both internal publishes
// and external adapter messages
});
Error Handling
Monitor adapter failures using the error stream:
broker.getErrorStream().subscribe((error) => {
console.error(`Adapter ${error.adapterId} failed:`, {
channel: error.channelName,
message: error.message,
error: error.error,
timestamp: new Date(error.timestamp),
});
// Handle the error (e.g., retry, notify user, etc.)
});
Managing Adapters
List All Adapters
const adapters = broker.getAdapters();
console.log('Number of registered adapters:', Object.keys(adapters).length);
Unregister an Adapter
// Use the ID returned from registerAdapter
broker.unregisterAdapter(adapterId);
Check Connection Status
const adapters = broker.getAdapters();
const connectedCount = Object.values(adapters).filter((adapter) =>
adapter.isConnected()
).length;
console.log(`${connectedCount} adapters are connected`);
Working with Adapter IDs and Values
The getAdapters()
function returns a record of adapter IDs mapped to adapter instances, giving you access to both the unique identifier and the adapter:
const adapters = broker.getAdapters();
// Access a specific adapter by ID
const specificAdapter = adapters[adapterId];
if (specificAdapter?.isConnected()) {
console.log(`Adapter ${adapterId} is ready`);
}
// Iterate over all adapters with their IDs
Object.entries(adapters).forEach(([id, adapter]) => {
console.log(`Adapter ${id}: ${adapter.isConnected() ? 'Connected' : 'Disconnected'}`);
});
// Get just the adapter instances
const adapterList = Object.values(adapters);
// Get just the adapter IDs
const adapterIds = Object.keys(adapters);
Best Practices
1. Error Resilience
Always implement proper error handling in your adapters:
async sendMessage(channelName: keyof T, message: IMessage): Promise<void> {
if (!this.isConnected()) {
throw new Error('Not connected to external system');
}
// The message broker will catch any errors and send them to the error stream
await this.externalSystem.send(channelName, message);
}
2. Connection Management
Implement proper connection lifecycle management:
class RobustAdapter implements IMessageBrokerAdapter<T> {
private reconnectInterval: number = 5000;
private maxRetries: number = 5;
private retryCount: number = 0;
async connect(): Promise<void> {
try {
await this.establishConnection();
this.retryCount = 0; // Reset on successful connection
} catch (error) {
if (this.retryCount < this.maxRetries) {
this.retryCount++;
setTimeout(() => this.connect(), this.reconnectInterval);
} else {
throw error;
}
}
}
}
3. Message Stream
Return a single stream of all messages from your external system:
getMessageStream(): Observable<IMessage> {
return this.messageStream.pipe(
// Optional: Add any adapter-specific filtering or transformations
filter(message => this.shouldProcessMessage(message))
);
}
The message broker handles channel filtering automatically, so your adapter only needs to provide the raw message stream.