The JavaScript Pub/Sub Pattern: Complete Guide
A complete guide to the JavaScript Pub/Sub pattern. Covers publish/subscribe fundamentals, topic-based messaging, wildcard subscriptions, message filtering, async pub/sub, dead letter queues, and building scalable event-driven architectures.
The Publish/Subscribe pattern decouples message producers (publishers) from consumers (subscribers) through a message broker. Unlike the observer pattern where subjects know their observers directly, pub/sub introduces a mediator that routes messages by topic, enabling truly loosely coupled architectures.
For building event buses with pub/sub, see Building an Event Bus with JS Pub/Sub Pattern.
Core Pub/Sub Implementation
/**
 * Synchronous, topic-based publish/subscribe broker.
 *
 * Handlers are registered against an exact topic string. `publish` wraps the
 * payload in a message envelope and calls every active subscriber of that
 * topic, highest priority first.
 */
class PubSub {
  // topic -> array of subscription records, kept sorted by descending priority.
  #topics = new Map();
  // Monotonic counter used to hand out subscription ids.
  #subId = 0;

  /**
   * Registers a handler for a topic.
   *
   * @param {string} topic - Exact topic name to listen on.
   * @param {(message: object) => unknown} handler - Called with the message envelope.
   * @param {{ priority?: number, filter?: ((message: object) => boolean) | null }} [options]
   *   `priority`: higher values are called first (default 0).
   *   `filter`: when set, the handler only runs if it returns a truthy value.
   * @returns {{ id: number, unsubscribe: () => void, pause: () => void, resume: () => void }}
   */
  subscribe(topic, handler, options = {}) {
    const { priority = 0, filter = null } = options;
    const id = ++this.#subId;
    const subscription = {
      id,
      handler,
      priority,
      filter,
      active: true,
      removed: false,
    };

    let subs = this.#topics.get(topic);
    if (!subs) {
      subs = [];
      this.#topics.set(topic, subs);
    }
    subs.push(subscription);
    // Array.prototype.sort is stable, so equal priorities keep subscription order.
    subs.sort((a, b) => b.priority - a.priority);

    return {
      id,
      unsubscribe: () => {
        // Flag first so a publish that is already iterating a snapshot skips it.
        subscription.removed = true;
        const current = this.#topics.get(topic);
        if (!current) return;
        const idx = current.indexOf(subscription);
        if (idx === -1) return;
        current.splice(idx, 1);
        // Drop emptied topics so getTopics() does not report dead topics.
        if (current.length === 0) this.#topics.delete(topic);
      },
      pause: () => {
        subscription.active = false;
      },
      resume: () => {
        subscription.active = true;
      },
    };
  }

  /**
   * Delivers a message to every active subscriber of `topic`.
   *
   * Errors thrown by a filter or handler, and rejections of promises returned
   * by a handler, are logged with `console.error` and do not stop delivery to
   * the remaining subscribers.
   *
   * @param {string} topic - Topic to publish on.
   * @param {*} data - Payload, exposed as `message.data`.
   * @param {object} [meta] - Extra envelope fields. They are spread last, so
   *   they can override the generated `id`, `topic`, `data` and `timestamp`.
   * @returns {{ messageId: string, delivered: number }} `delivered` counts the
   *   handlers that were invoked without throwing synchronously.
   */
  publish(topic, data, meta = {}) {
    const message = {
      id: crypto.randomUUID(),
      topic,
      data,
      timestamp: Date.now(),
      ...meta,
    };

    // Iterate a snapshot: handlers may subscribe or unsubscribe while we
    // deliver, and mutating the live array mid-loop would skip subscribers.
    const snapshot = [...(this.#topics.get(topic) ?? [])];
    let delivered = 0;

    for (const sub of snapshot) {
      if (sub.removed || !sub.active) continue;
      try {
        if (sub.filter && !sub.filter(message)) continue;
        const outcome = sub.handler(message);
        if (typeof outcome?.then === "function") {
          // Async handler: surface a rejection instead of leaving it unhandled.
          Promise.resolve(outcome).catch((error) => {
            console.error(`Subscriber ${sub.id} error on "${topic}":`, error);
          });
        }
        delivered++;
      } catch (error) {
        console.error(`Subscriber ${sub.id} error on "${topic}":`, error);
      }
    }

    return { messageId: message.id, delivered };
  }

  /**
   * @param {string} topic
   * @returns {boolean} True when at least one subscription exists for the topic.
   */
  hasSubscribers(topic) {
    return (this.#topics.get(topic)?.length ?? 0) > 0;
  }

  /**
   * @returns {string[]} Topics that currently have a subscription list.
   */
  getTopics() {
    return [...this.#topics.keys()];
  }

  /**
   * Removes all subscriptions of one topic, or of every topic when `topic`
   * is falsy.
   *
   * @param {string} [topic]
   */
  clear(topic) {
    if (topic) {
      this.#topics.delete(topic);
    } else {
      this.#topics.clear();
    }
  }
}
// Usage: subscribe to a topic, publish to it, then pause the subscription.
const bus = new PubSub();

// Prints the name carried by each "user:created" message.
function logNewUser(message) {
  console.log("New user:", message.data.name);
}

const sub = bus.subscribe("user:created", logNewUser);

const alice = { name: "Alice", email: "alice@test.com" };
bus.publish("user:created", alice);

sub.pause(); // Temporarily stop receiving
const bob = { name: "Bob" };
bus.publish("user:created", bob); // Not delivered
sub.resume(); // Resume receiving
Wildcard Topic Matching
/**
 * PubSub with additional pattern subscriptions over dot-separated topics.
 *
 * Pattern syntax:
 *   `*` matches exactly one non-empty segment (no dots),
 *   `#` matches any remainder, including dots and the empty string.
 * Exact-topic subscriptions made through `subscribe` keep working and are
 * delivered before pattern subscriptions.
 */
class WildcardPubSub extends PubSub {
  // Pattern subscription records, kept sorted by descending priority.
  #wildcardSubs = [];
  // Last id handed out; ids stay timestamp-like numbers but never repeat.
  #lastWildcardId = 0;

  /**
   * Registers a handler for every topic matching `pattern`.
   *
   * @param {string} pattern - Topic pattern using `*` and `#` wildcards.
   * @param {(message: object) => unknown} handler - Called with the message envelope.
   * @param {{ priority?: number, filter?: ((message: object) => boolean) | null, active?: boolean }} [options]
   *   `priority`: higher values are called first among pattern subscribers (default 0).
   *   `filter`: when set, the handler only runs if it returns a truthy value.
   *   `active`: pass `false` to start paused (default true).
   * @returns {{ id: number, unsubscribe: () => void, pause: () => void, resume: () => void }}
   */
  subscribePattern(pattern, handler, options = {}) {
    const { priority = 0, filter = null, active = true } = options;

    // `Date.now()` alone repeats when two subscriptions are created in the
    // same millisecond; bump past the previous id to keep ids unique.
    this.#lastWildcardId = Math.max(this.#lastWildcardId + 1, Date.now());
    const id = this.#lastWildcardId;

    // Only known options are copied, so callers cannot overwrite internal
    // fields such as `regex` or `handler`.
    const sub = {
      id,
      pattern,
      regex: this.#patternToRegex(pattern),
      handler,
      priority,
      filter,
      active,
      removed: false,
    };
    this.#wildcardSubs.push(sub);
    // Stable sort: equal priorities keep subscription order.
    this.#wildcardSubs.sort((a, b) => b.priority - a.priority);

    return {
      id,
      unsubscribe: () => {
        // Flag first so a publish that is already iterating a snapshot skips it.
        sub.removed = true;
        // Remove by identity rather than by id lookup.
        const idx = this.#wildcardSubs.indexOf(sub);
        if (idx > -1) this.#wildcardSubs.splice(idx, 1);
      },
      pause: () => {
        sub.active = false;
      },
      resume: () => {
        sub.active = true;
      },
    };
  }

  /**
   * Compiles a topic pattern into an anchored regular expression.
   *
   * @param {string} pattern
   * @returns {RegExp}
   */
  #patternToRegex(pattern) {
    const escaped = pattern
      // Escape regex metacharacters (including `?`) so they match literally;
      // `*` and `#` are left alone because they are the wildcard tokens.
      .replace(/[.+?^${}()|[\]\\]/g, "\\$&")
      .replace(/\*/g, "[^.]+")
      .replace(/#/g, ".*");
    return new RegExp(`^${escaped}$`);
  }

  /**
   * Publishes to exact-topic subscribers via `PubSub.publish`, then to every
   * active pattern subscriber whose pattern matches `topic`.
   *
   * Both groups receive the same `id` and `timestamp`. This relies on
   * `PubSub.publish` spreading `meta` last so the shared timestamp passed in
   * `meta` takes effect.
   *
   * @param {string} topic - Topic to publish on.
   * @param {*} data - Payload, exposed as `message.data`.
   * @param {object} [meta] - Extra envelope fields.
   * @returns {{ messageId: string, delivered: number }} `delivered` includes
   *   both exact-topic and pattern deliveries.
   */
  publish(topic, data, meta = {}) {
    const envelope = { timestamp: Date.now(), ...meta };
    const result = super.publish(topic, data, envelope);

    // Reuse the id reported by the base class so `result.messageId` identifies
    // the message that pattern subscribers receive as well.
    const message = { id: result.messageId, topic, data, ...envelope };

    // Snapshot so handlers may (un)subscribe during delivery without
    // causing other subscribers to be skipped.
    for (const sub of [...this.#wildcardSubs]) {
      if (sub.removed || !sub.active) continue;
      if (!sub.regex.test(topic)) continue;
      try {
        if (sub.filter && !sub.filter(message)) continue;
        sub.handler(message);
        result.delivered++;
      } catch (error) {
        console.error(`Wildcard subscriber error:`, error);
      }
    }

    return result;
  }
}
// Usage: pattern subscriptions over dot-separated topic namespaces.
const wbus = new WildcardPubSub();

// Match any event in the "orders" namespace
const logOrderEvent = ({ topic, data }) => {
  console.log(`Orders event [${topic}]:`, data);
};
wbus.subscribePattern("orders.*", logOrderEvent);

// Match any deeply nested user event
const logUserEvent = ({ topic, data }) => {
  console.log(`User event [${topic}]:`, data);
};
wbus.subscribePattern("users.#", logUserEvent);

// Published in order; each entry is [topic, payload].
const sampleEvents = [
  ["orders.created", { orderId: "ORD-001" }],
  ["orders.shipped", { orderId: "ORD-001" }],
  ["users.profile.updated", { userId: "U1" }],
];
for (const [topic, payload] of sampleEvents) {
  wbus.publish(topic, payload);
}
wbus.publish("users.settings.theme.changed", { theme: "dark" });
Async Pub/Sub with Queuing
/**
 * Queued publish/subscribe broker with retries and a dead letter queue.
 *
 * Messages are processed one at a time in FIFO order. All handlers of a
 * message run concurrently; handlers that throw or reject are retried with
 * exponential backoff. A message whose handlers keep failing, or that has no
 * subscribers at all, is moved to the dead letter queue.
 */
class AsyncPubSub {
  // topic -> array of handler functions.
  #topics = new Map();
  #deadLetterQueue = [];
  // True while #processQueue is draining the queue.
  #processing = false;
  // Entries are { message, handlers }; `handlers` is null for a first
  // delivery (all current subscribers) or the handlers still owed a retry.
  #messageQueue = [];
  // dead-letter entry -> handlers that failed, so a replay targets only them.
  #deadLetterHandlers = new WeakMap();
  #maxRetries;
  #baseDelayMs;

  /**
   * @param {{ maxRetries?: number, baseDelayMs?: number }} [options]
   *   `maxRetries`: total delivery attempts before dead-lettering (default 3).
   *   `baseDelayMs`: backoff base; retry k waits `baseDelayMs * 2 ** k` (default 1000).
   */
  constructor({ maxRetries = 3, baseDelayMs = 1000 } = {}) {
    this.#maxRetries = maxRetries;
    this.#baseDelayMs = baseDelayMs;
  }

  /**
   * Registers a handler for a topic.
   *
   * @param {string} topic
   * @param {(data: *, message: object) => unknown} handler - May be sync or async.
   * @returns {() => void} Function that removes this handler again.
   */
  subscribe(topic, handler) {
    let handlers = this.#topics.get(topic);
    if (!handlers) {
      handlers = [];
      this.#topics.set(topic, handlers);
    }
    handlers.push(handler);

    return () => {
      const current = this.#topics.get(topic);
      const idx = current ? current.indexOf(handler) : -1;
      if (idx === -1) return;
      current.splice(idx, 1);
      if (current.length === 0) this.#topics.delete(topic);
    };
  }

  /**
   * Enqueues a message. When the queue is idle the returned promise resolves
   * after the queue has been drained (first delivery attempt done); when a
   * drain is already running it resolves as soon as the message is enqueued.
   *
   * @param {string} topic
   * @param {*} data
   * @returns {Promise<string>} The generated message id.
   */
  async publish(topic, data) {
    const message = {
      id: crypto.randomUUID(),
      topic,
      data,
      timestamp: Date.now(),
      attempts: 0,
      maxRetries: this.#maxRetries,
    };
    this.#messageQueue.push({ message, handlers: null });
    if (!this.#processing) {
      await this.#processQueue();
    }
    return message.id;
  }

  // Drains the queue sequentially. The flag is reset in `finally` so an
  // unexpected error can never leave the bus permanently "processing".
  async #processQueue() {
    this.#processing = true;
    try {
      while (this.#messageQueue.length > 0) {
        const { message, handlers } = this.#messageQueue.shift();
        await this.#deliverMessage(message, handlers);
      }
    } finally {
      this.#processing = false;
    }
  }

  // Starts a drain from a non-awaited context (timer, replay) without
  // leaving a floating promise.
  #kick() {
    if (this.#processing) return;
    this.#processQueue().catch((error) => {
      console.error("AsyncPubSub queue processing failed:", error);
    });
  }

  // Runs one delivery attempt and schedules a retry or dead-letters on failure.
  async #deliverMessage(message, pending) {
    const subscribed = this.#topics.get(message.topic) ?? [];
    if (subscribed.length === 0) {
      this.#deadLetterQueue.push({
        ...message,
        reason: "No subscribers",
        deadAt: Date.now(),
      });
      return;
    }

    // On a retry, call only handlers that failed before and are still
    // subscribed; handlers that already succeeded must not run twice.
    const targets = pending
      ? pending.filter((handler) => subscribed.includes(handler))
      : [...subscribed];
    if (targets.length === 0) return;

    const results = await Promise.allSettled(
      // The async wrapper turns a synchronous throw into a rejection, so
      // allSettled sees it instead of the whole delivery blowing up.
      targets.map(async (handler) => handler(message.data, message))
    );

    const failedHandlers = [];
    const errors = [];
    results.forEach((result, index) => {
      if (result.status === "rejected") {
        failedHandlers.push(targets[index]);
        errors.push(result.reason?.message);
      }
    });
    if (failedHandlers.length === 0) return;

    message.attempts++;
    if (message.attempts < message.maxRetries) {
      // Exponential backoff retry
      const delay = this.#baseDelayMs * 2 ** message.attempts;
      setTimeout(() => {
        this.#messageQueue.push({ message, handlers: failedHandlers });
        this.#kick();
      }, delay);
      return;
    }

    const deadLetter = {
      ...message,
      reason: `Failed after ${message.maxRetries} attempts`,
      errors,
      deadAt: Date.now(),
    };
    this.#deadLetterHandlers.set(deadLetter, failedHandlers);
    this.#deadLetterQueue.push(deadLetter);
  }

  /**
   * @returns {object[]} Shallow copy of the dead letter queue.
   */
  getDeadLetterQueue() {
    return [...this.#deadLetterQueue];
  }

  /**
   * Moves dead letters back onto the queue with a fresh attempt counter.
   *
   * @param {string} [topic] - Only replay this topic; omit to replay everything.
   * @returns {number} Number of messages re-queued.
   */
  replayDeadLetters(topic) {
    const toReplay = [];
    const kept = [];
    for (const entry of this.#deadLetterQueue) {
      if (!topic || entry.topic === topic) {
        toReplay.push(entry);
      } else {
        kept.push(entry);
      }
    }
    this.#deadLetterQueue = kept;

    for (const entry of toReplay) {
      // Rebuild the envelope so stale reason/errors/deadAt fields do not
      // travel with the replayed message.
      const message = {
        id: entry.id,
        topic: entry.topic,
        data: entry.data,
        timestamp: entry.timestamp,
        attempts: 0,
        maxRetries: entry.maxRetries,
      };
      this.#messageQueue.push({
        message,
        handlers: this.#deadLetterHandlers.get(entry) ?? null,
      });
    }

    this.#kick();
    return toReplay.length;
  }
}
// Usage: two independent async subscribers on the same topic.
const asyncBus = new AsyncPubSub();

// First subscriber: pretends to send the email.
async function sendEmail(payload) {
  console.log(`Sending email to: ${payload.to}`);
  // Simulate async email sending
  await new Promise((resolve) => {
    setTimeout(resolve, 100);
  });
}
asyncBus.subscribe("email:send", sendEmail);

// Second subscriber: records that an email event happened.
async function logEmailEvent(payload) {
  console.log(`Logging email event: ${payload.to}`);
}
asyncBus.subscribe("email:send", logEmailEvent);
await asyncBus.publish("email:send", {
to: "user@example.com",
subject: "Welcome",
});
Channel-Based Pub/Sub
/**
 * Publish/subscribe broker built around explicitly created channels.
 *
 * Each channel keeps a bounded history of published messages and can replay
 * that history to subscribers that join late.
 */
class ChannelPubSub {
  // channel name -> { name, subscribers, history, options }.
  #channels = new Map();

  /**
   * Creates a channel.
   *
   * @param {string} name - Unique channel name.
   * @param {{ maxHistory?: number, replayOnSubscribe?: boolean, transform?: ((data: *) => *) | null }} [options]
   *   `maxHistory`: messages kept in history (default 100; 0 keeps none).
   *   `replayOnSubscribe`: replay history to each new subscriber (default false).
   *   `transform`: applied to the payload before it is stored and delivered.
   * @returns {this} The broker, for chaining.
   * @throws {Error} If a channel with this name already exists.
   */
  createChannel(name, options = {}) {
    if (this.#channels.has(name)) {
      throw new Error(`Channel "${name}" already exists`);
    }
    const channel = {
      name,
      subscribers: [],
      history: [],
      options: {
        // `??` rather than `||`, so an explicit 0 disables history.
        maxHistory: options.maxHistory ?? 100,
        replayOnSubscribe: options.replayOnSubscribe ?? false,
        transform: options.transform ?? null,
      },
    };
    this.#channels.set(name, channel);
    return this;
  }

  /**
   * Subscribes a handler to a channel, replaying history first when the
   * channel was created with `replayOnSubscribe`.
   *
   * @param {string} channelName
   * @param {(data: *, message: object) => unknown} handler
   * @returns {() => void} Function that removes this handler again.
   * @throws {Error} If the channel does not exist.
   */
  subscribe(channelName, handler) {
    const channel = this.#channels.get(channelName);
    if (!channel) throw new Error(`Channel "${channelName}" does not exist`);
    channel.subscribers.push(handler);

    // Replay history to new subscriber
    if (channel.options.replayOnSubscribe) {
      // Snapshot: a handler that publishes during replay already receives
      // the new message through publish() and must not see it twice.
      for (const msg of [...channel.history]) {
        try {
          handler(msg.data, msg);
        } catch (error) {
          // Isolate handler errors as publish() does; throwing here would
          // leave the handler registered with no way to unsubscribe it.
          console.error(`Error in channel "${channelName}":`, error);
        }
      }
    }

    return () => {
      const idx = channel.subscribers.indexOf(handler);
      if (idx > -1) channel.subscribers.splice(idx, 1);
    };
  }

  /**
   * Publishes a payload to a channel: applies the channel transform, records
   * the message in history, and calls every subscriber. Handler errors are
   * logged with `console.error` and do not stop delivery.
   *
   * @param {string} channelName
   * @param {*} data
   * @returns {string} The generated message id.
   * @throws {Error} If the channel does not exist.
   */
  publish(channelName, data) {
    const channel = this.#channels.get(channelName);
    if (!channel) throw new Error(`Channel "${channelName}" does not exist`);

    const { transform, maxHistory } = channel.options;
    const transformed = transform ? transform(data) : data;
    const message = {
      id: crypto.randomUUID(),
      channel: channelName,
      data: transformed,
      timestamp: Date.now(),
    };

    channel.history.push(message);
    if (channel.history.length > maxHistory) {
      channel.history.shift();
    }

    // Snapshot so a handler that unsubscribes during delivery does not
    // cause the next subscriber to be skipped.
    for (const handler of [...channel.subscribers]) {
      // Skip handlers that an earlier handler removed during this publish.
      if (!channel.subscribers.includes(handler)) continue;
      try {
        handler(transformed, message);
      } catch (error) {
        console.error(`Error in channel "${channelName}":`, error);
      }
    }

    return message.id;
  }

  /**
   * @param {string} channelName
   * @param {number} [limit] - When truthy, return only the last `limit` messages.
   * @returns {object[]} Copy of the channel history, oldest first; empty for
   *   an unknown channel.
   */
  getHistory(channelName, limit) {
    const channel = this.#channels.get(channelName);
    if (!channel) return [];
    const history = [...channel.history];
    return limit ? history.slice(-limit) : history;
  }

  /**
   * @returns {Record<string, { subscribers: number, messagesInHistory: number, maxHistory: number }>}
   *   Per-channel counters keyed by channel name.
   */
  getChannelStats() {
    const stats = {};
    for (const [name, channel] of this.#channels) {
      stats[name] = {
        subscribers: channel.subscribers.length,
        messagesInHistory: channel.history.length,
        maxHistory: channel.options.maxHistory,
      };
    }
    return stats;
  }
}
// Usage: a channel that timestamps payloads and replays history to late joiners.
const channels = new ChannelPubSub();

// Adds a human-readable time to each payload before it is stored and delivered.
const stampWithTime = (data) => {
  const formattedTime = new Date().toLocaleTimeString();
  return { ...data, formattedTime };
};

const notificationOptions = {
  maxHistory: 50,
  replayOnSubscribe: true,
  transform: stampWithTime,
};
channels.createChannel("notifications", notificationOptions);

// Both messages are published before anyone has subscribed.
const startupNotice = { type: "info", text: "System started" };
const memoryWarning = { type: "warn", text: "High memory" };
channels.publish("notifications", startupNotice);
channels.publish("notifications", memoryWarning);
// Late subscriber gets history replayed
channels.subscribe("notifications", (data) => {
console.log(`[${data.type}] ${data.text} at ${data.formattedTime}`);
});
| Pub/Sub Variant | Delivery | Message Persistence | Use Case |
|---|---|---|---|
| Basic synchronous | Immediate | None | Simple decoupling |
| Wildcard topics | Immediate | None | Namespaced event routing |
| Async with queue | Queued, retried | Dead letter queue | Reliable delivery |
| Channel-based | Immediate + replay | History buffer | Late subscriber support |
Rune AI
Key Insights
- Pub/Sub decouples publishers from subscribers through a message broker: Neither side knows about the other, enabling independent development and deployment
- Wildcard topic matching routes messages across namespace hierarchies: Single-level (*) and multi-level (#) wildcards enable flexible subscription patterns
- Async pub/sub with retries and dead letter queues ensures reliable delivery: Failed messages are retried with exponential backoff and stored in a DLQ after exhausting retries
- Channel-based pub/sub replays message history to late subscribers: New subscribers receive buffered messages, ensuring they do not miss events published before they subscribed
- Always unsubscribe during cleanup to prevent memory leaks: Store unsubscribe functions and call them when components are destroyed or unmounted
Frequently Asked Questions
What is the difference between pub/sub and the observer pattern?
How do I handle message ordering in pub/sub?
What is a dead letter queue and when do I need one?
How do I prevent memory leaks in pub/sub systems?
Conclusion
The Pub/Sub pattern decouples message producers from consumers through topic-based routing. Wildcard matching enables namespace-aware subscriptions. Async pub/sub with retry and dead letter queues ensures reliable message delivery. Channel-based pub/sub supports late subscribers with message replay. For event bus implementation details, see Building an Event Bus with JS Pub/Sub Pattern. For the observer pattern that pub/sub extends, review JavaScript Observer Pattern: Complete Guide.
More in this topic
OffscreenCanvas API in JS for UI Performance
Master the OffscreenCanvas API to offload rendering from the main thread. Covers worker-based 2D and WebGL rendering, animation loops inside workers, bitmap transfer, double buffering, chart rendering pipelines, image processing, and performance measurement strategies.
Advanced Web Workers for High Performance JS
Master Web Workers for truly parallel JavaScript execution. Covers dedicated and shared workers, structured cloning, transferable objects, SharedArrayBuffer with Atomics, worker pools, task scheduling, Comlink RPC patterns, module workers, and performance profiling strategies.
JavaScript Macros and Abstract Code Generation
Master JavaScript code generation techniques for compile-time and runtime metaprogramming. Covers AST manipulation, Babel plugin authorship, tagged template literals as macros, code generation pipelines, source-to-source transformation, compile-time evaluation, and safe eval alternatives.