Message Broker

Adapters

The Message Broker Adapter system allows you to integrate the internal message broker with external messaging systems such as WebSockets, FDC3, or any other communication protocol. Adapters enable bidirectional message flow between your internal application and external systems.

Overview

Adapters implement the IMessageBrokerAdapter<T> interface and provide:

  • Outbound messaging: Messages published to internal channels are automatically sent to registered adapters
  • Inbound messaging: Messages received from external systems are forwarded to internal subscribers
  • Error handling: Adapter failures are captured and exposed via an error stream
  • Connection management: Initialize, connect, and disconnect from external systems

Creating an Adapter

To create an adapter, implement the IMessageBrokerAdapter<T> interface:

import {
  IMessageBrokerAdapter,
  IMessage,
} from '@morgan-stanley/message-broker';
import { Observable, Subject } from 'rxjs';

export class WebSocketAdapter implements IMessageBrokerAdapter<IMyChannels> {
  private socket: WebSocket | null = null;
  private messageSubject = new Subject<IMessage>();

  async initialize(): Promise<void> {
    console.log('WebSocket adapter initialized');
  }

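  async connect(): Promise<void> {
    // Keep a local reference so the handlers below never see a null socket
    const socket = new WebSocket('ws://localhost:8080');
    this.socket = socket;

    socket.onmessage = (event) => {
      // Forward each inbound frame to internal subscribers
      const message: IMessage = JSON.parse(event.data);
      this.messageSubject.next(message);
    };

    // Resolve once the socket opens; reject if it fails to connect
    return new Promise((resolve, reject) => {
      socket.onopen = () => resolve();
      socket.onerror = (error) => reject(error);
    });
  }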

  async disconnect(): Promise<void> {
    if (this.socket) {
      this.socket.close();
      this.socket = null;
    }
  }

  async sendMessage(
    channelName: keyof IMyChannels,
    message: IMessage
  ): Promise<void> {
    if (!this.socket || !this.isConnected()) {
      throw new Error('WebSocket not connected');
    }
    this.socket.send(JSON.stringify({ channelName, message }));
  }

  getMessageStream(): Observable<IMessage> {
    return this.messageSubject.asObservable();
  }

  isConnected(): boolean {
    return this.socket?.readyState === WebSocket.OPEN;
  }
}
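
The adapter above sends an envelope of the form { channelName, message }, but parses inbound frames directly as an IMessage. If the remote end uses the same envelope, inbound frames would need to be unwrapped and validated before being pushed into the message stream. The following is a minimal sketch under that assumption. WireMessage, WireEnvelope, encodeEnvelope, and decodeEnvelope are hypothetical names introduced for illustration, and WireMessage is a local stand-in rather than the library's IMessage type.

```typescript
// Local stand-in for a message; the library's IMessage may carry more fields.
interface WireMessage {
  data: unknown;
}

// Hypothetical wire format, mirroring what sendMessage() serializes above.
interface WireEnvelope {
  channelName: string;
  message: WireMessage;
}

function encodeEnvelope(channelName: string, message: WireMessage): string {
  return JSON.stringify({ channelName, message });
}

// Returns undefined instead of throwing, so a single malformed frame
// cannot break the socket's onmessage handler.
function decodeEnvelope(raw: string): WireEnvelope | undefined {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return undefined;
  }
  if (typeof parsed !== 'object' || parsed === null) {
    return undefined;
  }
  const candidate = parsed as { channelName?: unknown; message?: unknown };
  const channelName = candidate.channelName;
  const message = candidate.message;
  if (typeof channelName !== 'string') {
    return undefined;
  }
  if (typeof message !== 'object' || message === null || !('data' in message)) {
    return undefined;
  }
  return { channelName, message: message as WireMessage };
}
```

Inside onmessage, an adapter could call decodeEnvelope(event.data) and forward the result only when it is defined.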

Registering Adapters

Register adapters with your message broker instance:

const broker = messagebroker<IMyChannels>();
const wsAdapter = new WebSocketAdapter();

// Initialize and connect the adapter
await wsAdapter.initialize();
await wsAdapter.connect();

// Register with the message broker (returns the generated adapter ID)
const adapterId = broker.registerAdapter(wsAdapter);
console.log('Adapter registered with ID:', adapterId);
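
The order of these steps (initialize, connect, register) and the reverse order on teardown can be wrapped in a small helper. This is a sketch, not part of the library: attachAdapter, AdapterLike, and BrokerLike are hypothetical names, the interfaces cover only the methods used in this document, and the adapter ID is assumed to be a string.

```typescript
// Structural stand-ins covering only the methods this document uses.
interface AdapterLike {
  initialize(): Promise<void>;
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}

interface BrokerLike<A> {
  registerAdapter(adapter: A): string; // assumption: the ID is a string
  unregisterAdapter(adapterId: string): void;
}

// Hypothetical helper: bring the adapter up, register it, and return
// a function that tears everything down again.
async function attachAdapter<A extends AdapterLike>(
  broker: BrokerLike<A>,
  adapter: A
): Promise<{ adapterId: string; detach: () => Promise<void> }> {
  await adapter.initialize();
  await adapter.connect();
  const adapterId = broker.registerAdapter(adapter);

  const detach = async (): Promise<void> => {
    // Unregister first so the broker stops routing messages
    // to a connection that is about to close.
    broker.unregisterAdapter(adapterId);
    await adapter.disconnect();
  };

  return { adapterId, detach };
}
```

Returning detach alongside the ID keeps the unregister and disconnect steps paired, so callers cannot forget one of them.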

Automatic Message Distribution

Once registered, adapters automatically receive published messages:

// This message will be sent to all registered adapters
broker.create('user-events').publish({
  userId: '123',
  action: 'login',
});

Receiving External Messages

Messages from external systems are automatically forwarded to internal subscribers:

// Subscribe to messages (from both internal and external sources)
broker.get('user-events').subscribe((message) => {
  console.log('Received message:', message.data);
  // This will receive messages from both internal publishes
  // and external adapter messages
});

Error Handling

Monitor adapter failures using the error stream:

broker.getErrorStream().subscribe((error) => {
  console.error(`Adapter ${error.adapterId} failed:`, {
    channel: error.channelName,
    message: error.message,
    error: error.error,
    timestamp: new Date(error.timestamp),
  });

  // Handle the error (e.g., retry, notify user, etc.)
});
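
One way to act on these errors is to count recent failures per adapter and react once a threshold is crossed. The sketch below is an illustration only: AdapterFailureTracker and AdapterErrorLike are hypothetical names, and the field types (string IDs, numeric millisecond timestamps) are assumptions based on the handler above.

```typescript
// Fields taken from the error handler above; the types are assumptions.
interface AdapterErrorLike {
  adapterId: string;
  channelName: string;
  timestamp: number; // assumed to be milliseconds since the epoch
}

// Hypothetical helper: counts failures per adapter inside a sliding window.
class AdapterFailureTracker {
  private readonly failures = new Map<string, number[]>();
  private readonly maxFailures: number;
  private readonly windowMs: number;

  constructor(maxFailures: number, windowMs: number) {
    this.maxFailures = maxFailures;
    this.windowMs = windowMs;
  }

  // Records the failure and returns true once the adapter has failed
  // more than maxFailures times within the window.
  record(error: AdapterErrorLike): boolean {
    const cutoff = error.timestamp - this.windowMs;
    const previous = this.failures.get(error.adapterId) ?? [];
    const recent = previous.filter((time) => time > cutoff);
    recent.push(error.timestamp);
    this.failures.set(error.adapterId, recent);
    return recent.length > this.maxFailures;
  }
}
```

In the error stream subscriber, a call such as tracker.record(error) could decide whether to unregister the adapter with broker.unregisterAdapter(error.adapterId) or to notify the user.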

Managing Adapters

List All Adapters

const adapters = broker.getAdapters();
console.log('Number of registered adapters:', Object.keys(adapters).length);

Unregister an Adapter

// Use the ID returned from registerAdapter
broker.unregisterAdapter(adapterId);

Check Connection Status

const adapters = broker.getAdapters();
const connectedCount = Object.values(adapters).filter((adapter) =>
  adapter.isConnected()
).length;
console.log(`${connectedCount} adapters are connected`);

Working with Adapter IDs and Values

The getAdapters() method returns a record that maps each adapter ID to its adapter instance, so you can look up an adapter by ID or iterate over all of them:

const adapters = broker.getAdapters();

// Access a specific adapter by ID
const specificAdapter = adapters[adapterId];
if (specificAdapter?.isConnected()) {
  console.log(`Adapter ${adapterId} is ready`);
}

// Iterate over all adapters with their IDs
Object.entries(adapters).forEach(([id, adapter]) => {
  console.log(`Adapter ${id}: ${adapter.isConnected() ? 'Connected' : 'Disconnected'}`);
});

// Get just the adapter instances
const adapterList = Object.values(adapters);

// Get just the adapter IDs
const adapterIds = Object.keys(adapters);

Best Practices

1. Error Resilience

Fail fast in your adapter: throw (or reject) when a message cannot be sent, and let the message broker report the failure on its error stream:

async sendMessage(channelName: keyof T, message: IMessage): Promise<void> {
    if (!this.isConnected()) {
        throw new Error('Not connected to external system');
    }

    // The message broker will catch any errors and send them to the error stream
    await this.externalSystem.send(channelName, message);
}

2. Connection Management

Retry failed connections with a bounded number of attempts:

class RobustAdapter implements IMessageBrokerAdapter<T> {
  private reconnectInterval: number = 5000;
  private maxRetries: number = 5;
  private retryCount: number = 0;

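  async connect(): Promise<void> {
    try {
      await this.establishConnection();
      this.retryCount = 0; // Reset on successful connection
    } catch (error) {
      if (this.retryCount >= this.maxRetries) {
        throw error; // Retries exhausted: surface the failure to the caller
      }
      this.retryCount++;
      // Wait, then retry. Awaiting the retry keeps this promise pending
      // until the connection succeeds or the retries run out.
      await new Promise((resolve) => setTimeout(resolve, this.reconnectInterval));
      return this.connect();
    }
  }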
}
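
A fixed retry interval can be replaced with exponential backoff so that a struggling server is not hit at a constant rate. The following is a sketch of that alternative, not the library's behavior: backoffDelay and connectWithRetry are hypothetical helpers, and the default values (1 second base, 30 second cap, 5 retries) are illustrative assumptions. The sleep parameter is injectable so the logic can be exercised without real timers.

```typescript
// Hypothetical helper: delay before retry number `attempt` (1-based),
// doubling each time up to a cap.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  const exponent = Math.max(0, attempt - 1);
  return Math.min(maxMs, baseMs * 2 ** exponent);
}

// Hypothetical helper: calls connectOnce until it succeeds, allowing up to
// maxRetries retries after the first attempt, then rethrows the last error.
async function connectWithRetry(
  connectOnce: () => Promise<void>,
  maxRetries = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<void> {
  for (let attempt = 1; ; attempt++) {
    try {
      await connectOnce();
      return;
    } catch (error) {
      if (attempt > maxRetries) {
        throw error;
      }
      await sleep(backoffDelay(attempt));
    }
  }
}
```

An adapter's connect() could delegate to connectWithRetry(() => this.establishConnection()), which keeps the retry policy separate from the connection code.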

3. Message Stream

Return a single stream of all messages from your external system:

// Requires: import { filter } from 'rxjs/operators';
getMessageStream(): Observable<IMessage> {
    return this.messageStream.pipe(
        // Optional: Add any adapter-specific filtering or transformations
        filter(message => this.shouldProcessMessage(message))
    );
}

The message broker handles channel filtering automatically, so your adapter only needs to provide the raw message stream.