@hyperfrontend/network-protocol/queue

Queue

Purpose

The Queue module provides FIFO (First-In-First-Out) message processing queues with async operation support, backpressure management, and stop/resume lifecycle controls. Queues are the fundamental building blocks for the sender and receiver pipelines.


Key Interfaces

Queue<T>

The main queue interface for processing messages.

interface Queue<T extends object> {
  addMessage: (message: T) => void // Add a message to the queue
  isRunning: () => boolean // Check if the queue is processing
  stop: () => void // Pause processing (messages accumulate)
  resume: () => void // Resume processing accumulated messages
  size: () => number // Get current queue depth
  currentMessage: () => T | null // Get the message currently being processed
}

MessageHandler<T>

The async processing function type used internally by queues.

type MessageHandler<T extends object> = (message: T) => Promise<void> | void
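
The contract above can be illustrated with a minimal in-memory sketch. This is not the library's implementation: `createSketchQueue` is a hypothetical name, the two types are re-declared locally so the block is self-contained, and swallowing handler errors is an assumption made to keep the loop short.

```typescript
// Local copies of the documented types so the sketch compiles on its own.
type MessageHandler<T extends object> = (message: T) => Promise<void> | void

interface Queue<T extends object> {
  addMessage: (message: T) => void
  isRunning: () => boolean
  stop: () => void
  resume: () => void
  size: () => number
  currentMessage: () => T | null
}

// Hypothetical sketch: processes one message at a time in strict FIFO order.
function createSketchQueue<T extends object>(handler: MessageHandler<T>, autoStart = true): Queue<T> {
  const pending: T[] = []
  let running = autoStart
  let current: T | null = null
  let draining = false

  const drain = async (): Promise<void> => {
    if (draining) return // a drain loop is already active
    draining = true
    try {
      while (running && pending.length > 0) {
        current = pending.shift() as T
        try {
          await handler(current)
        } catch {
          // Assumption: a failing message is dropped so it cannot block later ones.
        }
        current = null
      }
    } finally {
      draining = false
    }
  }

  return {
    addMessage: (message) => {
      pending.push(message)
      void drain()
    },
    isRunning: () => running,
    stop: () => {
      running = false // the in-flight message still finishes
    },
    resume: () => {
      running = true
      void drain()
    },
    size: () => pending.length,
    currentMessage: () => current,
  }
}
```

The single `draining` flag is what keeps processing sequential: every `addMessage` call may try to start the loop, but only one loop ever runs at a time.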

Queue Types

Each queue type is specialized for a specific transformation step in the packet pipeline:

Outbound Queues (Sender)

| Queue Creator | Input Type | Output Type | Purpose |
| --- | --- | --- | --- |
| createEncryptionQueue | UnencryptedPacket | UnserializedEncryptedPacket | Encrypt plaintext data |
| createSerializationQueue | UnserializedEncryptedPacket | SerializedEncryptedPacket | Binary → Base64 string |
| createObfuscationQueue | SerializedEncryptedPacket | ObfuscatedPacket | Time-based obfuscation |

Inbound Queues (Receiver)

| Queue Creator | Input Type | Output Type | Purpose |
| --- | --- | --- | --- |
| createDeobfuscationQueue | ObfuscatedPacket | SerializedEncryptedPacket | Remove obfuscation |
| createDeserializationQueue | SerializedEncryptedPacket | UnserializedEncryptedPacket | Base64 string → Binary |
| createDecryptionQueue | UnserializedEncryptedPacket | UnencryptedPacket | Decrypt to plaintext |

Queue Pipeline

Queues are chained together to form processing pipelines where the onSuccess callback of one queue feeds the addMessage of the next:

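Outbound Pipeline (Sender)

UnencryptedPacket → createEncryptionQueue → UnserializedEncryptedPacket → createSerializationQueue → SerializedEncryptedPacket → createObfuscationQueue → ObfuscatedPacket → transport

Inbound Pipeline (Receiver)

ObfuscatedPacket → createDeobfuscationQueue → SerializedEncryptedPacket → createDeserializationQueue → UnserializedEncryptedPacket → createDecryptionQueue → UnencryptedPacket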


Factory Functions

createQueue<T>

The base queue factory that all specialized queues use internally.

Signature:

function createQueue<T extends Record<string, any>>(
  processMessage: MessageHandler<T>,
  autoStart?: boolean // default: true
): Queue<T>

Example:

import { createQueue } from '@hyperfrontend/network-protocol/lib/queue'

// Create a simple message queue
const queue = createQueue<{ id: string; payload: string }>(
  async (message) => {
    console.log(`Processing: ${message.id}`)
    await someAsyncOperation(message)
  },
  true // autoStart
)

queue.addMessage({ id: '1', payload: 'hello' }) // Immediately starts processing

createEncryptionQueue

Creates a queue that encrypts plaintext packets.

Signature:

type EncryptionQueueCreater = (
  label: string,
  packetEncryption: PacketEncryption,
  logger: Logger,
  onSuccess: (packet: UnserializedEncryptedPacket) => void,
  onFail: (raw: unknown) => void
) => Queue<UnencryptedPacket>

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| label | string | Identifier for logging (e.g., 'channel-1:encrypt') |
| packetEncryption | PacketEncryption | Async function that encrypts packets |
| logger | Logger | Logger instance from @hyperfrontend/logging |
| onSuccess | function | Called with encrypted packet on success |
| onFail | function | Called with raw input on failure |

Example:

import { createEncryptionQueue } from '@hyperfrontend/network-protocol/lib/queue'
import { createLogger } from '@hyperfrontend/logging'

const logger = createLogger({ level: 'debug' })
const successes: UnserializedEncryptedPacket[] = []
const failures: unknown[] = []

const encryptionQueue = createEncryptionQueue(
  'my-channel:encryption',
  async (packet) => {
    // Platform-specific encryption (injected)
    return await encryptPacket(packet)
  },
  logger,
  (encrypted) => successes.push(encrypted),
  (failed) => failures.push(failed)
)

// Add a packet to encrypt
encryptionQueue.addMessage({
  origin: 'window-a',
  target: 'window-b',
  data: { pid: '...', id: '...', sequence: 1, key: null, message: { /* ... */ }, schema: null, schemaHash: null }
})

createSerializationQueue

Creates a queue that serializes binary encrypted data to Base64 strings.

Signature:

type SerializationQueueCreater = (
  label: string,
  packetSerialization: PacketSerialization,
  logger: Logger,
  onSuccess: (packet: SerializedEncryptedPacket) => void,
  onFail: (raw: unknown) => void
) => Queue<UnserializedEncryptedPacket>

createObfuscationQueue

Creates a queue that applies time-based obfuscation to serialized packets.

Signature:

type ObfuscationQueueCreater = (
  label: string,
  packetObfuscation: PacketObfuscation,
  logger: Logger,
  onSuccess: (packet: ObfuscatedPacket) => void,
  onFail: (raw: unknown) => void
) => Queue<SerializedEncryptedPacket>

createDeobfuscationQueue

Creates a queue that removes time-based obfuscation from incoming packets.

Signature:

type DeobfuscationQueueCreater = (
  label: string,
  packetDeobfuscation: PacketDeobfuscation,
  logger: Logger,
  onSuccess: (packet: SerializedEncryptedPacket) => void,
  onFail: (raw: unknown) => void
) => Queue<ObfuscatedPacket>

createDeserializationQueue

Creates a queue that deserializes Base64 strings back to binary data.

Signature:

type DeserializationQueueCreater = (
  label: string,
  packetDeserialization: PacketDeserialization,
  logger: Logger,
  onSuccess: (packet: UnserializedEncryptedPacket) => void,
  onFail: (raw: unknown) => void
) => Queue<SerializedEncryptedPacket>

createDecryptionQueue

Creates a queue that decrypts binary encrypted packets back to plaintext.

Signature:

type DecryptionQueueCreater = (
  label: string,
  packetDecryption: PacketDecryption,
  logger: Logger,
  onSuccess: (packet: UnencryptedPacket) => void,
  onFail: (raw: unknown) => void
) => Queue<UnserializedEncryptedPacket>

Queue Chaining Example

This example demonstrates how to chain outbound queues together:

import { createEncryptionQueue, createSerializationQueue, createObfuscationQueue } from '@hyperfrontend/network-protocol/lib/queue'
import { createLogger } from '@hyperfrontend/logging'

const logger = createLogger({ level: 'info' })
const failures: unknown[] = []

// Create the chain backwards (from transport to source)

// 3. Obfuscation queue (final step, sends to transport)
const obfuscationQueue = createObfuscationQueue(
  'channel:obfuscate',
  packetObfuscation,
  logger,
  (obfuscated) => transport.send(obfuscated), // Final destination
  (raw) => failures.push(raw)
)

// 2. Serialization queue (feeds obfuscation)
const serializationQueue = createSerializationQueue(
  'channel:serialize',
  packetSerialization,
  logger,
  (serialized) => obfuscationQueue.addMessage(serialized), // Chain to next queue
  (raw) => failures.push(raw)
)

// 1. Encryption queue (entry point)
const encryptionQueue = createEncryptionQueue(
  'channel:encrypt',
  packetEncryption,
  logger,
  (encrypted) => serializationQueue.addMessage(encrypted), // Chain to next queue
  (raw) => failures.push(raw)
)

// Use the chain: add messages to encryption queue
encryptionQueue.addMessage(unencryptedPacket)
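
The inbound pipeline is wired the same way, in the opposite direction. The sketch below shows only the wiring pattern and does not call the library: `createStage` is a hypothetical, synchronous stand-in for the specialized creators, and the three transforms (an `obf:` prefix, comma-separated byte values, XOR with 1) are toy placeholders for real deobfuscation, deserialization, and decryption.

```typescript
// Hypothetical stand-in for a specialized queue creator: run the operation,
// then route the result to onSuccess or the raw input to onFail.
type Stage<I> = { addMessage: (input: I) => void }

function createStage<I, O>(
  operation: (input: I) => O,
  onSuccess: (output: O) => void,
  onFail: (raw: unknown) => void
): Stage<I> {
  return {
    addMessage: (input) => {
      let output: O
      try {
        output = operation(input)
      } catch {
        onFail(input) // failed input is reported, later messages are unaffected
        return
      }
      onSuccess(output)
    },
  }
}

const delivered: string[] = []
const failures: unknown[] = []

// Built backwards, from the application toward the transport.

// 3. Decryption (final step, delivers plaintext to the application)
const decryption = createStage<number[], string>(
  (bytes) => String.fromCharCode(...bytes.map((b) => b ^ 1)), // toy "cipher"
  (plain) => delivered.push(plain),
  (raw) => failures.push(raw)
)

// 2. Deserialization (feeds decryption)
const deserialization = createStage<string, number[]>(
  (text) => text.split(',').map(Number), // toy stand-in for Base64 decoding
  (bytes) => decryption.addMessage(bytes),
  (raw) => failures.push(raw)
)

// 1. Deobfuscation (entry point for packets arriving from the transport)
const deobfuscation = createStage<string, string>(
  (text) => {
    if (text.indexOf('obf:') !== 0) throw new Error('not an obfuscated packet')
    return text.slice(4)
  },
  (serialized) => deserialization.addMessage(serialized),
  (raw) => failures.push(raw)
)

deobfuscation.addMessage('obf:105,104') // passes through all three stages
deobfuscation.addMessage('plain text') // rejected by the first stage
```

With the real creators, each `createStage` call would be replaced by `createDecryptionQueue`, `createDeserializationQueue`, and `createDeobfuscationQueue`, using the signatures listed above.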

Lifecycle Management

Stop/Resume for Backpressure

Queues can be paused to handle backpressure scenarios:

const queue = createEncryptionQueue(/* ... */)

// Check queue depth for backpressure
if (queue.size() > 100) {
  console.warn('Queue backpressure detected, pausing upstream')
  upstreamSource.pause()
}

// Pause processing (messages still accumulate)
queue.stop()
console.log(queue.isRunning()) // false

// Messages added while stopped will queue up
queue.addMessage(packet1)
queue.addMessage(packet2)
console.log(queue.size()) // 2

// Resume processing
queue.resume()
// → Processes packet1, then packet2 in FIFO order
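
A single threshold check tends to flap when the depth hovers around the limit. One common remedy is a pair of high and low watermarks, sketched below. Everything here is an assumption layered on the documented `size()` method: `createBackpressureGuard`, the `PausableSource` shape (the text above only shows `upstreamSource.pause()`), and the default thresholds are all hypothetical.

```typescript
// Only the part of the documented Queue interface this helper needs.
interface SizedQueue {
  size: () => number
}

// Hypothetical upstream producer that can be paused and resumed.
interface PausableSource {
  pause: () => void
  resume: () => void
}

// Returns a check function; call it whenever the queue depth may have changed.
function createBackpressureGuard(
  queue: SizedQueue,
  source: PausableSource,
  highWater = 100,
  lowWater = 25
): () => boolean {
  let paused = false
  return () => {
    const depth = queue.size()
    if (!paused && depth > highWater) {
      source.pause() // too much pending work: stop the producer
      paused = true
    } else if (paused && depth <= lowWater) {
      source.resume() // queue has drained enough: let the producer continue
      paused = false
    }
    return paused
  }
}
```

The gap between the two thresholds means the upstream source is resumed only after the queue has drained substantially, not the moment it dips below the high mark.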

Monitoring Queue State

// Get current queue depth
const depth = queue.size()

// Check if actively processing
const active = queue.isRunning()

// Get the message currently being processed
const current = queue.currentMessage()
if (current) {
  console.log(`Currently processing: ${current.origin} → ${current.target}`)
}

Error Handling

Each specialized queue validates its input and reports failures through onFail rather than throwing, so one bad message does not halt the queue:

  1. Input Validation: Invalid packets are rejected via onFail
  2. Operation Errors: Encryption/serialization failures call onFail
  3. Pipeline Continuity: Failed messages don't block subsequent messages

const encryptionQueue = createEncryptionQueue(
  'my-channel:encrypt',
  packetEncryption,
  logger,
  (encrypted) => nextQueue.addMessage(encrypted),
  (failed) => {
    // Handle failed encryption
    logger.error('Encryption failed for packet', failed)
    metrics.incrementFailureCount()
    // Optionally: retry, dead-letter, or discard
  }
)
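
The "retry, dead-letter, or discard" options can be combined into one handler. The following is a sketch under two assumptions: that onFail receives the same object reference that was originally passed to addMessage, and that re-adding a message to the queue is an acceptable retry strategy. `createRetryingOnFail`, `resubmit`, and `deadLetter` are hypothetical names, not part of the library.

```typescript
// Builds an onFail callback that resubmits a failed message a limited number
// of times before handing it to a dead-letter sink.
function createRetryingOnFail<T extends object>(
  resubmit: (message: T) => void, // e.g. (m) => queue.addMessage(m)
  deadLetter: (raw: unknown) => void,
  maxRetries = 2
): (raw: unknown) => void {
  // Retry counts are keyed by object identity; entries vanish with the message.
  const attempts = new WeakMap<object, number>()

  return (raw) => {
    if (typeof raw !== 'object' || raw === null) {
      deadLetter(raw) // cannot be tracked or retried
      return
    }
    const used = attempts.get(raw) || 0
    if (used >= maxRetries) {
      attempts.delete(raw)
      deadLetter(raw) // retries exhausted
      return
    }
    attempts.set(raw, used + 1)
    resubmit(raw as T) // assumption: the raw value is the original message
  }
}
```

If the queue hands onFail a copy of the input instead of the original reference, the `WeakMap` lookup never matches and every failure is retried indefinitely, so that assumption should be verified before adopting this pattern.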

Validation Errors

Queue creators validate all parameters at creation time:

// These will throw Error:
createEncryptionQueue('', ...)           // Empty label
createEncryptionQueue(label, null, ...)  // Missing operation
createEncryptionQueue(label, op, null, ...) // Missing logger
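
A creation-time check along these lines could produce a per-argument result shaped like the QueueCreatorValidity interface in the API reference and throw when any field is false. The sketch is illustrative only: `checkCreatorArgs` and `assertCreatorArgs` are hypothetical names, the exact validation rules are assumptions, and the error text is invented.

```typescript
// Mirrors the documented QueueCreatorValidity fields.
interface CreatorValidity {
  label: boolean
  operation: boolean
  logger: boolean
  onSuccess: boolean
  onFail: boolean
}

// Assumed rules: non-empty string label, function callbacks, object logger.
function checkCreatorArgs(
  label: unknown,
  operation: unknown,
  logger: unknown,
  onSuccess: unknown,
  onFail: unknown
): CreatorValidity {
  return {
    label: typeof label === 'string' && label.length > 0,
    operation: typeof operation === 'function',
    logger: typeof logger === 'object' && logger !== null,
    onSuccess: typeof onSuccess === 'function',
    onFail: typeof onFail === 'function',
  }
}

// Throws a single Error naming every invalid argument.
function assertCreatorArgs(
  label: unknown,
  operation: unknown,
  logger: unknown,
  onSuccess: unknown,
  onFail: unknown
): void {
  const validity = checkCreatorArgs(label, operation, logger, onSuccess, onFail)
  const invalid = (Object.keys(validity) as Array<keyof CreatorValidity>).filter(
    (name) => !validity[name]
  )
  if (invalid.length > 0) {
    throw new Error(`Invalid queue creator arguments: ${invalid.join(', ')}`)
  }
}
```

Failing at creation time surfaces wiring mistakes immediately, before any packet is submitted.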

Relationship to Other Modules


See Also

Related Modules

| Module | Relationship |
| --- | --- |
| sender/ | Uses queues for outbound pipeline |
| receiver/ | Uses queues for inbound pipeline |
| packet/ | Packet types processed in queues |

API Reference

Functions

§function

createQueue<T>(processMessage: MessageHandler<T>, autoStart?: boolean): Queue<T>

Creates a message processing queue with FIFO ordering. Provides methods to add messages, control processing, and monitor queue state.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| processMessage | MessageHandler<T> | The handler function to process each message |
| autoStart | boolean | Whether to automatically start processing messages (default: true) |

Returns

Queue<T>
A Queue instance with methods to manage message processing

Example

Creating a message processing queue

const queue = createQueue(async (message) => {
  await processMessage(message)
})
queue.addMessage({ type: 'ping', data: {} })

Interfaces

§interface

Queue

Message processing queue interface

Properties

§readonly addMessage:(message: T) => void
Adds a message to the queue
§readonly currentMessage:() => T
Returns the message currently being processed
§readonly isRunning:() => boolean
Returns whether the queue is processing messages
§readonly resume:() => void
Resumes queue processing
§readonly size:() => number
Returns the number of messages in the queue
§readonly stop:() => void
Stops queue processing
§interface

QueueCreatorArguments

Arguments for creating a queue instance

Properties

§label:string
Queue label for logging
§logger:Logger
Logger instance
§onFail:(raw: unknown) => void
Callback on packet processing failure
§onSuccess:(packet: T) => void
Callback on successful packet processing
§operation:QueueOperation
Packet operation function
§interface

QueueCreatorValidity

Validation result for queue creator arguments

Properties

§label:boolean
Whether label is valid
§logger:boolean
Whether logger is valid
§onFail:boolean
Whether onFail callback is valid
§onSuccess:boolean
Whether onSuccess callback is valid
§operation:boolean
Whether operation is valid

Types

§type

DecryptionQueueCreater

Factory function for creating decryption queues
type DecryptionQueueCreater = (label: string, packetDecryption: PacketDecryption, logger: Logger, onSuccess: (packet: UnencryptedPacket) => void, onFail: (raw: unknown) => void) => Queue<UnserializedEncryptedPacket>
§type

DeobfuscationQueueCreater

Factory function for creating deobfuscation queues
type DeobfuscationQueueCreater = (label: string, packetDeobfuscation: PacketDeobfuscation, logger: Logger, onSuccess: (packet: SerializedEncryptedPacket) => void, onFail: (raw: unknown) => void) => Queue<ObfuscatedPacket>
§type

DeserializationQueueCreater

Factory function for creating deserialization queues
type DeserializationQueueCreater = (label: string, packetDeserialization: PacketDeserialization, logger: Logger, onSuccess: (packet: UnserializedEncryptedPacket) => void, onFail: (raw: unknown) => void) => Queue<SerializedEncryptedPacket>
§type

EncryptionQueueCreater

Factory function for creating encryption queues
type EncryptionQueueCreater = (label: string, packetEncryption: PacketEncryption, logger: Logger, onSuccess: (packet: UnserializedEncryptedPacket) => void, onFail: (raw: unknown) => void) => Queue<UnencryptedPacket>
§type

MessageHandler

Function that handles messages from a queue
type MessageHandler<T extends object> = (message: T) => Promise<void> | void
§type

ObfuscationQueueCreater

Factory function for creating obfuscation queues
type ObfuscationQueueCreater = (label: string, packetObfuscation: PacketObfuscation, logger: Logger, onSuccess: (packet: ObfuscatedPacket) => void, onFail: (raw: unknown) => void) => Queue<SerializedEncryptedPacket>
§type

QueueOperation

Union of all packet operation types for queue processing
type QueueOperation = PacketEncryption | PacketSerialization | PacketObfuscation | PacketDeobfuscation | PacketDeserialization | PacketDecryption
§type

SerializationQueueCreater

Factory function for creating serialization queues
type SerializationQueueCreater = (label: string, packetSerialization: PacketSerialization, logger: Logger, onSuccess: (packet: SerializedEncryptedPacket) => void, onFail: (raw: unknown) => void) => Queue<UnserializedEncryptedPacket>

Variables

§type

createDecryptionQueue

Creates a decryption queue for processing encrypted packets.
§type

createDeobfuscationQueue

Creates a deobfuscation queue for processing obfuscated packets.
§type

createDeserializationQueue

Creates a deserialization queue for processing serialized encrypted packets.
§type

createEncryptionQueue

Creates an encryption queue for processing unencrypted packets.
§type

createObfuscationQueue

Creates an obfuscation queue for processing serialized encrypted packets.
§type

createSerializationQueue

Creates a serialization queue for processing unserialized encrypted packets.