Build real-time notifications with Postgres 🐘

Cover Image for Build real-time notifications with Postgres 🐘

This tutorial is simply demonstrating an experiment. Before using in production, make your own evaluation within the context of your application and resources.

In the exciting world of web development, I recently found myself experimenting the pg-pubsub and exploring its potential when paired with Server-Side Events (SSE).

In this tutorial, we're going to implement real-time notifications, by using the power of pg-pubsub and SSE.

Now, let's start on our journey with SSE.

The beauty of SSE lies in its ability to maintain the backend client connection, ensuring it stays alive and active.

This feature means that as soon as a notification is published, our client receives it instantly, eliminating the need for any fetch.

To consume SSE, we'll use EventSource.

So, let's dive in and create our hook.

// Import hooks from React.
import { useEffect, useState } from "react";
 
// Define prop types for the hook, T can be any type of data.
interface UseEventSourceProps<T> {
  // Optionally provide an initial state.
  initialState?: T[];
  // The URL for the EventSource to connect to.
  url: string;
}
 
// Define the hook.
export const useEventSource = <T>({
  initialState,
  url
}: UseEventSourceProps<T>) => {
  // Define state to hold the incoming data.
  const [data, setData] = useState<T[]>(initialState || []);
 
  // Function to establish a connection to a server-side event stream.
  const connectToStream = () => {
    // Initialize EventSource with the provided url & set withCredentials to true.
    const eventSource = new EventSource(url, {
      withCredentials: true
    });
 
    // Add an event listener for message events from the server.
    eventSource.addEventListener("message", (event) => {
      // Parse the incoming data and add it to the state.
      const newData = JSON.parse(event.data);
      setData((prevData) => [newData, ...prevData]);
    });
 
    // Add an event listener for error events.
    eventSource.addEventListener("error", () => {
      // Close the connection on error, and try to reconnect after 1ms.
      eventSource.close();
      setTimeout(connectToStream, 1);
    });
 
    // Return the EventSource instance.
    return eventSource;
  };
 
  // Use an effect to establish the event stream connection on mount.
  useEffect(() => {
    const eventSource = connectToStream();
 
    // Disconnect from the event stream when this component unmounts.
    return () => {
      eventSource.close();
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []); // Run this effect only once, on mount.
 
  // The hook returns the data from the event stream.
  return data;
};
 
 

Great, now let's create our API endpoint for notifications.

// to prevent caching on this endpoint
export const dynamic = "force-dynamic";
 
export const GET = async () => {
  try {
    const encoder = new TextEncoder();
    const readableStream = new ReadableStream({
      start: (controller) => {
        const payload = { message: "hello" };
        controller.enqueue(
          encoder.encode(`event: message\ndata: ${JSON.stringify(payload)}\n\n`)
        );
      },
      cancel: () => {
        console.log("cancel");
      },
    });
 
    return new Response(readableStream, {
      headers: {
        "Access-Control-Allow-Origin": "*",
        "Content-Type": "text/event-stream; charset=utf-8",
        Connection: "keep-alive",
        "Cache-Control": "no-cache, no-transform",
        "Content-Encoding": "none",
        "X-Accel-Buffering": "no",
      },
    });
  } catch (error) {
    return Response.json({ success: false, error });
  }
};

Now, in the frontend, when we use the useEventSource hook, we should be receiving our payload.

Next, we'll add the last component, PgPubsub.

Let's create an instance.

import PubSub from "pg-pubsub";
 
import { env } from "./env";
 
const pubSubSingleton = () => {
  return new PubSub(env.DATABASE_URL);
};
 
declare global {
  var pubSub: undefined | ReturnType<typeof pubSubSingleton>;
}
 
export const pubSub = globalThis.pubSub ?? pubSubSingleton();
 
if (process.env.NODE_ENV !== "production") {
  globalThis.pubSub = pubSub;
}

Now, whenever we publish a message to any channel of pg-pubsub, we can listen to it.

Let's quickly create a utility function that we can call anywhere in the backend.

const NOTIFICATIONS_CHANNEL = "NOTIFICATIONS_CHANNEL";
export const notify = async (payload: { message: string }) => {
  await pubSub?.addChannel(NOTIFICATIONS_CHANNEL);
  await pubSub?.publish(NOTIFICATIONS_CHANNEL, payload);
};

Done! As a final step, we should listen to the same channel in our API route to enqueue new notifications.

import { auth } from "@/auth";
 
import { pubSub } from "@/lib/db";
 
import { notify } from "@/lib/helpers/notify";
 
export const dynamic = "force-dynamic";
 
export const GET = async () => {
  try {
    // ideally we should authenticate user here
 
    await pubSub?.addChannel("NOTIFICATIONS_CHANNEL");
 
    // we could also send previous notifications to user, to do that you would  get them from database here,
 
    // const previousNotifications = await getPreviousNotifications();
 
    const encoder = new TextEncoder();
 
    const readableStream = new ReadableStream({
      start: (controller) => {
        // and enqueue  previous notifications here
 
        /*  previousNotifications.forEach((notification) => {
          controller.enqueue(
            encoder.encode(
              `event: message\ndata: ${JSON.stringify(notification)}\n\n`
            )
          );
        }); */
 
        pubSub?.on("NOTIFICATIONS_CHANNEL", (payload) => {
          console.log("on", payload);
          controller.enqueue(
            encoder.encode(
              `event: message\ndata: ${JSON.stringify(payload)}\n\n`
            )
          );
        });
      },
 
      cancel: () => {
        console.log("cancel");
 
        pubSub?.removeChannel("NOTIFICATIONS_CHANNEL");
 
        pubSub.off("NOTIFICATIONS_CHANNEL", () => {
          console.log("off");
        });
      },
    });
 
    return new Response(readableStream, {
      headers: {
        "Access-Control-Allow-Origin": "*",
 
        "Content-Type": "text/event-stream; charset=utf-8",
 
        Connection: "keep-alive",
 
        "Cache-Control": "no-cache, no-transform",
 
        "Content-Encoding": "none",
 
        "X-Accel-Buffering": "no",
      },
    });
  } catch (error) {
    return Response.json({ success: false, error });
  }
};