class ClientKafka extends ClientProxy implements ClientKafkaProxy {
constructor(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
protected logger: Logger
protected client: Kafka | null
protected parser: KafkaParser | null
protected initialized: Promise<void> | null
protected responsePatterns: string[]
protected consumerAssignments: {...}
protected brokers: string[] | BrokersFunction
protected clientId: string
protected groupId: string
protected producerOnlyMode: boolean
protected _consumer: Consumer | null
protected _producer: Producer | null
consumer: Consumer
producer: Producer
protected options: Required<KafkaOptions>['options']
subscribeToResponseOf(pattern: unknown): void
close(): Promise<void>
connect(): Promise<Producer>
bindTopics(): Promise<void>
createClient<T = any>(): T
createResponseCallback(): (payload: EachMessagePayload) => any
getConsumerAssignments()
emitBatch<TResult = any, TInput = any>(pattern: any, data: { messages: TInput[]; }): Observable<TResult>
commitOffsets(topicPartitions: TopicPartitionOffsetAndMetadata[]): Promise<void>
unwrap<T>(): T
on<EventKey extends string | number | symbol = string | number | symbol, EventCallback = any>(event: EventKey, callback: EventCallback)
protected registerConsumerEventListeners()
protected registerProducerEventListeners()
protected dispatchBatchEvent<TInput = any>(packets: ReadPacket<{ messages: TInput[]; }>): Promise<any>
protected dispatchEvent(packet: OutgoingEvent): Promise<any>
protected getReplyTopicPartition(topic: string): string
protected publish(partialPacket: ReadPacket<any>, callback: (packet: WritePacket<any>) => any): () => void
protected getResponsePatternName(pattern: string): string
protected setConsumerAssignments(data: ConsumerGroupJoinEvent): void
protected initializeSerializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
protected initializeDeserializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
protected routingMap: Map<string, Function>
protected serializer: ProducerSerializer
protected deserializer: ProducerDeserializer
protected _status$: ReplaySubject<Status>
status: Observable<Status>
abstract connect(): Promise<any>
abstract close(): any
on<EventKey extends keyof EventsMap = keyof EventsMap, EventCallback extends EventsMap[EventKey] = EventsMap[EventKey]>(event: EventKey, callback: EventCallback)
abstract unwrap<T>(): T
send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>
emit<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>
protected abstract publish(packet: ReadPacket<any>, callback: (packet: WritePacket<any>) => void): () => void
protected abstract dispatchEvent<T = any>(packet: ReadPacket<any>): Promise<T>
protected createObserver<T>(observer: Observer<T>): (packet: WritePacket) => void
protected serializeError(err: any): any
protected serializeResponse(response: any): any
protected assignPacketId(packet: ReadPacket<any>): ReadPacket & PacketId
protected connect$(instance: any, errorEvent: string = 'error', connectEvent: string = 'connect'): Observable<any>
protected getOptionsProp<Options extends ClientOptions['options'], Attribute extends keyof Options, DefaultValue extends Options[Attribute] = Options[Attribute]>(obj: Options, prop: Attribute, defaultValue: DefaultValue = undefined as DefaultValue)
protected normalizePattern(pattern: MsPattern): string
protected initializeSerializer(options: ClientOptions['options'])
protected initializeDeserializer(options: ClientOptions['options'])
}
Constructor
constructor(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
Parameters
Option |
Type |
Description |
options
|
object |
|
|
Properties
Property |
Description |
protected logger: Logger
|
|
protected client: Kafka | null
|
|
protected parser: KafkaParser | null
|
|
protected initialized: Promise<void> | null
|
|
protected responsePatterns: string[]
|
|
protected consumerAssignments: {
[key: string]: number;
}
|
|
protected brokers: string[] | BrokersFunction
|
|
protected clientId: string
|
|
protected groupId: string
|
|
protected producerOnlyMode: boolean
|
|
protected _consumer: Consumer | null
|
|
protected _producer: Producer | null
|
|
consumer: Consumer
|
Read-only.
|
producer: Producer
|
Read-only.
|
protected options: Required<KafkaOptions>['options']
|
Read-only.
Declared in constructor.
|
Methods
subscribeToResponseOf()
|
subscribeToResponseOf(pattern: unknown): void
Parameters
Option |
Type |
Description |
pattern
|
unknown |
|
Returns
void
|
close()
|
close(): Promise<void>
Parameters
There are no parameters.
Returns
Promise<void>
|
connect()
|
connect(): Promise<Producer>
Parameters
There are no parameters.
Returns
Promise<Producer>
|
bindTopics()
|
bindTopics(): Promise<void>
Parameters
There are no parameters.
Returns
Promise<void>
|
createClient()
|
createClient<T = any>(): T
Parameters
There are no parameters.
Returns
T
|
createResponseCallback()
|
createResponseCallback(): (payload: EachMessagePayload) => any
Parameters
There are no parameters.
Returns
(payload: EachMessagePayload) => any
|
getConsumerAssignments()
|
getConsumerAssignments()
Parameters
There are no parameters.
|
emitBatch()
|
emitBatch<TResult = any, TInput = any>(pattern: any, data: { messages: TInput[]; }): Observable<TResult>
Parameters
Option |
Type |
Description |
pattern
|
any |
|
data
|
object |
|
Returns
Observable<TResult>
|
commitOffsets()
|
commitOffsets(topicPartitions: TopicPartitionOffsetAndMetadata[]): Promise<void>
Parameters
Option |
Type |
Description |
topicPartitions
|
TopicPartitionOffsetAndMetadata[] |
|
Returns
Promise<void>
|
unwrap()
|
unwrap<T>(): T
Parameters
There are no parameters.
Returns
T
|
on()
|
on<EventKey extends string | number | symbol = string | number | symbol, EventCallback = any>(event: EventKey, callback: EventCallback)
Parameters
Option |
Type |
Description |
event
|
EventKey |
|
callback
|
EventCallback |
|
|
registerConsumerEventListeners()
|
protected registerConsumerEventListeners()
Parameters
There are no parameters.
|
registerProducerEventListeners()
|
protected registerProducerEventListeners()
Parameters
There are no parameters.
|
dispatchBatchEvent()
|
protected dispatchBatchEvent<TInput = any>(packets: ReadPacket<{ messages: TInput[]; }>): Promise<any>
Parameters
Option |
Type |
Description |
packets
|
ReadPacket<{ messages: TInput[]; }> |
|
Returns
Promise<any>
|
dispatchEvent()
|
protected dispatchEvent(packet: OutgoingEvent): Promise<any>
Parameters
Option |
Type |
Description |
packet
|
OutgoingEvent |
|
Returns
Promise<any>
|
getReplyTopicPartition()
|
protected getReplyTopicPartition(topic: string): string
Parameters
Option |
Type |
Description |
topic
|
string |
|
Returns
string
|
publish()
|
protected publish(partialPacket: ReadPacket<any>, callback: (packet: WritePacket<any>) => any): () => void
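Parameters
Option |
Type |
Description |
partialPacket
|
ReadPacket<any> |
|
callback
|
(packet: WritePacket<any>) => any |
|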
Returns
() => void
|
getResponsePatternName()
|
protected getResponsePatternName(pattern: string): string
Parameters
Option |
Type |
Description |
pattern
|
string |
|
Returns
string
|
setConsumerAssignments()
|
protected setConsumerAssignments(data: ConsumerGroupJoinEvent): void
Parameters
Option |
Type |
Description |
data
|
ConsumerGroupJoinEvent |
|
Returns
void
|
initializeSerializer()
|
protected initializeSerializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
Parameters
Option |
Type |
Description |
options
|
object |
|
|
initializeDeserializer()
|
protected initializeDeserializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
Parameters
Option |
Type |
Description |
options
|
object |
|
|