Building an Event Bus with JS Pub/Sub Pattern

Learn to build a production-grade event bus using the JavaScript Pub/Sub pattern. Covers typed events, middleware pipelines, event replay, namespaced channels, error boundaries, and cross-module communication for scalable applications.

JavaScript · advanced
16 min read

An event bus is a centralized pub/sub system that enables cross-module communication without direct dependencies. This guide builds a production-grade event bus with middleware support, typed events, replay capabilities, and error handling.

For the pub/sub pattern foundations, see The JavaScript Pub/Sub Pattern: Complete Guide.

Production Event Bus

The core event bus class below handles subscription with priority ordering, one-time listeners, a middleware pipeline that runs before every event delivery, and a history buffer for replay. Each call to on returns an unsubscribe function, so cleanup is straightforward. Events pass through middleware first, where they can be modified, enriched, or filtered out entirely before any handler sees them.

javascript
/**
 * Centralized publish/subscribe hub.
 *
 * Features: priority-ordered handlers, one-time handlers, an async middleware
 * pipeline that runs before delivery, a bounded history buffer for replay,
 * and pluggable error handlers.
 */
class EventBus {
  #handlers = new Map();
  #middleware = [];
  #history = [];
  #maxHistory = 500;
  #errorHandlers = [];

  /**
   * Subscribes a handler to an event.
   *
   * @param {string} event - Event name.
   * @param {Function} handler - Called as `handler(data, message)`; may be async.
   * @param {{priority?: number, once?: boolean}} [options] - Higher priority
   *   runs first; `once` removes the handler when it is first invoked.
   * @returns {Function} Unsubscribe function for this exact subscription.
   */
  on(event, handler, options = {}) {
    const { priority = 0, once = false } = options;
    const entry = {
      handler,
      priority,
      once,
      fired: false,
      id: crypto.randomUUID(),
    };

    const entries = this.#handlers.get(event) ?? [];
    entries.push(entry);
    // Array#sort is stable, so equal priorities keep subscription order.
    entries.sort((a, b) => b.priority - a.priority);
    this.#handlers.set(event, entries);

    return () => this.off(event, entry.id);
  }

  /**
   * Subscribes a handler that is removed when it is first invoked.
   *
   * @param {string} event - Event name.
   * @param {Function} handler - Called as `handler(data, message)`.
   * @param {{priority?: number}} [options] - Same options as `on`.
   * @returns {Function} Unsubscribe function.
   */
  once(event, handler, options = {}) {
    return this.on(event, handler, { ...options, once: true });
  }

  /**
   * Removes subscriptions from an event.
   *
   * @param {string} event - Event name.
   * @param {Function|string} [handlerOrId] - Handler function or subscription
   *   id to remove. When omitted, every handler of `event` is removed.
   */
  off(event, handlerOrId) {
    const entries = this.#handlers.get(event);
    if (!entries) return;

    // No target given: drop the whole channel instead of silently doing nothing.
    if (handlerOrId === undefined) {
      this.#handlers.delete(event);
      return;
    }

    const idx = entries.findIndex((h) =>
      typeof handlerOrId === "string"
        ? h.id === handlerOrId
        : h.handler === handlerOrId
    );

    if (idx > -1) entries.splice(idx, 1);
    if (entries.length === 0) this.#handlers.delete(event);
  }

  /**
   * Publishes an event: runs the middleware pipeline, records the message in
   * history, then awaits each handler in priority order.
   *
   * @param {string} event - Event name.
   * @param {*} data - Payload handed to middleware and handlers.
   * @returns {Promise<object>} `{ messageId, delivered }` on delivery,
   *   `{ delivered: 0, filtered: true }` when a middleware returns a falsy
   *   value, or `{ delivered: 0, error }` when a middleware throws.
   */
  async emit(event, data) {
    const message = {
      id: crypto.randomUUID(),
      event,
      data,
      timestamp: Date.now(),
      metadata: {},
    };

    // Run middleware pipeline
    let processedMessage = message;
    for (const mw of this.#middleware) {
      try {
        processedMessage = await mw(processedMessage);
        if (!processedMessage) return { delivered: 0, filtered: true };
      } catch (error) {
        this.#handleError(error, message);
        // Non-Error throwables have no `.message`; still report something.
        const reason = error instanceof Error ? error.message : String(error);
        return { delivered: 0, error: reason };
      }
    }

    // Store in history
    this.#history.push(processedMessage);
    if (this.#history.length > this.#maxHistory) {
      this.#history.shift();
    }

    // Deliver to a snapshot: handlers that subscribe or unsubscribe while this
    // emit is running must not cause other handlers to be skipped.
    const snapshot = [...(this.#handlers.get(event) ?? [])];
    let delivered = 0;

    for (const entry of snapshot) {
      if (entry.once) {
        // An overlapping emit may hold the same entry in its own snapshot.
        if (entry.fired) continue;
        entry.fired = true;
        // Remove before invoking so a throwing or re-entrant handler still
        // runs at most once.
        this.off(event, entry.id);
      }

      try {
        await entry.handler(processedMessage.data, processedMessage);
        delivered++;
      } catch (error) {
        this.#handleError(error, processedMessage);
      }
    }

    return { messageId: message.id, delivered };
  }

  /**
   * Appends a middleware to the pipeline.
   *
   * @param {Function} middleware - Receives the message and returns it
   *   (possibly modified), or a falsy value to drop it; may be async.
   * @returns {EventBus} This bus, for chaining.
   */
  use(middleware) {
    this.#middleware.push(middleware);
    return this;
  }

  /**
   * Registers a handler for errors thrown by middleware or event handlers.
   *
   * @param {Function} handler - Called as `handler(error, message)`.
   * @returns {EventBus} This bus, for chaining.
   */
  onError(handler) {
    this.#errorHandlers.push(handler);
    return this;
  }

  /**
   * Reports an error to every registered error handler, or to `console.error`
   * when none is registered.
   */
  #handleError(error, message) {
    if (this.#errorHandlers.length === 0) {
      console.error(`EventBus error [${message.event}]:`, error);
      return;
    }

    for (const reporter of [...this.#errorHandlers]) {
      try {
        reporter(error, message);
      } catch (reporterError) {
        // A faulty error handler must not abort delivery to remaining handlers.
        console.error(
          `EventBus error handler failed [${message.event}]:`,
          reporterError
        );
      }
    }
  }

  /**
   * Returns the most recent messages from the history buffer.
   *
   * @param {string} [event] - Restrict to this event; falsy means all events.
   * @param {number} [limit=10] - Maximum number of messages to return.
   * @returns {object[]} Up to `limit` messages, oldest first; empty when
   *   `limit` is not a positive number.
   */
  getHistory(event, limit = 10) {
    // slice(-0) is slice(0) and would return the whole buffer.
    if (!(limit > 0)) return [];

    const filtered = event
      ? this.#history.filter((m) => m.event === event)
      : this.#history;
    return filtered.slice(-limit);
  }

  /**
   * Synchronously feeds every stored message of `event` to `handler`.
   *
   * @param {string} event - Event name.
   * @param {Function} handler - Called as `handler(data, message)`; its return
   *   value is not awaited.
   * @returns {number} Number of messages replayed.
   */
  replay(event, handler) {
    const messages = this.#history.filter((m) => m.event === event);
    messages.forEach((msg) => handler(msg.data, msg));
    return messages.length;
  }

  /**
   * @param {string} event - Event name.
   * @returns {number} Number of handlers currently subscribed to `event`.
   */
  listenerCount(event) {
    return this.#handlers.get(event)?.length || 0;
  }

  /**
   * @returns {string[]} Names of events that currently have handlers.
   */
  eventNames() {
    return [...this.#handlers.keys()];
  }

  /**
   * Removes all handlers, middleware, and history. Error handlers are kept.
   */
  clear() {
    this.#handlers.clear();
    this.#middleware.length = 0;
    this.#history.length = 0;
  }
}
 
// Usage: report failures through a custom error handler, subscribe, publish
const bus = new EventBus();

bus.onError(({ message }, { event }) => {
  console.error(`[EventBus] ${event}: ${message}`);
});

bus.on("user:login", ({ username }) => {
  console.log(`User logged in: ${username}`);
});

const loginPayload = { username: "alice", timestamp: Date.now() };
await bus.emit("user:login", loginPayload);

Middleware Pipeline

Middleware functions intercept every event before it reaches subscribers. Each one receives the message, can modify it or add metadata, and returns it to pass it along. Returning null drops the message entirely, which is how rate limiting works. You compose them by calling bus.use() in the order you want them to run: logging first, then enrichment, then validation, then rate limiting.

javascript
/**
 * Logging middleware: prints the event name and payload, then passes the
 * message through untouched.
 *
 * @param {{event: string, data: *}} message - Message travelling the pipeline.
 * @returns {object} The same message object.
 */
function loggingMiddleware(message) {
  const { event, data } = message;
  console.log(`[EVENT] ${event}:`, data);
  return message;
}
 
/**
 * Validation middleware factory.
 *
 * @param {Object<string, Object<string, {required?: boolean, type?: string}>>} schemas
 *   Map of event name to per-field rules. `type` is compared against `typeof`,
 *   except `"array"`, which is checked with `Array.isArray`.
 * @returns {Function} Middleware that returns the message unchanged when it
 *   is valid or when the event has no schema.
 * @throws {Error} (from the returned middleware) when a required field is
 *   missing or a present field has the wrong type.
 */
function validationMiddleware(schemas) {
  return function (message) {
    const schema = schemas[message.event];
    if (!schema) return message;

    const { data } = message;
    // The `in` operator throws on null/undefined/primitives; treat such
    // payloads as having no fields so the error names the missing field.
    const hasFields =
      data !== null && (typeof data === "object" || typeof data === "function");

    for (const [field, rules] of Object.entries(schema)) {
      const isPresent = hasFields && field in data;

      if (!isPresent) {
        if (rules.required) {
          throw new Error(`Missing required field: ${field}`);
        }
        // Optional and absent: nothing to type-check.
        continue;
      }

      if (rules.type) {
        const matches =
          rules.type === "array"
            ? Array.isArray(data[field])
            : typeof data[field] === rules.type;
        if (!matches) {
          throw new Error(`${field} must be ${rules.type}`);
        }
      }
    }

    return message;
  };
}
 
/**
 * Enrichment middleware: stamps processing time and environment onto the
 * message metadata and assigns a correlation ID when none is set.
 *
 * @param {{metadata: object}} message - Message whose metadata is updated in place.
 * @returns {object} The same message object.
 */
function enrichmentMiddleware(message) {
  const { metadata } = message;

  Object.assign(metadata, {
    processedAt: Date.now(),
    environment: "production",
  });

  // Keep an existing correlation ID so related events stay traceable.
  metadata.correlationId ||= crypto.randomUUID();

  return message;
}
 
/**
 * Rate limiting middleware factory (one counting window per event name).
 *
 * @param {Object<string, {max: number, windowMs: number}>} limits - Per-event
 *   limits; events without an entry are never limited.
 * @returns {Function} Middleware that returns the message, or `null` (drop)
 *   once more than `max` messages arrived within the current window.
 */
function rateLimitMiddleware(limits) {
  const windows = new Map();

  return function (message) {
    const { event } = message;
    const rule = limits[event];
    if (!rule) return message;

    const now = Date.now();

    let state = windows.get(event);
    if (state === undefined) {
      state = { count: 0, windowStart: now };
      windows.set(event, state);
    }

    // Start a fresh window once the previous one has expired.
    const elapsed = now - state.windowStart;
    if (elapsed > rule.windowMs) {
      state.count = 0;
      state.windowStart = now;
    }

    state.count += 1;
    if (state.count > rule.max) {
      console.warn(`Rate limit exceeded for ${event}`);
      return null; // Filter message
    }

    return message;
  };
}
 
// Wire up middleware: registration order is execution order
const bus2 = new EventBus();

bus2.use(loggingMiddleware);
bus2.use(enrichmentMiddleware);
bus2.use(
  validationMiddleware({
    "order:create": {
      items: { required: true },
      userId: { required: true, type: "string" },
    },
  })
);
bus2.use(
  rateLimitMiddleware({
    "analytics:track": { max: 100, windowMs: 60000 },
  })
);

// The second handler argument is the full message, including enriched metadata
bus2.on("order:create", ({ userId }, { metadata }) => {
  console.log(`Order created by ${userId}`, metadata);
});

await bus2.emit("order:create", {
  items: [{ sku: "A1", qty: 1 }],
  userId: "user_123",
});

Typed Event System

Without type definitions, anyone can emit any event with any shape of data, and you will not know something is wrong until runtime. The TypedEventBus wraps the base event bus and lets you define schemas for each event up front. When someone emits an event, the bus validates the payload against the schema and throws immediately if a required field is missing or has the wrong type.

javascript
/**
 * Event bus wrapper that validates payloads against per-event schemas
 * before forwarding them to an internal EventBus.
 */
class TypedEventBus {
  #bus = new EventBus();
  #eventTypes = new Map();

  /**
   * Registers (or replaces) the schema for an event.
   *
   * @param {string} name - Event name.
   * @param {{description?: string, fields?: Object<string, {type?: string, required?: boolean}>}} schema
   *   Field rules; `type` is a `typeof` result or `"array"`.
   * @returns {TypedEventBus} This bus, for chaining.
   */
  defineEvent(name, schema) {
    this.#eventTypes.set(name, schema);
    return this;
  }

  /**
   * Subscribes to an event, warning when the event has no schema.
   *
   * @param {string} event - Event name.
   * @param {Function} handler - Called as `handler(data, message)`.
   * @param {object} [options] - Forwarded to the underlying bus.
   * @returns {Function} Unsubscribe function from the underlying bus.
   */
  on(event, handler, options) {
    if (!this.#eventTypes.has(event)) {
      console.warn(`Subscribing to undefined event type: ${event}`);
    }
    return this.#bus.on(event, handler, options);
  }

  /**
   * Validates the payload against the event's schema, then emits it. Events
   * without a schema are emitted with a warning.
   *
   * @param {string} event - Event name.
   * @param {*} data - Payload to validate and publish.
   * @returns {Promise<object>} Result of the underlying bus's `emit`.
   *   Rejects with Error (missing required field) or TypeError (wrong type).
   */
  async emit(event, data) {
    const schema = this.#eventTypes.get(event);

    if (!schema) {
      console.warn(`Emitting undefined event type: ${event}`);
    } else {
      this.#validatePayload(event, data, schema);
    }

    return this.#bus.emit(event, data);
  }

  /**
   * Checks required fields and field types; throws on the first violation.
   */
  #validatePayload(event, data, schema) {
    // The `in` operator throws on null/undefined/primitives; treat such
    // payloads as having no fields so the error names the missing field.
    const hasFields =
      data !== null && (typeof data === "object" || typeof data === "function");

    for (const [field, rules] of Object.entries(schema.fields || {})) {
      const isPresent = hasFields && field in data;

      if (!isPresent) {
        if (rules.required) {
          throw new Error(`Event "${event}": missing required field "${field}"`);
        }
        continue;
      }

      if (!rules.type) continue;

      // typeof reports arrays as "object"; schemas name them "array".
      const actualType = Array.isArray(data[field]) ? "array" : typeof data[field];
      if (actualType !== rules.type) {
        throw new TypeError(
          `Event "${event}": "${field}" must be ${rules.type}, got ${actualType}`
        );
      }
    }
  }

  /**
   * @returns {Object<string, object>} Plain-object snapshot mapping each
   *   defined event name to its schema.
   */
  getEventTypes() {
    return Object.fromEntries(this.#eventTypes);
  }
}
 
// Usage: declare each event's schema up front, then subscribe and emit
const typedBus = new TypedEventBus();

typedBus.defineEvent("cart:itemAdded", {
  description: "Fired when an item is added to cart",
  fields: {
    productId: { type: "string", required: true },
    quantity: { type: "number", required: true },
    price: { type: "number", required: true },
  },
});

typedBus.defineEvent("cart:checkout", {
  description: "Fired when user initiates checkout",
  fields: {
    cartId: { type: "string", required: true },
    items: { type: "array", required: true },
    total: { type: "number", required: true },
  },
});

typedBus.on("cart:itemAdded", ({ productId, quantity }) => {
  console.log(`Added product ${productId} x${quantity}`);
});

await typedBus.emit("cart:itemAdded", {
  productId: "PROD-001",
  quantity: 2,
  price: 29.99,
});

Namespaced Event Modules

When multiple modules share one event bus, naming collisions become a real problem. The createEventModule function wraps the bus and automatically prefixes every event name with a namespace. Each module gets its own scoped view of the bus, so the auth module emits auth:login and the cart module emits cart:load, all through the same underlying bus without stepping on each other.

javascript
/**
 * Creates a namespaced view of a shared bus: every event name passed to the
 * returned object is prefixed with `namespace:` before reaching the bus.
 *
 * @param {object} bus - Bus exposing `on`, `once`, `off`, `emit`,
 *   `getHistory`, and `listenerCount`.
 * @param {string} namespace - Prefix for every event name.
 * @returns {object} Scoped facade with the same method names; each method
 *   returns whatever the underlying bus method returns.
 */
function createEventModule(bus, namespace) {
  const scoped = (event) => `${namespace}:${event}`;

  return {
    on(event, handler, options) {
      return bus.on(scoped(event), handler, options);
    },

    // Forward options so one-time handlers can set a priority, like `on`.
    once(event, handler, options) {
      return bus.once(scoped(event), handler, options);
    },

    off(event, handlerOrId) {
      return bus.off(scoped(event), handlerOrId);
    },

    emit(event, data) {
      return bus.emit(scoped(event), data);
    },

    getHistory(event, limit) {
      return bus.getHistory(scoped(event), limit);
    },

    listenerCount(event) {
      return bus.listenerCount(scoped(event));
    },
  };
}
 
// Usage: one shared bus, one namespaced view per module
const sharedBus = new EventBus();

const [authEvents, cartEvents, uiEvents] = ["auth", "cart", "ui"].map((ns) =>
  createEventModule(sharedBus, ns)
);

// Module A: Auth — a login triggers loading the user's cart
authEvents.on("login", ({ userId }) => {
  console.log(`User logged in: ${userId}`);
  cartEvents.emit("load", { userId });
});

// Module B: Cart
cartEvents.on("load", ({ userId }) => {
  console.log(`Loading cart for user: ${userId}`);
});

// Module C: UI reacts to auth
authEvents.on("login", ({ name }) => {
  uiEvents.emit("showWelcome", { name });
});

uiEvents.on("showWelcome", ({ name }) => {
  console.log(`Welcome back, ${name}!`);
});

// Fire the chain
const loginEvent = { userId: "U1", name: "Alice" };
await authEvents.emit("login", loginEvent);

Event Bus with Request/Reply

Standard pub/sub is fire-and-forget, but sometimes you need a response. The request/reply pattern works by generating a unique reply channel for each request, setting a timeout, and waiting for a single response on that channel. The responder calls reply() to register a handler that automatically sends results back through the reply channel. If no reply comes within the timeout window, the promise rejects so you do not hang forever.

javascript
/**
 * Event bus with a request/reply layer: `request` publishes an event tagged
 * with a unique `__requestId` and waits for one message on the matching
 * `__reply__:<id>` channel; `reply` registers a responder that publishes its
 * result (or error) on that channel.
 */
class RequestReplyBus extends EventBus {
  #pendingRequests = new Map();
  #timeout = 5000;

  /**
   * Publishes a request and waits for a single reply.
   *
   * @param {string} event - Request event name.
   * @param {object} data - Payload; it is copied and `__requestId` is added.
   * @param {number} [timeoutMs] - Time to wait for a reply; a falsy value
   *   (including 0) falls back to the default of 5000 ms.
   * @returns {Promise<*>} Resolves with the responder's result. Rejects when
   *   the responder failed, the request was dropped or rejected by
   *   middleware, or no reply arrived in time.
   */
  async request(event, data, timeoutMs) {
    const requestId = crypto.randomUUID();
    const replyEvent = `__reply__:${requestId}`;
    const timeout = timeoutMs || this.#timeout;

    return new Promise((resolve, reject) => {
      let unsubscribe;

      // Settles a still-pending request as failed and releases its timer and
      // reply listener; does nothing once the request has been settled.
      const abort = (error) => {
        if (!this.#pendingRequests.has(requestId)) return;
        clearTimeout(timer);
        unsubscribe();
        this.#pendingRequests.delete(requestId);
        reject(error);
      };

      const timer = setTimeout(() => {
        abort(new Error(`Request timeout: ${event} (${timeout}ms)`));
      }, timeout);

      // Keep the unsubscribe function so the listener can be removed by id
      // if the request fails before any reply arrives.
      unsubscribe = this.once(replyEvent, (response) => {
        clearTimeout(timer);
        this.#pendingRequests.delete(requestId);

        if (response.error) {
          reject(new Error(response.error));
        } else {
          resolve(response.data);
        }
      });

      this.#pendingRequests.set(requestId, { event, timer });

      // If middleware dropped or rejected the request, no responder ever saw
      // it, so fail now instead of waiting out the timeout.
      this.emit(event, { ...data, __requestId: requestId }).then(
        (result) => {
          if (result?.filtered) {
            abort(new Error(`Request dropped by middleware: ${event}`));
          } else if (result && "error" in result) {
            abort(new Error(`Request failed: ${event}: ${result.error}`));
          }
        },
        (error) => abort(error)
      );
    });
  }

  /**
   * Registers a responder for a request event.
   *
   * @param {string} event - Request event name.
   * @param {Function} handler - Called as `handler(data, message)`; its
   *   (awaited) return value becomes the reply. A throw becomes an error reply.
   * @returns {Function} Unsubscribe function for the responder.
   */
  reply(event, handler) {
    return this.on(event, async (data, message) => {
      const requestId = data.__requestId;
      // Plain emits of the same event carry no request id and need no reply.
      if (!requestId) return;

      try {
        const result = await handler(data, message);
        await this.emit(`__reply__:${requestId}`, { data: result });
      } catch (error) {
        // Always send a non-empty reason; an empty or missing message would
        // be read by the requester as a successful, empty reply.
        const reason = error?.message || String(error);
        await this.emit(`__reply__:${requestId}`, { error: reason });
      }
    });
  }
}
 
// Usage
const rrBus = new RequestReplyBus();

// Service side: the responder's return value becomes the reply payload
// (a real service would do a DB lookup here)
rrBus.reply("users:getById", async ({ userId }) => ({
  id: userId,
  name: "Alice",
  email: "alice@test.com",
}));

// Client side: request and await the response
const user = await rrBus.request("users:getById", { userId: "U1" });
console.log("Got user:", user);
| Event Bus Feature | Purpose | Complexity |
| --- | --- | --- |
| Basic pub/sub | Decouple components | Low |
| Middleware pipeline | Cross-cutting concerns | Medium |
| Typed events | Schema validation | Medium |
| Namespaced modules | Module isolation | Low |
| Request/Reply | Async queries via events | High |
| Event replay | Late subscriber support | Medium |
Rune AI

Rune AI

Key Insights

  • Event buses centralize cross-module communication through a single pub/sub instance: All modules publish and subscribe through one bus, eliminating direct dependencies between them
  • Middleware pipelines process every event before delivery: Logging, validation, enrichment, and rate limiting run as composable pipeline stages
  • Typed event definitions enforce payload schemas at emission time: Define expected fields and types per event to catch data errors early
  • Namespaced modules give each feature its own event scope: Modules use prefixed events (auth:login, cart:add) without collision risk
  • Request/Reply enables synchronous-style queries over the event bus: Publishers can await responses from reply handlers, bridging pub/sub with request-response patterns
RunePowered by Rune AI

Frequently Asked Questions

When should I use an event bus vs direct function calls?

Use an event bus when multiple modules need to react to the same event, when the publisher should not know about subscribers, or when you want to add new reactions without modifying existing code. Use direct function calls when there is a clear caller-callee relationship, when type safety matters, or when the interaction is simple request-response.

How do I debug event-driven architectures?

Use logging middleware to track every event. Include correlation IDs in events so you can trace a chain of related events across modules. Build a history/replay feature to inspect past events. In development, log subscriber counts per event to detect missing or duplicate subscriptions.

Can event buses cause memory leaks?

Yes. The most common cause is forgetting to unsubscribe when a component or module is destroyed. Always store the unsubscribe function and call it during cleanup. Monitor subscriber counts and set warnings when they exceed expected limits. Limit event history size to prevent unbounded array growth.

How do I handle event ordering and race conditions?

Use sequential processing (process queue one at a time) when ordering matters. For concurrent handlers, accept that completion order is non-deterministic. If a handler depends on another handler's result, use the request/reply pattern instead of fire-and-forget events.

Conclusion

A production event bus provides middleware pipelines, typed events, namespaced modules, request/reply, and event replay. Middleware handles cross-cutting concerns like logging, validation, and rate limiting. Typed events enforce payload schemas. Namespaced modules isolate components. For the pub/sub pattern foundations, see The JavaScript Pub/Sub Pattern: Complete Guide. For the MVC architecture that event buses support, review JavaScript MVC Architecture: Complete Guide.