ClientRMQ

  
class ClientRMQ extends ClientProxy {
  constructor(options: { urls?: string[] | RmqUrl[]; queue?: string; prefetchCount?: number; isGlobalPrefetchCount?: boolean; queueOptions?: AmqplibQueueOptions; socketOptions?: AmqpConnectionManagerSocketOptions; ... 13 more ...; maxConnectionAttempts?: number; })
  protected logger: Logger
  protected connection$: ReplaySubject<any>
  protected connectionPromise: Promise<void>
  protected client: AmqpConnectionManager | null
  protected channel: ChannelWrapper | null
  protected pendingEventListeners: Array<{ event: keyof RmqEvents; callback: RmqEvents[keyof RmqEvents]; }>
  protected isInitialConnect: true
  protected responseEmitter: EventEmitter
  protected queue: string
  protected queueOptions: Record<string, any>
  protected replyQueue: string
  protected noAssert: boolean
  protected options: Required<RmqOptions>['options']
  close(): Promise<void>
  connect(): Promise<any>
  createChannel(): Promise<void>
  createClient(): AmqpConnectionManager
  mergeDisconnectEvent<T = any>(instance: any, source$: Observable<T>): Observable<T>
  convertConnectionToPromise()
  setupChannel(channel: any, resolve: Function)
  consumeChannel(channel: any)
  registerErrorListener(client: any): void
  registerDisconnectListener(client: any): void
  on<EventKey extends keyof RmqEvents = keyof RmqEvents, EventCallback extends RmqEvents[EventKey] = RmqEvents[EventKey]>(event: EventKey, callback: EventCallback)
  unwrap<T>(): T
  handleMessage(packet: unknown, options: Record<string, unknown> | ((packet: WritePacket<any>) => any), callback?: (packet: WritePacket<any>) => any): Promise<void>
  protected publish(message: ReadPacket<any>, callback: (packet: WritePacket<any>) => any): () => void
  protected dispatchEvent(packet: ReadPacket<any>): Promise<any>
  protected initializeSerializer(options: { urls?: string[] | RmqUrl[]; queue?: string; prefetchCount?: number; isGlobalPrefetchCount?: boolean; queueOptions?: AmqplibQueueOptions; socketOptions?: AmqpConnectionManagerSocketOptions; ... 13 more ...; maxConnectionAttempts?: number; })
  protected mergeHeaders(requestHeaders?: Record<string, string>): Record<string, string> | undefined
  protected parseMessageContent(content: Buffer)

  // inherited from nest/packages/microservices/ClientProxy
  protected routingMap: Map<string, Function>
  protected serializer: ProducerSerializer
  protected deserializer: ProducerDeserializer
  protected _status$: ReplaySubject<Status>
  status: Observable<Status>
  abstract connect(): Promise<any>
  abstract close(): any
  on<EventKey extends keyof EventsMap = keyof EventsMap, EventCallback extends EventsMap[EventKey] = EventsMap[EventKey]>(event: EventKey, callback: EventCallback)
  abstract unwrap<T>(): T
  send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>
  emit<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>
  protected abstract publish(packet: ReadPacket<any>, callback: (packet: WritePacket<any>) => void): () => void
  protected abstract dispatchEvent<T = any>(packet: ReadPacket<any>): Promise<T>
  protected createObserver<T>(observer: Observer<T>): (packet: WritePacket) => void
  protected serializeError(err: any): any
  protected serializeResponse(response: any): any
  protected assignPacketId(packet: ReadPacket<any>): ReadPacket & PacketId
  protected connect$(instance: any, errorEvent: string = 'error', connectEvent: string = 'connect'): Observable<any>
  protected getOptionsProp<Options extends ClientOptions['options'], Attribute extends keyof Options, DefaultValue extends Options[Attribute] = Options[Attribute]>(obj: Options, prop: Attribute, defaultValue: DefaultValue = undefined as DefaultValue)
  protected normalizePattern(pattern: MsPattern): string
  protected initializeSerializer(options: { url?: string; maxSendMessageLength?: number; maxReceiveMessageLength?: number; maxMetadataSize?: number; keepalive?: { keepaliveTimeMs?: number; keepaliveTimeoutMs?: number; keepalivePermitWithoutCalls?: number; http2MaxPingsWithoutData?: number; http2MinTimeBetweenPingsMs?: number; http2MinPingIntervalWithoutData...)
  protected initializeDeserializer(options: { url?: string; maxSendMessageLength?: number; maxReceiveMessageLength?: number; maxMetadataSize?: number; keepalive?: { keepaliveTimeMs?: number; keepaliveTimeoutMs?: number; keepalivePermitWithoutCalls?: number; http2MaxPingsWithoutData?: number; http2MinTimeBetweenPingsMs?: number; http2MinPingIntervalWithoutData...)
}

Constructor


constructor(options: { urls?: string[] | RmqUrl[]; queue?: string; prefetchCount?: number; isGlobalPrefetchCount?: boolean; queueOptions?: AmqplibQueueOptions; socketOptions?: AmqpConnectionManagerSocketOptions; ... 13 more ...; maxConnectionAttempts?: number; })

Parameters

Option Type Description
options object

Properties

Property Description
protected logger: Logger Read-only.
protected connection$: ReplaySubject<any>
protected connectionPromise: Promise<void>
protected client: AmqpConnectionManager | null
protected channel: ChannelWrapper | null
protected pendingEventListeners: Array<{ event: keyof RmqEvents; callback: RmqEvents[keyof RmqEvents]; }>
protected isInitialConnect: true
protected responseEmitter: EventEmitter
protected queue: string
protected queueOptions: Record<string, any>
protected replyQueue: string
protected noAssert: boolean
protected options: Required<RmqOptions>['options'] Read-only. Declared in constructor.

Methods

close()


close(): Promise<void>

Parameters

There are no parameters.

Returns

Promise<void>

connect()


connect(): Promise<any>

Parameters

There are no parameters.

Returns

Promise<any>

createChannel()


createChannel(): Promise<void>

Parameters

There are no parameters.

Returns

Promise<void>

createClient()


createClient(): AmqpConnectionManager

Parameters

There are no parameters.

Returns

AmqpConnectionManager

mergeDisconnectEvent()


mergeDisconnectEvent<T = any>(instance: any, source$: Observable<T>): Observable<T>

Parameters

Option Type Description
instance any
source$ Observable<T>

Returns

Observable<T>

convertConnectionToPromise()


convertConnectionToPromise()

Parameters

There are no parameters.

setupChannel()


setupChannel(channel: any, resolve: Function)

Parameters

Option Type Description
channel any
resolve Function

consumeChannel()


consumeChannel(channel: any)

Parameters

Option Type Description
channel any

registerErrorListener()


registerErrorListener(client: any): void

Parameters

Option Type Description
client any

Returns

void

registerDisconnectListener()


registerDisconnectListener(client: any): void

Parameters

Option Type Description
client any

Returns

void

on()


on<EventKey extends keyof RmqEvents = keyof RmqEvents, EventCallback extends RmqEvents[EventKey] = RmqEvents[EventKey]>(event: EventKey, callback: EventCallback)

Parameters

Option Type Description
event EventKey
callback EventCallback

unwrap()


unwrap<T>(): T

Parameters

There are no parameters.

Returns

T

handleMessage()


handleMessage(packet: unknown, callback: (packet: WritePacket<any>) => any): Promise<void>

Parameters

Option Type Description
packet unknown
callback (packet: WritePacket<any>) => any

Returns

Promise<void>


handleMessage(packet: unknown, options: Record<string, unknown>, callback: (packet: WritePacket<any>) => any): Promise<void>

Parameters

Option Type Description
packet unknown
options Record<string, unknown>
callback (packet: WritePacket<any>) => any

Returns

Promise<void>

publish()


protected publish(message: ReadPacket<any>, callback: (packet: WritePacket<any>) => any): () => void

Parameters

Option Type Description
message ReadPacket<any>
callback (packet: WritePacket<any>) => any

Returns

() => void

dispatchEvent()


protected dispatchEvent(packet: ReadPacket<any>): Promise<any>

Parameters

Option Type Description
packet ReadPacket<any>

Returns

Promise<any>

initializeSerializer()


protected initializeSerializer(options: { urls?: string[] | RmqUrl[]; queue?: string; prefetchCount?: number; isGlobalPrefetchCount?: boolean; queueOptions?: AmqplibQueueOptions; socketOptions?: AmqpConnectionManagerSocketOptions; ... 13 more ...; maxConnectionAttempts?: number; })

Parameters

Option Type Description
options object

mergeHeaders()


protected mergeHeaders(requestHeaders?: Record<string, string>): Record<string, string> | undefined

Parameters

Option Type Description
requestHeaders Record<string, string>

Optional. Default is undefined.

Returns

Record<string, string> | undefined

parseMessageContent()


protected parseMessageContent(content: Buffer)

Parameters

Option Type Description
content Buffer