ClientKafka

  
class ClientKafka extends ClientProxy implements ClientKafkaProxy {
  constructor(options: Required<KafkaOptions>['options'])
  protected logger: Logger
  protected client: Kafka | null
  protected parser: KafkaParser | null
  protected initialized: Promise<void> | null
  protected responsePatterns: string[]
  protected consumerAssignments: { [key: string]: number; }
  protected brokers: string[] | BrokersFunction
  protected clientId: string
  protected groupId: string
  protected producerOnlyMode: boolean
  protected _consumer: Consumer | null
  protected _producer: Producer | null
  consumer: Consumer
  producer: Producer
  protected options: Required<KafkaOptions>['options']
  subscribeToResponseOf(pattern: unknown): void
  close(): Promise<void>
  connect(): Promise<Producer>
  bindTopics(): Promise<void>
  createClient<T = any>(): T
  createResponseCallback(): (payload: EachMessagePayload) => any
  getConsumerAssignments()
  emitBatch<TResult = any, TInput = any>(pattern: any, data: { messages: TInput[]; }): Observable<TResult>
  commitOffsets(topicPartitions: TopicPartitionOffsetAndMetadata[]): Promise<void>
  unwrap<T>(): T
  on<EventKey extends string | number | symbol = string | number | symbol, EventCallback = any>(event: EventKey, callback: EventCallback)
  protected registerConsumerEventListeners()
  protected registerProducerEventListeners()
  protected dispatchBatchEvent<TInput = any>(packets: ReadPacket<{ messages: TInput[]; }>): Promise<any>
  protected dispatchEvent(packet: OutgoingEvent): Promise<any>
  protected getReplyTopicPartition(topic: string): string
  protected publish(partialPacket: ReadPacket<any>, callback: (packet: WritePacket<any>) => any): () => void
  protected getResponsePatternName(pattern: string): string
  protected setConsumerAssignments(data: ConsumerGroupJoinEvent): void
  protected initializeSerializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
  protected initializeDeserializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })

  // inherited from nest/packages/microservices/ClientProxy
  protected routingMap: Map<string, Function>
  protected serializer: ProducerSerializer
  protected deserializer: ProducerDeserializer
  protected _status$: ReplaySubject<Status>
  status: Observable<Status>
  abstract connect(): Promise<any>
  abstract close(): any
  on<EventKey extends keyof EventsMap = keyof EventsMap, EventCallback extends EventsMap[EventKey] = EventsMap[EventKey]>(event: EventKey, callback: EventCallback)
  abstract unwrap<T>(): T
  send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>
  emit<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>
  protected abstract publish(packet: ReadPacket<any>, callback: (packet: WritePacket<any>) => void): () => void
  protected abstract dispatchEvent<T = any>(packet: ReadPacket<any>): Promise<T>
  protected createObserver<T>(observer: Observer<T>): (packet: WritePacket) => void
  protected serializeError(err: any): any
  protected serializeResponse(response: any): any
  protected assignPacketId(packet: ReadPacket<any>): ReadPacket & PacketId
  protected connect$(instance: any, errorEvent: string = 'error', connectEvent: string = 'connect'): Observable<any>
  protected getOptionsProp<Options extends ClientOptions['options'], Attribute extends keyof Options, DefaultValue extends Options[Attribute] = Options[Attribute]>(obj: Options, prop: Attribute, defaultValue: DefaultValue = undefined as DefaultValue)
  protected normalizePattern(pattern: MsPattern): string
  protected initializeSerializer(options: ClientOptions['options'])
  protected initializeDeserializer(options: ClientOptions['options'])
}

Constructor


constructor(options: Required<KafkaOptions>['options'])

Parameters

Option Type Description
options object

Properties

Property Description
protected logger: Logger
protected client: Kafka | null
protected parser: KafkaParser | null
protected initialized: Promise<void> | null
protected responsePatterns: string[]
protected consumerAssignments: { [key: string]: number; }
protected brokers: string[] | BrokersFunction
protected clientId: string
protected groupId: string
protected producerOnlyMode: boolean
protected _consumer: Consumer | null
protected _producer: Producer | null
consumer: Consumer — Read-only.
producer: Producer — Read-only.
protected options: Required<KafkaOptions>['options'] — Read-only. Declared in constructor.

Methods

subscribeToResponseOf()


subscribeToResponseOf(pattern: unknown): void

Parameters

Option Type Description
pattern unknown

Returns

void

close()


close(): Promise<void>

Parameters

There are no parameters.

Returns

Promise<void>

connect()


connect(): Promise<Producer>

Parameters

There are no parameters.

Returns

Promise<Producer>

bindTopics()


bindTopics(): Promise<void>

Parameters

There are no parameters.

Returns

Promise<void>

createClient()


createClient<T = any>(): T

Parameters

There are no parameters.

Returns

T

createResponseCallback()


createResponseCallback(): (payload: EachMessagePayload) => any

Parameters

There are no parameters.

Returns

(payload: EachMessagePayload) => any

getConsumerAssignments()


getConsumerAssignments()

Parameters

There are no parameters.

emitBatch()


emitBatch<TResult = any, TInput = any>(pattern: any, data: { messages: TInput[]; }): Observable<TResult>

Parameters

Option Type Description
pattern any
data { messages: TInput[]; }

Returns

Observable<TResult>

commitOffsets()


commitOffsets(topicPartitions: TopicPartitionOffsetAndMetadata[]): Promise<void>

Parameters

Option Type Description
topicPartitions TopicPartitionOffsetAndMetadata[]

Returns

Promise<void>

unwrap()


unwrap<T>(): T

Parameters

There are no parameters.

Returns

T

on()


on<EventKey extends string | number | symbol = string | number | symbol, EventCallback = any>(event: EventKey, callback: EventCallback)

Parameters

Option Type Description
event EventKey
callback EventCallback

registerConsumerEventListeners()


protected registerConsumerEventListeners()

Parameters

There are no parameters.

registerProducerEventListeners()


protected registerProducerEventListeners()

Parameters

There are no parameters.

dispatchBatchEvent()


protected dispatchBatchEvent<TInput = any>(packets: ReadPacket<{ messages: TInput[]; }>): Promise<any>

Parameters

Option Type Description
packets ReadPacket<{ messages: TInput[]; }>

Returns

Promise<any>

dispatchEvent()


protected dispatchEvent(packet: OutgoingEvent): Promise<any>

Parameters

Option Type Description
packet OutgoingEvent

Returns

Promise<any>

getReplyTopicPartition()


protected getReplyTopicPartition(topic: string): string

Parameters

Option Type Description
topic string

Returns

string

publish()


protected publish(partialPacket: ReadPacket<any>, callback: (packet: WritePacket<any>) => any): () => void

Parameters

Option Type Description
partialPacket ReadPacket<any>
callback (packet: WritePacket<any>) => any

Returns

() => void

getResponsePatternName()


protected getResponsePatternName(pattern: string): string

Parameters

Option Type Description
pattern string

Returns

string

setConsumerAssignments()


protected setConsumerAssignments(data: ConsumerGroupJoinEvent): void

Parameters

Option Type Description
data ConsumerGroupJoinEvent

Returns

void

initializeSerializer()


protected initializeSerializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })

Parameters

Option Type Description
options object

initializeDeserializer()


protected initializeDeserializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })

Parameters

Option Type Description
options object