Building an Event Bus with JS Pub/Sub Pattern
Learn to build a production-grade event bus using the JavaScript Pub/Sub pattern. Covers typed events, middleware pipelines, event replay, namespaced channels, error boundaries, and cross-module communication for scalable applications.
An event bus is a centralized pub/sub system that enables cross-module communication without direct dependencies. This guide builds a production-grade event bus with middleware support, typed events, replay capabilities, and error handling.
For the pub/sub pattern foundations, see The JavaScript Pub/Sub Pattern: Complete Guide.
Production Event Bus
/**
 * Centralized publish/subscribe hub.
 *
 * Provides priority-ordered handlers, one-shot subscriptions, an async
 * middleware pipeline that can transform or filter messages, a bounded
 * message history with replay, and error isolation between handlers.
 */
class EventBus {
  // event name -> handler entries, kept sorted by descending priority
  #handlers = new Map();
  #middleware = [];
  #history = [];
  // Oldest messages are dropped once the history grows past this size.
  #maxHistory = 500;
  #errorHandlers = [];

  /**
   * Subscribes a handler to an event.
   *
   * @param {string} event - Event name.
   * @param {Function} handler - Called with `(data, message)` on each emit.
   * @param {{priority?: number, once?: boolean}} [options] - Higher priority
   *   handlers run first; `once` removes the handler after its first call.
   * @returns {Function} Unsubscribe function for this specific subscription.
   */
  on(event, handler, options = {}) {
    const { priority = 0, once = false } = options;
    if (!this.#handlers.has(event)) {
      this.#handlers.set(event, []);
    }
    const entry = {
      handler,
      priority,
      once,
      fired: false,
      id: crypto.randomUUID(),
    };
    const handlers = this.#handlers.get(event);
    handlers.push(entry);
    handlers.sort((a, b) => b.priority - a.priority);
    return () => this.off(event, entry.id);
  }

  /**
   * Subscribes a handler that is removed after its first invocation.
   *
   * @param {string} event - Event name.
   * @param {Function} handler - Called with `(data, message)`.
   * @param {{priority?: number}} [options] - Same options as `on`.
   * @returns {Function} Unsubscribe function.
   */
  once(event, handler, options = {}) {
    return this.on(event, handler, { ...options, once: true });
  }

  /**
   * Removes a subscription.
   *
   * @param {string} event - Event name.
   * @param {Function|string} [handlerOrId] - The handler function or the
   *   subscription id. When omitted, every handler for `event` is removed.
   */
  off(event, handlerOrId) {
    const handlers = this.#handlers.get(event);
    if (!handlers) return;

    // No handler given: drop the whole channel (e.g. a one-off reply channel).
    if (handlerOrId === undefined) {
      this.#handlers.delete(event);
      return;
    }

    const idx = handlers.findIndex((h) =>
      typeof handlerOrId === "string"
        ? h.id === handlerOrId
        : h.handler === handlerOrId
    );
    if (idx > -1) handlers.splice(idx, 1);
    if (handlers.length === 0) this.#handlers.delete(event);
  }

  /**
   * Publishes an event: runs the middleware pipeline, records the message in
   * the history, then awaits each handler in priority order.
   *
   * @param {string} event - Event name.
   * @param {*} [data] - Payload passed to handlers.
   * @returns {Promise<object>} `{ messageId, delivered }` on delivery,
   *   `{ delivered: 0, filtered: true }` when a middleware returned a falsy
   *   value, or `{ delivered: 0, error }` when a middleware threw.
   */
  async emit(event, data) {
    const message = {
      id: crypto.randomUUID(),
      event,
      data,
      timestamp: Date.now(),
      metadata: {},
    };

    // Run middleware pipeline
    let processedMessage = message;
    for (const mw of this.#middleware) {
      try {
        processedMessage = await mw(processedMessage);
        if (!processedMessage) return { delivered: 0, filtered: true };
      } catch (error) {
        this.#handleError(error, message);
        // Tolerate non-Error throws (strings, null) when building the result.
        return { delivered: 0, error: error?.message ?? String(error) };
      }
    }

    // Store in history
    this.#history.push(processedMessage);
    if (this.#history.length > this.#maxHistory) {
      this.#history.shift();
    }

    // Deliver to a snapshot so handlers may subscribe or unsubscribe during
    // delivery without other handlers being skipped.
    const snapshot = [...(this.#handlers.get(event) ?? [])];
    let delivered = 0;
    for (const entry of snapshot) {
      if (entry.once) {
        // Claim and remove the one-shot handler BEFORE invoking it so that
        // re-entrant or concurrent emits cannot run it a second time.
        if (entry.fired) continue;
        entry.fired = true;
        this.off(event, entry.id);
      }
      try {
        await entry.handler(processedMessage.data, processedMessage);
        delivered++;
      } catch (error) {
        this.#handleError(error, processedMessage);
      }
    }

    return { messageId: message.id, delivered };
  }

  /**
   * Appends a middleware to the pipeline.
   *
   * @param {Function} middleware - Receives the message and returns it
   *   (possibly modified), or a falsy value to filter the message out.
   * @returns {EventBus} This bus, for chaining.
   */
  use(middleware) {
    this.#middleware.push(middleware);
    return this;
  }

  /**
   * Registers a callback invoked with `(error, message)` whenever a
   * middleware or handler throws.
   *
   * @param {Function} handler - Error callback.
   * @returns {EventBus} This bus, for chaining.
   */
  onError(handler) {
    this.#errorHandlers.push(handler);
    return this;
  }

  /**
   * Reports an error to the registered error handlers, or to `console.error`
   * when none are registered. A failing error handler is logged rather than
   * rethrown so it cannot abort delivery to the remaining handlers.
   */
  #handleError(error, message) {
    if (this.#errorHandlers.length === 0) {
      console.error(`EventBus error [${message.event}]:`, error);
      return;
    }
    for (const handler of [...this.#errorHandlers]) {
      try {
        handler(error, message);
      } catch (handlerError) {
        console.error(
          `EventBus error handler failed [${message.event}]:`,
          handlerError
        );
      }
    }
  }

  /**
   * Returns the most recent messages from the history.
   *
   * @param {string} [event] - When given, only messages of this event.
   * @param {number} [limit=10] - Maximum number of messages to return.
   * @returns {object[]} Up to `limit` messages, oldest first.
   */
  getHistory(event, limit = 10) {
    // slice(-0) is slice(0) and would return the entire history, so handle
    // zero, negative and NaN limits explicitly.
    if (!(limit > 0)) return [];
    const filtered = event
      ? this.#history.filter((m) => m.event === event)
      : this.#history;
    return filtered.slice(-limit);
  }

  /**
   * Synchronously feeds every recorded message of `event` to `handler`.
   *
   * @param {string} event - Event name.
   * @param {Function} handler - Called with `(data, message)` per message.
   * @returns {number} Number of messages replayed.
   */
  replay(event, handler) {
    const messages = this.#history.filter((m) => m.event === event);
    for (const msg of messages) {
      handler(msg.data, msg);
    }
    return messages.length;
  }

  /**
   * @param {string} event - Event name.
   * @returns {number} Number of handlers currently subscribed to `event`.
   */
  listenerCount(event) {
    return this.#handlers.get(event)?.length || 0;
  }

  /**
   * @returns {string[]} Names of events that currently have handlers.
   */
  eventNames() {
    return [...this.#handlers.keys()];
  }

  /**
   * Removes all handlers, middleware and history. Error handlers registered
   * through `onError` are kept.
   */
  clear() {
    this.#handlers.clear();
    this.#middleware.length = 0;
    this.#history.length = 0;
  }
}
// Usage: create a bus, attach an error callback, subscribe, then emit.
const bus = new EventBus();
// The error callback receives the thrown error and the message being processed.
bus.onError((error, msg) => {
console.error(`[EventBus] ${msg.event}: ${error.message}`);
});
// Handlers receive the emitted payload as their first argument.
bus.on("user:login", (data) => {
console.log(`User logged in: ${data.username}`);
});
await bus.emit("user:login", { username: "alice", timestamp: Date.now() });
Middleware Pipeline
// Logging middleware
/**
 * Writes the event name and payload of every message to the console and
 * passes the message through untouched.
 *
 * @param {{event: string, data: *}} message - Message travelling the pipeline.
 * @returns {object} The same message object.
 */
function loggingMiddleware(message) {
  const { event, data } = message;
  console.log(`[EVENT] ${event}:`, data);
  return message;
}
// Validation middleware
/**
 * Builds a middleware that validates message payloads against per-event
 * schemas. Events without a schema pass through unchecked.
 *
 * @param {Object<string, Object<string, {required?: boolean, type?: string}>>} schemas
 *   Map of event name to field rules. `type` is compared against `typeof`
 *   of the value; the extra type name "array" matches arrays.
 * @returns {Function} Middleware that returns the message when valid.
 * @throws {Error} From the returned middleware, when a required field is
 *   missing or a present field has the wrong type.
 */
function validationMiddleware(schemas) {
  return function (message) {
    const schema = schemas[message.event];
    if (!schema) return message;

    const payload = message.data;
    // A missing or primitive payload has no fields; treating it that way
    // yields a "Missing required field" error instead of a crash on `in`.
    const canHoldFields = payload !== null && typeof payload === "object";

    for (const [field, rules] of Object.entries(schema)) {
      const isPresent = canHoldFields && field in payload;
      if (rules.required && !isPresent) {
        throw new Error(`Missing required field: ${field}`);
      }
      // Only type-check fields that are actually present, so optional typed
      // fields may be omitted.
      if (isPresent && rules.type) {
        const value = payload[field];
        const matches =
          typeof value === rules.type ||
          (rules.type === "array" && Array.isArray(value));
        if (!matches) {
          throw new Error(`${field} must be ${rules.type}`);
        }
      }
    }
    return message;
  };
}
// Enrichment middleware
/**
 * Stamps processing details onto the message's metadata object in place:
 * the processing time, the environment name, and a correlation ID when the
 * message does not already carry one.
 *
 * @param {{metadata: object}} message - Message travelling the pipeline.
 * @returns {object} The same message object, with metadata updated.
 */
function enrichmentMiddleware(message) {
  const { metadata } = message;
  Object.assign(metadata, {
    processedAt: Date.now(),
    environment: "production",
  });
  // Keep an existing correlation ID; only generate one when it is missing.
  metadata.correlationId ||= crypto.randomUUID();
  return message;
}
// Rate limiting middleware
/**
 * Builds a middleware that caps how many messages of an event may pass
 * within a time window. Counters are kept per event name inside the
 * returned closure; events without a configured limit are never throttled.
 *
 * @param {Object<string, {max: number, windowMs: number}>} limits
 *   Map of event name to the maximum count per window of `windowMs` ms.
 * @returns {Function} Middleware returning the message, or `null` (which
 *   filters the message) once the limit for the current window is exceeded.
 */
function rateLimitMiddleware(limits) {
  const windows = new Map();

  return function (message) {
    const eventName = message.event;
    const rule = limits[eventName];
    if (!rule) return message;

    const timestamp = Date.now();
    let state = windows.get(eventName);
    if (state === undefined) {
      state = { count: 0, windowStart: timestamp };
      windows.set(eventName, state);
    }

    // Start a new window once the current one has fully elapsed.
    const windowElapsed = timestamp - state.windowStart > rule.windowMs;
    if (windowElapsed) {
      state.count = 0;
      state.windowStart = timestamp;
    }

    state.count += 1;
    if (state.count > rule.max) {
      console.warn(`Rate limit exceeded for ${message.event}`);
      return null; // Filter message
    }
    return message;
  };
}
// Wire up middleware: use() returns the bus, so registrations chain, and
// emit() runs the middleware in the order they were registered.
const bus2 = new EventBus();
bus2
.use(loggingMiddleware)
.use(enrichmentMiddleware)
.use(
// "order:create" payloads must contain items and a string userId.
validationMiddleware({
"order:create": {
items: { required: true },
userId: { required: true, type: "string" },
},
})
)
.use(
// Allow at most 100 "analytics:track" messages per 60000 ms window.
rateLimitMiddleware({
"analytics:track": { max: 100, windowMs: 60000 },
})
);
// The second handler argument is the full message; its metadata was filled
// in by enrichmentMiddleware before delivery.
bus2.on("order:create", (data, msg) => {
console.log(`Order created by ${data.userId}`, msg.metadata);
});
await bus2.emit("order:create", {
items: [{ sku: "A1", qty: 1 }],
userId: "user_123",
});
Typed Event System
/**
 * Event bus wrapper that validates payloads against schemas registered per
 * event name before delegating to an internal EventBus.
 */
class TypedEventBus {
  #bus = new EventBus();
  // event name -> schema ({ description?, fields? })
  #eventTypes = new Map();

  /**
   * Registers (or replaces) the schema for an event.
   *
   * @param {string} name - Event name.
   * @param {{description?: string, fields?: Object<string, {type?: string, required?: boolean}>}} schema
   *   Field rules; `type` is a `typeof` name or "array".
   * @returns {TypedEventBus} This bus, for chaining.
   */
  defineEvent(name, schema) {
    this.#eventTypes.set(name, schema);
    return this;
  }

  /**
   * Subscribes to an event, warning when the event has no registered schema.
   *
   * @param {string} event - Event name.
   * @param {Function} handler - Called with `(data, message)`.
   * @param {object} [options] - Forwarded to the underlying bus.
   * @returns {Function} Unsubscribe function from the underlying bus.
   */
  on(event, handler, options) {
    if (!this.#eventTypes.has(event)) {
      console.warn(`Subscribing to undefined event type: ${event}`);
    }
    return this.#bus.on(event, handler, options);
  }

  /**
   * Validates the payload against the event's schema (when one is
   * registered) and emits it on the underlying bus.
   *
   * @param {string} event - Event name.
   * @param {*} data - Payload.
   * @returns {Promise<object>} Result of the underlying bus's `emit`;
   *   rejects with Error/TypeError when validation fails.
   */
  async emit(event, data) {
    const schema = this.#eventTypes.get(event);
    if (!schema) {
      console.warn(`Emitting undefined event type: ${event}`);
    } else {
      this.#validatePayload(event, data, schema);
    }
    return this.#bus.emit(event, data);
  }

  /**
   * Checks required fields and field types.
   *
   * @throws {Error} When a required field is missing.
   * @throws {TypeError} When the payload cannot hold fields or a present
   *   field has the wrong type.
   */
  #validatePayload(event, data, schema) {
    const fieldRules = Object.entries(schema.fields || {});
    if (fieldRules.length === 0) return;

    // The `in` operator throws an unrelated TypeError on null, undefined and
    // primitives; report the real problem instead.
    const canHoldFields =
      data !== null && (typeof data === "object" || typeof data === "function");
    if (!canHoldFields) {
      const got = data === null ? "null" : typeof data;
      throw new TypeError(`Event "${event}": payload must be an object, got ${got}`);
    }

    for (const [field, rules] of fieldRules) {
      if (rules.required && !(field in data)) {
        throw new Error(`Event "${event}": missing required field "${field}"`);
      }
      if (field in data && rules.type) {
        const actualType = Array.isArray(data[field]) ? "array" : typeof data[field];
        if (actualType !== rules.type) {
          throw new TypeError(
            `Event "${event}": "${field}" must be ${rules.type}, got ${actualType}`
          );
        }
      }
    }
  }

  /**
   * @returns {Object<string, object>} Plain-object snapshot mapping each
   *   registered event name to its schema.
   */
  getEventTypes() {
    return Object.fromEntries(this.#eventTypes);
  }
}
// Usage: defineEvent() returns the bus, so schema registrations chain.
const typedBus = new TypedEventBus();
typedBus
.defineEvent("cart:itemAdded", {
description: "Fired when an item is added to cart",
// Each field rule is checked when the event is emitted.
fields: {
productId: { type: "string", required: true },
quantity: { type: "number", required: true },
price: { type: "number", required: true },
},
})
.defineEvent("cart:checkout", {
description: "Fired when user initiates checkout",
fields: {
cartId: { type: "string", required: true },
// "array" is matched with Array.isArray rather than typeof.
items: { type: "array", required: true },
total: { type: "number", required: true },
},
});
// Subscribing to an event without a registered schema would log a warning.
typedBus.on("cart:itemAdded", (data) => {
console.log(`Added product ${data.productId} x${data.quantity}`);
});
await typedBus.emit("cart:itemAdded", {
productId: "PROD-001",
quantity: 2,
price: 29.99,
});
Namespaced Event Modules
/**
 * Creates a namespaced view of a shared bus: every event name passed to the
 * returned object is prefixed with `<namespace>:` before being forwarded.
 *
 * @param {object} bus - Bus exposing on/once/off/emit/getHistory/listenerCount.
 * @param {string} namespace - Prefix applied to every event name.
 * @returns {object} Facade with `on`, `once`, `off`, `emit`, `getHistory`
 *   and `listenerCount`, each returning whatever the bus method returns.
 */
function createEventModule(bus, namespace) {
  const scoped = (event) => `${namespace}:${event}`;

  return {
    on(event, handler, options) {
      return bus.on(scoped(event), handler, options);
    },
    // Options (e.g. priority) are forwarded just as they are for on().
    once(event, handler, options) {
      return bus.once(scoped(event), handler, options);
    },
    off(event, handlerOrId) {
      return bus.off(scoped(event), handlerOrId);
    },
    emit(event, data) {
      return bus.emit(scoped(event), data);
    },
    getHistory(event, limit) {
      return bus.getHistory(scoped(event), limit);
    },
    listenerCount(event) {
      return bus.listenerCount(scoped(event));
    },
  };
}
// Usage: each module gets its own namespaced view
// All three views forward to the same bus; only the event-name prefix differs.
const sharedBus = new EventBus();
const authEvents = createEventModule(sharedBus, "auth");
const cartEvents = createEventModule(sharedBus, "cart");
const uiEvents = createEventModule(sharedBus, "ui");
// Module A: Auth
// On "auth:login", ask the cart module to load by emitting "cart:load".
authEvents.on("login", (data) => {
console.log(`User logged in: ${data.userId}`);
cartEvents.emit("load", { userId: data.userId });
});
// Module B: Cart
cartEvents.on("load", (data) => {
console.log(`Loading cart for user: ${data.userId}`);
});
// Module C: UI reacts to auth
// A second "auth:login" subscriber that triggers "ui:showWelcome".
authEvents.on("login", (data) => {
uiEvents.emit("showWelcome", { name: data.name });
});
uiEvents.on("showWelcome", (data) => {
console.log(`Welcome back, ${data.name}!`);
});
// Fire the chain
await authEvents.emit("login", { userId: "U1", name: "Alice" });
Event Bus with Request/Reply
/**
 * EventBus extension that layers a request/reply protocol on top of
 * pub/sub: `request` emits an event tagged with a unique request id and
 * waits for a matching `__reply__:<id>` event produced by a `reply` handler.
 */
class RequestReplyBus extends EventBus {
  // requestId -> { event, timer } for requests still awaiting a reply
  #pendingRequests = new Map();
  // Default time to wait for a reply, in milliseconds.
  #timeout = 5000;

  /**
   * Emits `event` and waits for a reply.
   *
   * @param {string} event - Event name a `reply` handler is registered for.
   * @param {object} [data] - Request payload; `__requestId` is added to a copy.
   * @param {number} [timeoutMs] - Reply deadline; falsy values use the default.
   * @returns {Promise<*>} Resolves with the reply handler's result. Rejects
   *   when the reply carries an error, the deadline passes, or the request
   *   never reaches handlers (filtered or rejected by middleware).
   */
  async request(event, data, timeoutMs) {
    const requestId = crypto.randomUUID();
    const replyEvent = `__reply__:${requestId}`;
    const timeout = timeoutMs || this.#timeout;

    return new Promise((resolve, reject) => {
      let settled = false;
      let unsubscribe = () => {};

      // Single exit path: whichever outcome arrives first wins, and the
      // timer, reply listener and bookkeeping entry are always released.
      const settle = (finish, value) => {
        if (settled) return;
        settled = true;
        clearTimeout(timer);
        unsubscribe();
        this.#pendingRequests.delete(requestId);
        finish(value);
      };

      const timer = setTimeout(() => {
        settle(reject, new Error(`Request timeout: ${event} (${timeout}ms)`));
      }, timeout);

      // Keep the unsubscribe function: calling off(replyEvent) without the
      // handler would leave the listener registered after a timeout.
      unsubscribe = this.once(replyEvent, (response) => {
        if (response?.error) {
          settle(reject, new Error(response.error));
        } else {
          settle(resolve, response?.data);
        }
      });

      this.#pendingRequests.set(requestId, { event, timer });

      this.emit(event, { ...data, __requestId: requestId }).then(
        (result) => {
          // The request never reached any handler, so no reply can arrive;
          // fail now instead of waiting for the timeout.
          if (result?.filtered) {
            settle(reject, new Error(`Request filtered by middleware: ${event}`));
          } else if (result?.error) {
            settle(reject, new Error(result.error));
          }
        },
        (error) => settle(reject, error)
      );
    });
  }

  /**
   * Registers a responder for requests sent with `request`.
   *
   * @param {string} event - Event name to answer.
   * @param {Function} handler - Called with `(data, message)`; its (awaited)
   *   return value becomes the reply, a thrown error becomes a rejection on
   *   the requesting side.
   * @returns {Function} Unsubscribe function.
   */
  reply(event, handler) {
    return this.on(event, async (data, message) => {
      // Plain emits of the same event carry no request id; ignore them.
      const requestId = data?.__requestId;
      if (!requestId) return;

      let response;
      try {
        response = { data: await handler(data, message) };
      } catch (error) {
        // Always send a non-empty error string so the requester rejects
        // even when a non-Error value was thrown.
        response = { error: error?.message || String(error) };
      }
      await this.emit(`__reply__:${requestId}`, response);
    });
  }
}
// Usage: one side registers a responder, the other awaits request().
const rrBus = new RequestReplyBus();
// Service registers a reply handler
// The handler's return value is what request() resolves with.
rrBus.reply("users:getById", async (data) => {
// Simulate DB lookup
return { id: data.userId, name: "Alice", email: "alice@test.com" };
});
// Request and await response
// No timeout argument is passed, so the bus's default timeout applies.
const user = await rrBus.request("users:getById", { userId: "U1" });
console.log("Got user:", user);
| Event Bus Feature | Purpose | Complexity |
|---|---|---|
| Basic pub/sub | Decouple components | Low |
| Middleware pipeline | Cross-cutting concerns | Medium |
| Typed events | Schema validation | Medium |
| Namespaced modules | Module isolation | Low |
| Request/Reply | Async queries via events | High |
| Event replay | Late subscriber support | Medium |
Rune AI
Key Insights
- Event buses centralize cross-module communication through a single pub/sub instance: All modules publish and subscribe through one bus, eliminating direct dependencies between them
- Middleware pipelines process every event before delivery: Logging, validation, enrichment, and rate limiting run as composable pipeline stages
- Typed event definitions enforce payload schemas at emission time: Define expected fields and types per event to catch data errors early
- Namespaced modules give each feature its own event scope: Modules use prefixed events (auth:login, cart:add) without collision risk
- Request/Reply enables synchronous-style queries over the event bus: Publishers can await responses from reply handlers, bridging pub/sub with request-response patterns
Frequently Asked Questions
When should I use an event bus vs direct function calls?
How do I debug event-driven architectures?
Can event buses cause memory leaks?
How do I handle event ordering and race conditions?
Conclusion
A production event bus provides middleware pipelines, typed events, namespaced modules, request/reply, and event replay. Middleware handles cross-cutting concerns like logging, validation, and rate limiting. Typed events enforce payload schemas. Namespaced modules isolate components. For the pub/sub pattern foundations, see The JavaScript Pub/Sub Pattern: Complete Guide. For the MVC architecture that event buses support, review JavaScript MVC Architecture: Complete Guide.
More in this topic
OffscreenCanvas API in JS for UI Performance
Master the OffscreenCanvas API to offload rendering from the main thread. Covers worker-based 2D and WebGL rendering, animation loops inside workers, bitmap transfer, double buffering, chart rendering pipelines, image processing, and performance measurement strategies.
Advanced Web Workers for High Performance JS
Master Web Workers for truly parallel JavaScript execution. Covers dedicated and shared workers, structured cloning, transferable objects, SharedArrayBuffer with Atomics, worker pools, task scheduling, Comlink RPC patterns, module workers, and performance profiling strategies.
JavaScript Macros and Abstract Code Generation
Master JavaScript code generation techniques for compile-time and runtime metaprogramming. Covers AST manipulation, Babel plugin authorship, tagged template literals as macros, code generation pipelines, source-to-source transformation, compile-time evaluation, and safe eval alternatives.