Safer postgres pubsub with auto reconnect, channel length checks, and notify queue during reconnect
Other packages I've seen have problems, for example:
- They don't handle reconnection
- They silently accept channels that are too long, and don't receive messages because Postgres truncates channels to 63 characters
- They reserve some channel names
@jcoreio/pg-ipc solves all of these problems:
- It automatically reconnects with exponential backoff, and replays
LISTENs on reconnect - It throws errors when you listen or notify on a channel name that's too long
- It doesn't reserve any channel names
- It queues notify calls that fail and replays them on reconnect
- @jcoreio/pg-ipc
- Why another package?
- Table of Contents
- API
class PgIpc<T = any>constructor(options: PgIpcOptions).listen(channel: string, listener: Listener<T>): Promise<void>.unlisten(channel: string, listener: Listener<T>): Promise<void>.notify(channel: string, payload?: T): Promise<void>- Event
'error' - Event
'warning' - Event
'connecting' - Event
'connected' - Event
'disconneted' - Event
'ready' - Event
'end'
T is the parsed payload type.
import PgIpc from '@jcoreio/pg-ipc'type PgIpcOptions<
/**
* Payload type
*/
T = any,
> = {
/**
* Creates a new pg Client. This will be called on every connection attempt
*/
newClient: () => Client
/**
* Pass an object with some or all console methods to enable logging
*/
log?: Partial<typeof console>
/**
* Only use this if you compiled postgres with a custom NAMEDATALEN to allow longer channel names.
* Default: 63
*/
maxChannelLength?: number
/**
* Converts payloads to strings. Default: JSON.stringify
*/
stringify?: (payload: T) => string
/**
* Parses raw payloads from Postgres notifications. Default: JSON.parse
*/
parse?: (rawPayload: string) => T
/**
* Reconnect exponential backoff options
*/
reconnect?: {
/**
* The delay before the first reconnect attempt, in milliseconds
* Default: maxDelay / 10 or 1000.
*/
initialDelay?: number
/**
* The maximum delay between reconnect attempts, in milliseconds
* Default: initialDelay * 10 or 10000.
*/
maxDelay?: number
/**
* The maximum number of times to try to reconnect before giving up and emitting an error.
* Default: Infinity (never stop retrying)
*/
maxRetries?: number
/**
* The factor to multiply the reconnect delay by after each failure. Default: 2
*/
factor?: number
}
}Registers the given listener to listen to the given channel.
The listener will be called with the channel and maybe the parsed payload (but if NOTIFY was called without a payload, it will only be called with the channel)
type Listener<T = any> = (channel: string, payload?: T) => anyThe returned promise will resolve once the LISTEN query has completed (if the channel is not already subscribed), or reject if the channel name is too long or the LISTEN query fails.
However, PgIpc will store the listener in memory and retry the LISTEN on reconnect if it failed.
Unregisters the given listener from the given channel.
The returned promise will resolve once the UNLISTEN query has completed (if necessary, but if there are other listeners on the channel it won't UNLISTEN until they're all removed), or reject if the UNLISTEN query fails.
Sends a notification to Postgres.
The returned promise will resolve once the NOTIFY query has completed. If your options.stringify threw an error, it will reject and emit an 'error'.
If the query fails, it will reject and emit a 'warning', and enqueue the notification to retry upon reconnect.
Emitted with one argument, the Error that occurred.
Emitted if an error occurs:
- All
maxRetriesconnect attempts fail parsethrows an error on a raw notification payload from Postgres- A listener throws an error
Emitted with one argument, the Error that occurred
Emitted if a less severe error occurs:
- The postgres client emits an
'error'
Emitted when PgIpc is about to try to connect
Emitted with one argument, the postgres Client
Emitted when PgIpc successfully connected (but before replaying LISTEN and NOTIFY queries)
Emitted with one argument, the postgres Client
Emitted when the client disconnects.
Emitted when PgIpc finishes replaying LISTEN and NOTIFY queries after connecting.
Emitted when PgIpc is ended by your code or all maxRetries connection attempts failed.