The JavaScript Pub/Sub Pattern: Complete Guide

A complete guide to the JavaScript Pub/Sub pattern. Covers publish/subscribe fundamentals, topic-based messaging, wildcard subscriptions, message filtering, async pub/sub, dead letter queues, and building scalable event-driven architectures.

JavaScript · advanced
16 min read

The Publish/Subscribe pattern decouples message producers (publishers) from consumers (subscribers) through a message broker. Unlike the observer pattern where subjects know their observers directly, pub/sub introduces a mediator that routes messages by topic, enabling truly loosely coupled architectures.

For building event buses with pub/sub, see Building an Event Bus with JS Pub/Sub Pattern.

Core Pub/Sub Implementation

javascript
/**
 * Synchronous, topic-based publish/subscribe broker.
 *
 * Subscribers register a handler for an exact topic string; publishers send
 * a payload to a topic and every active, matching subscriber is invoked
 * immediately, in descending priority order.
 */
class PubSub {
  // topic -> array of subscription records, kept sorted by priority (desc).
  #topics = new Map();
  // Monotonic counter used to hand out subscription ids.
  #subId = 0;

  /**
   * Registers `handler` for `topic`.
   *
   * @param {string} topic - Exact topic name to listen on.
   * @param {(message: object) => void} handler - Called with the message object.
   * @param {{ priority?: number, filter?: ((message: object) => boolean) | null }} [options]
   *   `priority` (default 0): higher values are delivered first.
   *   `filter` (default null): when set, the handler only runs if it returns truthy.
   * @returns {{ id: number, unsubscribe: () => void, pause: () => void, resume: () => void }}
   */
  subscribe(topic, handler, options = {}) {
    const id = ++this.#subId;
    const { priority = 0, filter = null } = options;

    if (!this.#topics.has(topic)) {
      this.#topics.set(topic, []);
    }

    const subscription = {
      id,
      handler,
      priority,
      filter,
      active: true,
      removed: false,
    };
    const subs = this.#topics.get(topic);
    subs.push(subscription);
    subs.sort((a, b) => b.priority - a.priority);

    return {
      id,
      unsubscribe: () => {
        // Flag first so an in-flight publish() working on a snapshot skips it.
        subscription.removed = true;
        const current = this.#topics.get(topic);
        if (current) {
          const idx = current.findIndex((s) => s.id === id);
          if (idx > -1) current.splice(idx, 1);
        }
      },
      pause: () => {
        subscription.active = false;
      },
      resume: () => {
        subscription.active = true;
      },
    };
  }

  /**
   * Delivers a message to every active subscriber of `topic`.
   *
   * A handler that throws is logged and does not stop delivery to the
   * remaining subscribers, nor is it counted as delivered.
   *
   * @param {string} topic - Topic to publish on.
   * @param {*} data - Payload, exposed as `message.data`.
   * @param {object} [meta] - Extra fields merged into the message. They are
   *   spread last, so they override the generated `id`/`topic`/`data`/`timestamp`.
   * @returns {{ messageId: string, delivered: number }}
   */
  publish(topic, data, meta = {}) {
    const message = {
      id: crypto.randomUUID(),
      topic,
      data,
      timestamp: Date.now(),
      ...meta,
    };

    // Iterate over a snapshot: handlers may subscribe/unsubscribe while we
    // are delivering, and mutating the live array mid-loop would skip or
    // reorder subscribers.
    const subscribers = [...(this.#topics.get(topic) ?? [])];
    let delivered = 0;

    for (const sub of subscribers) {
      if (sub.removed || !sub.active) continue;
      if (sub.filter && !sub.filter(message)) continue;

      try {
        sub.handler(message);
        delivered++;
      } catch (error) {
        console.error(`Subscriber ${sub.id} error on "${topic}":`, error);
      }
    }

    return { messageId: message.id, delivered };
  }

  /**
   * @param {string} topic
   * @returns {boolean} True when at least one subscription exists for `topic`
   *   (paused subscriptions count).
   */
  hasSubscribers(topic) {
    return (this.#topics.get(topic)?.length ?? 0) > 0;
  }

  /**
   * @returns {string[]} Every topic that has been subscribed to and not cleared.
   */
  getTopics() {
    return [...this.#topics.keys()];
  }

  /**
   * Removes all subscriptions for `topic`, or for every topic when `topic`
   * is omitted (null/undefined).
   *
   * @param {string} [topic]
   */
  clear(topic) {
    // Null-check rather than truthiness so that clear("") only clears the
    // empty-string topic instead of wiping the whole broker.
    if (topic != null) {
      this.#topics.delete(topic);
    } else {
      this.#topics.clear();
    }
  }
}
 
// Usage: one subscriber on "user:created", paused around the second publish.
const bus = new PubSub();

const logNewUser = (msg) => {
  console.log("New user:", msg.data.name);
};
const sub = bus.subscribe("user:created", logNewUser);

// Delivered: the subscription is active.
bus.publish("user:created", { name: "Alice", email: "alice@test.com" });

// While paused the subscription stays registered but is skipped by publish().
sub.pause();
bus.publish("user:created", { name: "Bob" });
sub.resume();

Wildcard Topic Matching

javascript
/**
 * PubSub extension that adds pattern subscriptions over dot-separated topics.
 *
 * Pattern syntax:
 *   `*` matches exactly one non-empty segment (no dots),
 *   `#` matches any run of characters, including dots.
 * Exact-topic subscribers (from the parent class) are always notified before
 * pattern subscribers.
 */
class WildcardPubSub extends PubSub {
  #wildcardSubs = [];
  // Own id counter: the parent's counter is a private field and cannot be
  // reached from a subclass. Ids are unique among pattern subscriptions only.
  #patternSubId = 0;

  /**
   * Registers `handler` for every topic matching `pattern`.
   *
   * @param {string} pattern - Topic pattern, e.g. "orders.*" or "users.#".
   * @param {(message: object) => void} handler - Called with the message object.
   * @param {{ filter?: ((message: object) => boolean) | null }} [options]
   *   `filter`: when set, the handler only runs if it returns truthy.
   * @returns {{ id: number, unsubscribe: () => void, pause: () => void, resume: () => void }}
   */
  subscribePattern(pattern, handler, options = {}) {
    const id = ++this.#patternSubId;
    const { filter = null } = options;

    // Fields are listed explicitly (options are not spread in) so caller
    // options can never overwrite id, regex or handler.
    const sub = {
      id,
      pattern,
      regex: this.#patternToRegex(pattern),
      handler,
      filter,
      active: true,
      removed: false,
    };

    this.#wildcardSubs.push(sub);

    return {
      id,
      unsubscribe: () => {
        // Flag first so an in-flight publish() working on a snapshot skips it.
        sub.removed = true;
        const idx = this.#wildcardSubs.indexOf(sub);
        if (idx > -1) this.#wildcardSubs.splice(idx, 1);
      },
      pause: () => {
        sub.active = false;
      },
      resume: () => {
        sub.active = true;
      },
    };
  }

  /**
   * Compiles a topic pattern into an anchored regular expression.
   * Regex metacharacters (including `?`) are escaped before the wildcards
   * are expanded, so only `*` and `#` carry special meaning.
   *
   * @param {string} pattern
   * @returns {RegExp}
   */
  #patternToRegex(pattern) {
    const escaped = pattern
      .replace(/[.+?^${}()|[\]\\]/g, "\\$&")
      .replace(/\*/g, "[^.]+")
      .replace(/#/g, ".*");
    return new RegExp(`^${escaped}$`);
  }

  /**
   * Publishes to exact-topic subscribers first, then to matching pattern
   * subscribers. `delivered` in the result counts both groups.
   *
   * @param {string} topic
   * @param {*} data
   * @param {object} [meta] - Extra fields merged into the message.
   * @returns {{ messageId: string, delivered: number }}
   */
  publish(topic, data, meta = {}) {
    const result = super.publish(topic, data, meta);

    // Reuse the id produced by the parent so pattern subscribers see the same
    // message id that is returned to the publisher. The timestamp is taken
    // again here because the parent does not expose the message it built.
    const message = {
      id: result.messageId,
      topic,
      data,
      timestamp: Date.now(),
      ...meta,
    };

    // Snapshot: handlers may subscribe/unsubscribe during delivery.
    for (const sub of [...this.#wildcardSubs]) {
      if (sub.removed || !sub.active) continue;
      if (!sub.regex.test(topic)) continue;
      if (sub.filter && !sub.filter(message)) continue;

      try {
        sub.handler(message);
        result.delivered++;
      } catch (error) {
        console.error(`Wildcard subscriber error:`, error);
      }
    }

    return result;
  }
}
 
// Usage: two pattern subscriptions, then four publishes across both namespaces.
const wbus = new WildcardPubSub();

// "*" covers a single segment, so this sees every direct "orders" event.
const logOrderEvent = (msg) => {
  console.log(`Orders event [${msg.topic}]:`, msg.data);
};
wbus.subscribePattern("orders.*", logOrderEvent);

// "#" spans any depth, so this sees arbitrarily nested "users" events.
const logUserEvent = (msg) => {
  console.log(`User event [${msg.topic}]:`, msg.data);
};
wbus.subscribePattern("users.#", logUserEvent);

const sampleEvents = [
  ["orders.created", { orderId: "ORD-001" }],
  ["orders.shipped", { orderId: "ORD-001" }],
  ["users.profile.updated", { userId: "U1" }],
  ["users.settings.theme.changed", { theme: "dark" }],
];
for (const [topic, payload] of sampleEvents) {
  wbus.publish(topic, payload);
}

Async Pub/Sub with Queuing

javascript
/**
 * Queue-backed asynchronous pub/sub broker with retries and a dead letter
 * queue (DLQ).
 *
 * Messages are processed one at a time in FIFO order. For each message all
 * handlers of the topic are invoked concurrently; if any handler fails, the
 * whole message is retried later with exponential backoff (every handler is
 * invoked again on a retry, so handlers should be idempotent). Messages with
 * no subscribers, or that exhaust their attempts, are moved to the DLQ.
 */
class AsyncPubSub {
  #topics = new Map();
  #deadLetterQueue = [];
  #processing = false;
  #messageQueue = [];
  #maxRetries;
  #baseDelayMs;

  /**
   * @param {{ maxRetries?: number, baseDelayMs?: number }} [options]
   *   `maxRetries` (default 3): total delivery attempts before dead-lettering.
   *   `baseDelayMs` (default 1000): backoff base; the delay before the next
   *   attempt is `baseDelayMs * 2 ** attempts`.
   */
  constructor({ maxRetries = 3, baseDelayMs = 1000 } = {}) {
    this.#maxRetries = maxRetries;
    this.#baseDelayMs = baseDelayMs;
  }

  /**
   * Registers `handler` for `topic`.
   *
   * @param {string} topic
   * @param {(data: *, message: object) => (void | Promise<void>)} handler
   * @returns {() => void} Function that removes this handler again.
   */
  subscribe(topic, handler) {
    if (!this.#topics.has(topic)) {
      this.#topics.set(topic, []);
    }

    this.#topics.get(topic).push(handler);

    return () => {
      const handlers = this.#topics.get(topic);
      if (!handlers) return;
      const idx = handlers.indexOf(handler);
      if (idx > -1) handlers.splice(idx, 1);
    };
  }

  /**
   * Enqueues a message. If the queue is idle, this call drains it and
   * resolves once the queue is empty; if a drain is already running, it
   * resolves immediately and the message is delivered by that drain.
   *
   * @param {string} topic
   * @param {*} data
   * @returns {Promise<string>} The generated message id.
   */
  async publish(topic, data) {
    const message = {
      id: crypto.randomUUID(),
      topic,
      data,
      timestamp: Date.now(),
      attempts: 0,
      maxRetries: this.#maxRetries,
    };

    this.#messageQueue.push(message);

    if (!this.#processing) {
      await this.#processQueue();
    }

    return message.id;
  }

  /** Drains the queue sequentially; only one drain runs at a time. */
  async #processQueue() {
    this.#processing = true;

    try {
      while (this.#messageQueue.length > 0) {
        const message = this.#messageQueue.shift();
        await this.#deliverMessage(message);
      }
    } finally {
      // Always release the flag, otherwise one unexpected error would leave
      // the broker permanently "busy" and no later message would be delivered.
      this.#processing = false;
    }
  }

  /** Starts a drain from a non-async context without leaving the promise unhandled. */
  #drainInBackground() {
    if (this.#processing) return;
    this.#processQueue().catch((error) => {
      console.error("AsyncPubSub queue processing failed:", error);
    });
  }

  /**
   * Delivers one message to all handlers of its topic and schedules a retry
   * or dead-letters it depending on the outcome.
   */
  async #deliverMessage(message) {
    // Snapshot: a handler may unsubscribe while we are still mapping.
    const handlers = [...(this.#topics.get(message.topic) ?? [])];

    if (handlers.length === 0) {
      this.#deadLetterQueue.push({
        ...message,
        reason: "No subscribers",
        deadAt: Date.now(),
      });
      return;
    }

    // The async wrapper turns a synchronous throw from a non-async handler
    // into a rejected promise so it is handled like any other failure.
    const results = await Promise.allSettled(
      handlers.map(async (handler) => handler(message.data, message))
    );

    const failures = results.filter((r) => r.status === "rejected");
    if (failures.length === 0) return;

    message.attempts++;
    if (message.attempts < message.maxRetries) {
      // Exponential backoff retry
      const delay = this.#baseDelayMs * 2 ** message.attempts;
      setTimeout(() => {
        this.#messageQueue.push(message);
        this.#drainInBackground();
      }, delay);
    } else {
      this.#deadLetterQueue.push({
        ...message,
        reason: `Failed after ${message.maxRetries} attempts`,
        errors: failures.map((f) => f.reason?.message),
        deadAt: Date.now(),
      });
    }
  }

  /**
   * @returns {object[]} Shallow copy of the dead letter queue.
   */
  getDeadLetterQueue() {
    return [...this.#deadLetterQueue];
  }

  /**
   * Moves dead letters back onto the message queue with a fresh attempt
   * counter and starts processing in the background.
   *
   * @param {string} [topic] - Only replay this topic; omit to replay everything.
   * @returns {number} How many messages were re-queued.
   */
  replayDeadLetters(topic) {
    const toReplay = [];
    const remaining = [];
    for (const entry of this.#deadLetterQueue) {
      const selected = topic == null || entry.topic === topic;
      (selected ? toReplay : remaining).push(entry);
    }
    this.#deadLetterQueue = remaining;

    // Strip the DLQ bookkeeping so handlers don't receive stale failure data.
    for (const { reason, errors, deadAt, ...message } of toReplay) {
      this.#messageQueue.push({ ...message, attempts: 0 });
    }

    this.#drainInBackground();
    return toReplay.length;
  }
}
 
// Usage: two handlers share the "email:send" topic; publish resolves once
// the queue has been drained.
const asyncBus = new AsyncPubSub();

const sendEmail = async (data) => {
  console.log(`Sending email to: ${data.to}`);
  // Stand-in for a real async mail transport.
  await new Promise((r) => setTimeout(r, 100));
};

const logEmailEvent = async (data) => {
  console.log(`Logging email event: ${data.to}`);
};

asyncBus.subscribe("email:send", sendEmail);
asyncBus.subscribe("email:send", logEmailEvent);

const welcomeEmail = {
  to: "user@example.com",
  subject: "Welcome",
};
await asyncBus.publish("email:send", welcomeEmail);

Channel-Based Pub/Sub

javascript
/**
 * Pub/sub broker built around explicitly created, named channels.
 *
 * Each channel keeps a bounded history of published messages, can replay
 * that history to new subscribers, and can transform payloads before they
 * are stored and delivered.
 */
class ChannelPubSub {
  #channels = new Map();

  /**
   * Creates a new channel.
   *
   * @param {string} name - Unique channel name.
   * @param {{ maxHistory?: number, replayOnSubscribe?: boolean, transform?: ((data: *) => *) | null }} [options]
   *   `maxHistory` (default 100): messages retained; 0 disables history.
   *   `replayOnSubscribe` (default false): replay history to new subscribers.
   *   `transform` (default null): applied to every payload on publish.
   * @returns {this} The broker, for chaining.
   * @throws {Error} If a channel with this name already exists.
   */
  createChannel(name, options = {}) {
    if (this.#channels.has(name)) {
      throw new Error(`Channel "${name}" already exists`);
    }

    const channel = {
      name,
      subscribers: [],
      history: [],
      options: {
        // `??` rather than `||` so an explicit maxHistory of 0 is honoured.
        maxHistory: options.maxHistory ?? 100,
        replayOnSubscribe: options.replayOnSubscribe ?? false,
        transform: options.transform ?? null,
      },
    };

    this.#channels.set(name, channel);
    return this;
  }

  /**
   * Subscribes `handler` to a channel, replaying history first when the
   * channel was created with `replayOnSubscribe`.
   *
   * @param {string} channelName
   * @param {(data: *, message: object) => void} handler
   * @returns {() => void} Function that removes this handler again.
   * @throws {Error} If the channel does not exist.
   */
  subscribe(channelName, handler) {
    const channel = this.#channels.get(channelName);
    if (!channel) throw new Error(`Channel "${channelName}" does not exist`);

    channel.subscribers.push(handler);

    // Replay history to new subscriber. Errors are logged, as in publish(),
    // so the caller still receives the unsubscribe function. A snapshot is
    // used because the handler may itself publish to this channel.
    if (channel.options.replayOnSubscribe) {
      for (const msg of [...channel.history]) {
        try {
          handler(msg.data, msg);
        } catch (error) {
          console.error(`Error in channel "${channelName}":`, error);
        }
      }
    }

    return () => {
      const idx = channel.subscribers.indexOf(handler);
      if (idx > -1) channel.subscribers.splice(idx, 1);
    };
  }

  /**
   * Publishes `data` to a channel: transforms it, records it in history and
   * delivers it to every subscriber. A throwing handler is logged and does
   * not stop delivery to the others.
   *
   * @param {string} channelName
   * @param {*} data
   * @returns {string} The generated message id.
   * @throws {Error} If the channel does not exist.
   */
  publish(channelName, data) {
    const channel = this.#channels.get(channelName);
    if (!channel) throw new Error(`Channel "${channelName}" does not exist`);

    const transformed = channel.options.transform
      ? channel.options.transform(data)
      : data;

    const message = {
      id: crypto.randomUUID(),
      channel: channelName,
      data: transformed,
      timestamp: Date.now(),
    };

    channel.history.push(message);
    if (channel.history.length > channel.options.maxHistory) {
      channel.history.shift();
    }

    // Iterate a snapshot so a handler that unsubscribes itself does not shift
    // the live array and cause the next subscriber to be skipped. The
    // membership check still honours removals made earlier in this delivery.
    for (const handler of [...channel.subscribers]) {
      if (!channel.subscribers.includes(handler)) continue;
      try {
        handler(transformed, message);
      } catch (error) {
        console.error(`Error in channel "${channelName}":`, error);
      }
    }

    return message.id;
  }

  /**
   * @param {string} channelName
   * @param {number} [limit] - Return only the most recent `limit` messages.
   * @returns {object[]} Copy of the channel history (oldest first); empty for
   *   an unknown channel.
   */
  getHistory(channelName, limit) {
    const channel = this.#channels.get(channelName);
    if (!channel) return [];
    const history = [...channel.history];
    if (limit == null) return history;
    // slice(-0) would return everything, so a limit of 0 is handled explicitly.
    if (limit === 0) return [];
    return history.slice(-limit);
  }

  /**
   * @returns {Object<string, { subscribers: number, messagesInHistory: number, maxHistory: number }>}
   *   Per-channel counters keyed by channel name.
   */
  getChannelStats() {
    const stats = {};
    for (const [name, channel] of this.#channels) {
      stats[name] = {
        subscribers: channel.subscribers.length,
        messagesInHistory: channel.history.length,
        maxHistory: channel.options.maxHistory,
      };
    }
    return stats;
  }
}
 
// Usage: a replaying "notifications" channel with a payload transform.
const channels = new ChannelPubSub();

// Every payload is stamped with a display time before it is stored/delivered.
const stampWithTime = (data) => ({
  ...data,
  formattedTime: new Date().toLocaleTimeString(),
});

const notificationOptions = {
  maxHistory: 50,
  replayOnSubscribe: true,
  transform: stampWithTime,
};
channels.createChannel("notifications", notificationOptions);

channels.publish("notifications", { type: "info", text: "System started" });
channels.publish("notifications", { type: "warn", text: "High memory" });

// Subscribing after the fact: both buffered messages are replayed immediately.
const printNotification = (data) => {
  console.log(`[${data.type}] ${data.text} at ${data.formattedTime}`);
};
channels.subscribe("notifications", printNotification);
| Pub/Sub Variant | Delivery | Message Persistence | Use Case |
| --- | --- | --- | --- |
| Basic synchronous | Immediate | None | Simple decoupling |
| Wildcard topics | Immediate | None | Namespaced event routing |
| Async with queue | Queued, retried | Dead letter queue | Reliable delivery |
| Channel-based | Immediate + replay | History buffer | Late subscriber support |
Rune AI

Rune AI

Key Insights

  • Pub/Sub decouples publishers from subscribers through a message broker: Neither side knows about the other, enabling independent development and deployment
  • Wildcard topic matching routes messages across namespace hierarchies: Single-level (*) and multi-level (#) wildcards enable flexible subscription patterns
  • Async pub/sub with retries and dead letter queues ensures reliable delivery: Failed messages are retried with exponential backoff and stored in a DLQ after exhausting retries
  • Channel-based pub/sub replays message history to late subscribers: New subscribers receive buffered messages, ensuring they do not miss events published before they subscribed
  • Always unsubscribe during cleanup to prevent memory leaks: Store unsubscribe functions and call them when components are destroyed or unmounted
RunePowered by Rune AI

Frequently Asked Questions

What is the difference between pub/sub and the observer pattern?

In the observer pattern, the subject (observable) directly references and notifies its observers. In pub/sub, publishers and subscribers do not know about each other. A message broker mediates between them, routing messages by topic. Pub/sub enables looser coupling and is better suited for distributed systems or large applications with many independent components.

How do I handle message ordering in pub/sub?

Synchronous pub/sub delivers messages in publication order by default. For async pub/sub, use a sequential queue processor (process one message at a time) when ordering matters. If order is critical across multiple topics, include a sequence number in messages and have subscribers buffer and reorder if needed.

What is a dead letter queue and when do I need one?

A dead letter queue (DLQ) stores messages that could not be delivered after maximum retry attempts. You need one when message loss is unacceptable (payment notifications, order confirmations). The DLQ provides visibility into delivery failures and enables manual replay once the underlying issue is fixed.

How do I prevent memory leaks in pub/sub systems?

Always store and call the unsubscribe function when a component is destroyed. Set maximum limits on topic history and subscriber counts. Monitor subscriber counts over time and alert when they exceed expected thresholds. In browser environments, unsubscribe in component teardown or page unload handlers.

Conclusion

The Pub/Sub pattern decouples message producers from consumers through topic-based routing. Wildcard matching enables namespace-aware subscriptions. Async pub/sub with retry and dead letter queues ensures reliable message delivery. Channel-based pub/sub supports late subscribers with message replay. For event bus implementation details, see Building an Event Bus with JS Pub/Sub Pattern. For the observer pattern that pub/sub extends, review JavaScript Observer Pattern: Complete Guide.