Building an Event Bus with JS Pub/Sub Pattern
Learn to build a production-grade event bus using the JavaScript Pub/Sub pattern. Covers typed events, middleware pipelines, event replay, namespaced channels, error boundaries, and cross-module communication for scalable applications.
An event bus is a centralized pub/sub system that enables cross-module communication without direct dependencies. This guide builds a production-grade event bus with middleware support, typed events, replay capabilities, and error handling.
For the pub/sub pattern foundations, see The JavaScript Pub/Sub Pattern: Complete Guide.
Production Event Bus
The core event bus class below handles subscription with priority ordering, one-time listeners, a middleware pipeline that runs before every event delivery, and a history buffer for replay. Each call to on returns an unsubscribe function, so cleanup is straightforward. Events pass through middleware first, where they can be modified, enriched, or filtered out entirely before any handler sees them.
class EventBus {
#handlers = new Map();
#middleware = [];
#history = [];
#maxHistory = 500;
#errorHandlers = [];
on(event, handler, options = {}) {
const { priority = 0, once = false } = options;
if (!this.#handlers.has(event)) {
this.#handlers.set(event, []);
}
const entry = { handler, priority, once, id: crypto.randomUUID() };
this.#handlers.get(event).push(entry);
this.#handlers.get(event).sort((a, b) => b.priority - a.priority);
return () => this.off(event, entry.id);
}
once(event, handler, options = {}) {
return this.on(event, handler, { ...options, once: true });
}
off(event, handlerOrId) {
const handlers = this.#handlers.get(event);
if (!handlers) return;
const idx = handlers.findIndex((h) =>
typeof handlerOrId === "string"
? h.id === handlerOrId
: h.handler === handlerOrId
);
if (idx > -1) handlers.splice(idx, 1);
if (handlers.length === 0) this.#handlers.delete(event);
}
async emit(event, data) {
const message = {
id: crypto.randomUUID(),
event,
data,
timestamp: Date.now(),
metadata: {},
};
// Run middleware pipeline
let processedMessage = message;
for (const mw of this.#middleware) {
try {
processedMessage = await mw(processedMessage);
if (!processedMessage) return { delivered: 0, filtered: true };
} catch (error) {
this.#handleError(error, message);
return { delivered: 0, error: error.message };
}
}
// Store in history
this.#history.push(processedMessage);
if (this.#history.length > this.#maxHistory) {
this.#history.shift();
}
// Deliver to handlers
const handlers = this.#handlers.get(event) || [];
const onceIds = [];
let delivered = 0;
for (const entry of handlers) {
try {
await entry.handler(processedMessage.data, processedMessage);
delivered++;
if (entry.once) onceIds.push(entry.id);
} catch (error) {
this.#handleError(error, processedMessage);
}
}
// Remove once handlers
for (const id of onceIds) {
this.off(event, id);
}
return { messageId: message.id, delivered };
}
use(middleware) {
this.#middleware.push(middleware);
return this;
}
onError(handler) {
this.#errorHandlers.push(handler);
return this;
}
#handleError(error, message) {
if (this.#errorHandlers.length > 0) {
this.#errorHandlers.forEach((h) => h(error, message));
} else {
console.error(`EventBus error [${message.event}]:`, error);
}
}
getHistory(event, limit = 10) {
const filtered = event
? this.#history.filter((m) => m.event === event)
: this.#history;
return filtered.slice(-limit);
}
replay(event, handler) {
const messages = this.#history.filter((m) => m.event === event);
messages.forEach((msg) => handler(msg.data, msg));
return messages.length;
}
listenerCount(event) {
return this.#handlers.get(event)?.length || 0;
}
eventNames() {
return [...this.#handlers.keys()];
}
clear() {
this.#handlers.clear();
this.#middleware.length = 0;
this.#history.length = 0;
}
}
// Usage
const bus = new EventBus();
bus.onError((error, msg) => {
console.error(`[EventBus] ${msg.event}: ${error.message}`);
});
bus.on("user:login", (data) => {
console.log(`User logged in: ${data.username}`);
});
await bus.emit("user:login", { username: "alice", timestamp: Date.now() });Middleware Pipeline
Middleware functions intercept every event before it reaches subscribers. Each one receives the message, can modify it or add metadata, and returns it to pass it along. Returning null drops the message entirely, which is how rate limiting works. You compose them by calling bus.use() in the order you want them to run: logging first, then enrichment, then validation, then rate limiting.
// Logging middleware
function loggingMiddleware(message) {
console.log(`[EVENT] ${message.event}:`, message.data);
return message;
}
// Validation middleware
function validationMiddleware(schemas) {
return function (message) {
const schema = schemas[message.event];
if (!schema) return message;
for (const [field, rules] of Object.entries(schema)) {
if (rules.required && !(field in message.data)) {
throw new Error(`Missing required field: ${field}`);
}
if (rules.type && typeof message.data[field] !== rules.type) {
throw new Error(`${field} must be ${rules.type}`);
}
}
return message;
};
}
// Enrichment middleware
function enrichmentMiddleware(message) {
message.metadata.processedAt = Date.now();
message.metadata.environment = "production";
// Add correlation ID if not present
if (!message.metadata.correlationId) {
message.metadata.correlationId = crypto.randomUUID();
}
return message;
}
// Rate limiting middleware
function rateLimitMiddleware(limits) {
const counters = new Map();
return function (message) {
const limit = limits[message.event];
if (!limit) return message;
const key = message.event;
const now = Date.now();
if (!counters.has(key)) {
counters.set(key, { count: 0, windowStart: now });
}
const counter = counters.get(key);
if (now - counter.windowStart > limit.windowMs) {
counter.count = 0;
counter.windowStart = now;
}
counter.count++;
if (counter.count > limit.max) {
console.warn(`Rate limit exceeded for ${message.event}`);
return null; // Filter message
}
return message;
};
}
// Wire up middleware
const bus2 = new EventBus();
bus2
.use(loggingMiddleware)
.use(enrichmentMiddleware)
.use(
validationMiddleware({
"order:create": {
items: { required: true },
userId: { required: true, type: "string" },
},
})
)
.use(
rateLimitMiddleware({
"analytics:track": { max: 100, windowMs: 60000 },
})
);
bus2.on("order:create", (data, msg) => {
console.log(`Order created by ${data.userId}`, msg.metadata);
});
await bus2.emit("order:create", {
items: [{ sku: "A1", qty: 1 }],
userId: "user_123",
});Typed Event System
Without type definitions, anyone can emit any event with any shape of data, and you will not know something is wrong until runtime. The TypedEventBus wraps the base event bus and lets you define schemas for each event up front. When someone emits an event, the bus validates the payload against the schema and throws immediately if a required field is missing or has the wrong type.
class TypedEventBus {
#bus = new EventBus();
#eventTypes = new Map();
defineEvent(name, schema) {
this.#eventTypes.set(name, schema);
return this;
}
on(event, handler, options) {
if (!this.#eventTypes.has(event)) {
console.warn(`Subscribing to undefined event type: ${event}`);
}
return this.#bus.on(event, handler, options);
}
async emit(event, data) {
const schema = this.#eventTypes.get(event);
if (!schema) {
console.warn(`Emitting undefined event type: ${event}`);
} else {
this.#validatePayload(event, data, schema);
}
return this.#bus.emit(event, data);
}
#validatePayload(event, data, schema) {
for (const [field, rules] of Object.entries(schema.fields || {})) {
if (rules.required && !(field in data)) {
throw new Error(`Event "${event}": missing required field "${field}"`);
}
if (field in data && rules.type) {
const actualType = Array.isArray(data[field]) ? "array" : typeof data[field];
if (actualType !== rules.type) {
throw new TypeError(
`Event "${event}": "${field}" must be ${rules.type}, got ${actualType}`
);
}
}
}
}
getEventTypes() {
const types = {};
for (const [name, schema] of this.#eventTypes) {
types[name] = schema;
}
return types;
}
}
// Usage
const typedBus = new TypedEventBus();
typedBus
.defineEvent("cart:itemAdded", {
description: "Fired when an item is added to cart",
fields: {
productId: { type: "string", required: true },
quantity: { type: "number", required: true },
price: { type: "number", required: true },
},
})
.defineEvent("cart:checkout", {
description: "Fired when user initiates checkout",
fields: {
cartId: { type: "string", required: true },
items: { type: "array", required: true },
total: { type: "number", required: true },
},
});
typedBus.on("cart:itemAdded", (data) => {
console.log(`Added product ${data.productId} x${data.quantity}`);
});
await typedBus.emit("cart:itemAdded", {
productId: "PROD-001",
quantity: 2,
price: 29.99,
});Namespaced Event Modules
When multiple modules share one event bus, naming collisions become a real problem. The createEventModule function wraps the bus and automatically prefixes every event name with a namespace. Each module gets its own scoped view of the bus, so the auth module emits auth:login and the cart module emits cart:load, all through the same underlying bus without stepping on each other.
function createEventModule(bus, namespace) {
return {
on(event, handler, options) {
return bus.on(`${namespace}:${event}`, handler, options);
},
once(event, handler) {
return bus.once(`${namespace}:${event}`, handler);
},
emit(event, data) {
return bus.emit(`${namespace}:${event}`, data);
},
getHistory(event, limit) {
return bus.getHistory(`${namespace}:${event}`, limit);
},
listenerCount(event) {
return bus.listenerCount(`${namespace}:${event}`);
},
};
}
// Usage: each module gets its own namespaced view
const sharedBus = new EventBus();
const authEvents = createEventModule(sharedBus, "auth");
const cartEvents = createEventModule(sharedBus, "cart");
const uiEvents = createEventModule(sharedBus, "ui");
// Module A: Auth
authEvents.on("login", (data) => {
console.log(`User logged in: ${data.userId}`);
cartEvents.emit("load", { userId: data.userId });
});
// Module B: Cart
cartEvents.on("load", (data) => {
console.log(`Loading cart for user: ${data.userId}`);
});
// Module C: UI reacts to auth
authEvents.on("login", (data) => {
uiEvents.emit("showWelcome", { name: data.name });
});
uiEvents.on("showWelcome", (data) => {
console.log(`Welcome back, ${data.name}!`);
});
// Fire the chain
await authEvents.emit("login", { userId: "U1", name: "Alice" });Event Bus with Request/Reply
Standard pub/sub is fire-and-forget, but sometimes you need a response. The request/reply pattern works by generating a unique reply channel for each request, setting a timeout, and waiting for a single response on that channel. The responder calls reply() to register a handler that automatically sends results back through the reply channel. If no reply comes within the timeout window, the promise rejects so you do not hang forever.
class RequestReplyBus extends EventBus {
#pendingRequests = new Map();
#timeout = 5000;
async request(event, data, timeoutMs) {
const requestId = crypto.randomUUID();
const replyEvent = `__reply__:${requestId}`;
const timeout = timeoutMs || this.#timeout;
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.off(replyEvent);
this.#pendingRequests.delete(requestId);
reject(new Error(`Request timeout: ${event} (${timeout}ms)`));
}, timeout);
this.once(replyEvent, (response) => {
clearTimeout(timer);
this.#pendingRequests.delete(requestId);
if (response.error) {
reject(new Error(response.error));
} else {
resolve(response.data);
}
});
this.#pendingRequests.set(requestId, { event, timer });
this.emit(event, { ...data, __requestId: requestId });
});
}
reply(event, handler) {
return this.on(event, async (data, message) => {
const requestId = data.__requestId;
if (!requestId) return;
try {
const result = await handler(data, message);
await this.emit(`__reply__:${requestId}`, { data: result });
} catch (error) {
await this.emit(`__reply__:${requestId}`, { error: error.message });
}
});
}
}
// Usage
const rrBus = new RequestReplyBus();
// Service registers a reply handler
rrBus.reply("users:getById", async (data) => {
// Simulate DB lookup
return { id: data.userId, name: "Alice", email: "alice@test.com" };
});
// Request and await response
const user = await rrBus.request("users:getById", { userId: "U1" });
console.log("Got user:", user);| Event Bus Feature | Purpose | Complexity |
|---|---|---|
| Basic pub/sub | Decouple components | Low |
| Middleware pipeline | Cross-cutting concerns | Medium |
| Typed events | Schema validation | Medium |
| Namespaced modules | Module isolation | Low |
| Request/Reply | Async queries via events | High |
| Event replay | Late subscriber support | Medium |
Rune AI
Key Insights
- Event buses centralize cross-module communication through a single pub/sub instance: All modules publish and subscribe through one bus, eliminating direct dependencies between them
- Middleware pipelines process every event before delivery: Logging, validation, enrichment, and rate limiting run as composable pipeline stages
- Typed event definitions enforce payload schemas at emission time: Define expected fields and types per event to catch data errors early
- Namespaced modules give each feature its own event scope: Modules use prefixed events (auth:login, cart:add) without collision risk
- Request/Reply enables synchronous-style queries over the event bus: Publishers can await responses from reply handlers, bridging pub/sub with request-response patterns
Frequently Asked Questions
When should I use an event bus vs direct function calls?
How do I debug event-driven architectures?
Can event buses cause memory leaks?
How do I handle event ordering and race conditions?
Conclusion
A production event bus provides middleware pipelines, typed events, namespaced modules, request/reply, and event replay. Middleware handles cross-cutting concerns like logging, validation, and rate limiting. Typed events enforce payload schemas. Namespaced modules isolate components. For the pub/sub pattern foundations, see The JavaScript Pub/Sub Pattern: Complete Guide. For the MVC architecture that event buses support, review JavaScript MVC Architecture: Complete Guide.
More in this topic
OffscreenCanvas API in JS for UI Performance
Master the OffscreenCanvas API to offload rendering from the main thread. Covers worker-based 2D and WebGL rendering, animation loops inside workers, bitmap transfer, double buffering, chart rendering pipelines, image processing, and performance measurement strategies.
Advanced Web Workers for High Performance JS
Master Web Workers for truly parallel JavaScript execution. Covers dedicated and shared workers, structured cloning, transferable objects, SharedArrayBuffer with Atomics, worker pools, task scheduling, Comlink RPC patterns, module workers, and performance profiling strategies.
JavaScript Macros and Abstract Code Generation
Master JavaScript code generation techniques for compile-time and runtime metaprogramming. Covers AST manipulation, Babel plugin authorship, tagged template literals as macros, code generation pipelines, source-to-source transformation, compile-time evaluation, and safe eval alternatives.