summaryrefslogtreecommitdiff
path: root/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts
diff options
context:
space:
mode:
Diffstat (limited to 'frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts')
-rw-r--r--frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts821
1 files changed, 821 insertions, 0 deletions
diff --git a/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts b/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts
new file mode 100644
index 0000000..e6e1cbb
--- /dev/null
+++ b/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts
@@ -0,0 +1,821 @@
+/*
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { CallCredentials } from './call-credentials';
+import { LogVerbosity, Status } from './constants';
+import { Deadline } from './deadline';
+import { Metadata } from './metadata';
+import { CallConfig } from './resolver';
+import * as logging from './logging';
+import {
+ Call,
+ InterceptingListener,
+ MessageContext,
+ StatusObject,
+ WriteCallback,
+ WriteObject,
+} from './call-interface';
+import {
+ LoadBalancingCall,
+ StatusObjectWithProgress,
+} from './load-balancing-call';
+import { InternalChannel } from './internal-channel';
+
+const TRACER_NAME = 'retrying_call';
+
+export class RetryThrottler {
+ private tokens: number;
+ constructor(
+ private readonly maxTokens: number,
+ private readonly tokenRatio: number,
+ previousRetryThrottler?: RetryThrottler
+ ) {
+ if (previousRetryThrottler) {
+ /* When carrying over tokens from a previous config, rescale them to the
+ * new max value */
+ this.tokens =
+ previousRetryThrottler.tokens *
+ (maxTokens / previousRetryThrottler.maxTokens);
+ } else {
+ this.tokens = maxTokens;
+ }
+ }
+
+ addCallSucceeded() {
+ this.tokens = Math.max(this.tokens + this.tokenRatio, this.maxTokens);
+ }
+
+ addCallFailed() {
+ this.tokens = Math.min(this.tokens - 1, 0);
+ }
+
+ canRetryCall() {
+ return this.tokens > this.maxTokens / 2;
+ }
+}
+
+export class MessageBufferTracker {
+ private totalAllocated = 0;
+ private allocatedPerCall: Map<number, number> = new Map<number, number>();
+
+ constructor(private totalLimit: number, private limitPerCall: number) {}
+
+ allocate(size: number, callId: number): boolean {
+ const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
+ if (
+ this.limitPerCall - currentPerCall < size ||
+ this.totalLimit - this.totalAllocated < size
+ ) {
+ return false;
+ }
+ this.allocatedPerCall.set(callId, currentPerCall + size);
+ this.totalAllocated += size;
+ return true;
+ }
+
+ free(size: number, callId: number) {
+ if (this.totalAllocated < size) {
+ throw new Error(
+ `Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`
+ );
+ }
+ this.totalAllocated -= size;
+ const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
+ if (currentPerCall < size) {
+ throw new Error(
+ `Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`
+ );
+ }
+ this.allocatedPerCall.set(callId, currentPerCall - size);
+ }
+
+ freeAll(callId: number) {
+ const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
+ if (this.totalAllocated < currentPerCall) {
+ throw new Error(
+ `Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`
+ );
+ }
+ this.totalAllocated -= currentPerCall;
+ this.allocatedPerCall.delete(callId);
+ }
+}
+
+type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';
+
+interface UnderlyingCall {
+ state: UnderlyingCallState;
+ call: LoadBalancingCall;
+ nextMessageToSend: number;
+}
+
+/**
+ * A retrying call can be in one of these states:
+ * RETRY: Retries are configured and new attempts may be sent
+ * HEDGING: Hedging is configured and new attempts may be sent
+ * TRANSPARENT_ONLY: Neither retries nor hedging are configured, and
+ * transparent retry attempts may still be sent
+ * COMMITTED: One attempt is committed, and no new attempts will be
+ * sent
+ */
+type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';
+
+/**
+ * The different types of objects that can be stored in the write buffer, with
+ * the following meanings:
+ * MESSAGE: This is a message to be sent.
+ * HALF_CLOSE: When this entry is reached, the calls should send a half-close.
+ * FREED: This slot previously contained a message that has been sent on all
+ * child calls and is no longer needed.
+ */
+type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED';
+
+/**
+ * Entry in the buffer of messages to send to the remote end.
+ */
+interface WriteBufferEntry {
+ entryType: WriteBufferEntryType;
+ /**
+ * Message to send.
+ * Only populated if entryType is MESSAGE.
+ */
+ message?: WriteObject;
+ /**
+ * Callback to call after sending the message.
+ * Only populated if entryType is MESSAGE and the call is in the COMMITTED
+ * state.
+ */
+ callback?: WriteCallback;
+ /**
+ * Indicates whether the message is allocated in the buffer tracker. Ignored
+ * if entryType is not MESSAGE. Should be the return value of
+ * bufferTracker.allocate.
+ */
+ allocated: boolean;
+}
+
+const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
+
+export class RetryingCall implements Call {
+ private state: RetryingCallState;
+ private listener: InterceptingListener | null = null;
+ private initialMetadata: Metadata | null = null;
+ private underlyingCalls: UnderlyingCall[] = [];
+ private writeBuffer: WriteBufferEntry[] = [];
+ /**
+ * The offset of message indices in the writeBuffer. For example, if
+ * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
+ * is in writeBuffer[5].
+ */
+ private writeBufferOffset = 0;
+ /**
+ * Tracks whether a read has been started, so that we know whether to start
+ * reads on new child calls. This only matters for the first read, because
+ * once a message comes in the child call becomes committed and there will
+ * be no new child calls.
+ */
+ private readStarted = false;
+ private transparentRetryUsed = false;
+ /**
+ * Number of attempts so far
+ */
+ private attempts = 0;
+ private hedgingTimer: NodeJS.Timeout | null = null;
+ private committedCallIndex: number | null = null;
+ private initialRetryBackoffSec = 0;
+ private nextRetryBackoffSec = 0;
+ constructor(
+ private readonly channel: InternalChannel,
+ private readonly callConfig: CallConfig,
+ private readonly methodName: string,
+ private readonly host: string,
+ private readonly credentials: CallCredentials,
+ private readonly deadline: Deadline,
+ private readonly callNumber: number,
+ private readonly bufferTracker: MessageBufferTracker,
+ private readonly retryThrottler?: RetryThrottler
+ ) {
+ if (callConfig.methodConfig.retryPolicy) {
+ this.state = 'RETRY';
+ const retryPolicy = callConfig.methodConfig.retryPolicy;
+ this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(
+ retryPolicy.initialBackoff.substring(
+ 0,
+ retryPolicy.initialBackoff.length - 1
+ )
+ );
+ } else if (callConfig.methodConfig.hedgingPolicy) {
+ this.state = 'HEDGING';
+ } else {
+ this.state = 'TRANSPARENT_ONLY';
+ }
+ }
+ getCallNumber(): number {
+ return this.callNumber;
+ }
+
+ private trace(text: string): void {
+ logging.trace(
+ LogVerbosity.DEBUG,
+ TRACER_NAME,
+ '[' + this.callNumber + '] ' + text
+ );
+ }
+
+ private reportStatus(statusObject: StatusObject) {
+ this.trace(
+ 'ended with status: code=' +
+ statusObject.code +
+ ' details="' +
+ statusObject.details +
+ '"'
+ );
+ this.bufferTracker.freeAll(this.callNumber);
+ this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
+ this.writeBuffer = [];
+ process.nextTick(() => {
+ // Explicitly construct status object to remove progress field
+ this.listener?.onReceiveStatus({
+ code: statusObject.code,
+ details: statusObject.details,
+ metadata: statusObject.metadata,
+ });
+ });
+ }
+
+ cancelWithStatus(status: Status, details: string): void {
+ this.trace(
+ 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
+ );
+ this.reportStatus({ code: status, details, metadata: new Metadata() });
+ for (const { call } of this.underlyingCalls) {
+ call.cancelWithStatus(status, details);
+ }
+ }
+ getPeer(): string {
+ if (this.committedCallIndex !== null) {
+ return this.underlyingCalls[this.committedCallIndex].call.getPeer();
+ } else {
+ return 'unknown';
+ }
+ }
+
+ private getBufferEntry(messageIndex: number): WriteBufferEntry {
+ return (
+ this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {
+ entryType: 'FREED',
+ allocated: false,
+ }
+ );
+ }
+
+ private getNextBufferIndex() {
+ return this.writeBufferOffset + this.writeBuffer.length;
+ }
+
+ private clearSentMessages() {
+ if (this.state !== 'COMMITTED') {
+ return;
+ }
+ const earliestNeededMessageIndex =
+ this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
+ for (
+ let messageIndex = this.writeBufferOffset;
+ messageIndex < earliestNeededMessageIndex;
+ messageIndex++
+ ) {
+ const bufferEntry = this.getBufferEntry(messageIndex);
+ if (bufferEntry.allocated) {
+ this.bufferTracker.free(
+ bufferEntry.message!.message.length,
+ this.callNumber
+ );
+ }
+ }
+ this.writeBuffer = this.writeBuffer.slice(
+ earliestNeededMessageIndex - this.writeBufferOffset
+ );
+ this.writeBufferOffset = earliestNeededMessageIndex;
+ }
+
+ private commitCall(index: number) {
+ if (this.state === 'COMMITTED') {
+ return;
+ }
+ if (this.underlyingCalls[index].state === 'COMPLETED') {
+ return;
+ }
+ this.trace(
+ 'Committing call [' +
+ this.underlyingCalls[index].call.getCallNumber() +
+ '] at index ' +
+ index
+ );
+ this.state = 'COMMITTED';
+ this.committedCallIndex = index;
+ for (let i = 0; i < this.underlyingCalls.length; i++) {
+ if (i === index) {
+ continue;
+ }
+ if (this.underlyingCalls[i].state === 'COMPLETED') {
+ continue;
+ }
+ this.underlyingCalls[i].state = 'COMPLETED';
+ this.underlyingCalls[i].call.cancelWithStatus(
+ Status.CANCELLED,
+ 'Discarded in favor of other hedged attempt'
+ );
+ }
+ this.clearSentMessages();
+ }
+
+ private commitCallWithMostMessages() {
+ if (this.state === 'COMMITTED') {
+ return;
+ }
+ let mostMessages = -1;
+ let callWithMostMessages = -1;
+ for (const [index, childCall] of this.underlyingCalls.entries()) {
+ if (
+ childCall.state === 'ACTIVE' &&
+ childCall.nextMessageToSend > mostMessages
+ ) {
+ mostMessages = childCall.nextMessageToSend;
+ callWithMostMessages = index;
+ }
+ }
+ if (callWithMostMessages === -1) {
+ /* There are no active calls, disable retries to force the next call that
+ * is started to be committed. */
+ this.state = 'TRANSPARENT_ONLY';
+ } else {
+ this.commitCall(callWithMostMessages);
+ }
+ }
+
+ private isStatusCodeInList(list: (Status | string)[], code: Status) {
+ return list.some(
+ value =>
+ value === code ||
+ value.toString().toLowerCase() === Status[code].toLowerCase()
+ );
+ }
+
+ private getNextRetryBackoffMs() {
+ const retryPolicy = this.callConfig?.methodConfig.retryPolicy;
+ if (!retryPolicy) {
+ return 0;
+ }
+ const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
+ const maxBackoffSec = Number(
+ retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1)
+ );
+ this.nextRetryBackoffSec = Math.min(
+ this.nextRetryBackoffSec * retryPolicy.backoffMultiplier,
+ maxBackoffSec
+ );
+ return nextBackoffMs;
+ }
+
+ private maybeRetryCall(
+ pushback: number | null,
+ callback: (retried: boolean) => void
+ ) {
+ if (this.state !== 'RETRY') {
+ callback(false);
+ return;
+ }
+ const retryPolicy = this.callConfig!.methodConfig.retryPolicy!;
+ if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
+ callback(false);
+ return;
+ }
+ let retryDelayMs: number;
+ if (pushback === null) {
+ retryDelayMs = this.getNextRetryBackoffMs();
+ } else if (pushback < 0) {
+ this.state = 'TRANSPARENT_ONLY';
+ callback(false);
+ return;
+ } else {
+ retryDelayMs = pushback;
+ this.nextRetryBackoffSec = this.initialRetryBackoffSec;
+ }
+ setTimeout(() => {
+ if (this.state !== 'RETRY') {
+ callback(false);
+ return;
+ }
+ if (this.retryThrottler?.canRetryCall() ?? true) {
+ callback(true);
+ this.attempts += 1;
+ this.startNewAttempt();
+ }
+ }, retryDelayMs);
+ }
+
+ private countActiveCalls(): number {
+ let count = 0;
+ for (const call of this.underlyingCalls) {
+ if (call?.state === 'ACTIVE') {
+ count += 1;
+ }
+ }
+ return count;
+ }
+
+ private handleProcessedStatus(
+ status: StatusObject,
+ callIndex: number,
+ pushback: number | null
+ ) {
+ switch (this.state) {
+ case 'COMMITTED':
+ case 'TRANSPARENT_ONLY':
+ this.commitCall(callIndex);
+ this.reportStatus(status);
+ break;
+ case 'HEDGING':
+ if (
+ this.isStatusCodeInList(
+ this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes ??
+ [],
+ status.code
+ )
+ ) {
+ this.retryThrottler?.addCallFailed();
+ let delayMs: number;
+ if (pushback === null) {
+ delayMs = 0;
+ } else if (pushback < 0) {
+ this.state = 'TRANSPARENT_ONLY';
+ this.commitCall(callIndex);
+ this.reportStatus(status);
+ return;
+ } else {
+ delayMs = pushback;
+ }
+ setTimeout(() => {
+ this.maybeStartHedgingAttempt();
+ // If after trying to start a call there are no active calls, this was the last one
+ if (this.countActiveCalls() === 0) {
+ this.commitCall(callIndex);
+ this.reportStatus(status);
+ }
+ }, delayMs);
+ } else {
+ this.commitCall(callIndex);
+ this.reportStatus(status);
+ }
+ break;
+ case 'RETRY':
+ if (
+ this.isStatusCodeInList(
+ this.callConfig!.methodConfig.retryPolicy!.retryableStatusCodes,
+ status.code
+ )
+ ) {
+ this.retryThrottler?.addCallFailed();
+ this.maybeRetryCall(pushback, retried => {
+ if (!retried) {
+ this.commitCall(callIndex);
+ this.reportStatus(status);
+ }
+ });
+ } else {
+ this.commitCall(callIndex);
+ this.reportStatus(status);
+ }
+ break;
+ }
+ }
+
+ private getPushback(metadata: Metadata): number | null {
+ const mdValue = metadata.get('grpc-retry-pushback-ms');
+ if (mdValue.length === 0) {
+ return null;
+ }
+ try {
+ return parseInt(mdValue[0] as string);
+ } catch (e) {
+ return -1;
+ }
+ }
+
+ private handleChildStatus(
+ status: StatusObjectWithProgress,
+ callIndex: number
+ ) {
+ if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
+ return;
+ }
+ this.trace(
+ 'state=' +
+ this.state +
+ ' handling status with progress ' +
+ status.progress +
+ ' from child [' +
+ this.underlyingCalls[callIndex].call.getCallNumber() +
+ '] in state ' +
+ this.underlyingCalls[callIndex].state
+ );
+ this.underlyingCalls[callIndex].state = 'COMPLETED';
+ if (status.code === Status.OK) {
+ this.retryThrottler?.addCallSucceeded();
+ this.commitCall(callIndex);
+ this.reportStatus(status);
+ return;
+ }
+ if (this.state === 'COMMITTED') {
+ this.reportStatus(status);
+ return;
+ }
+ const pushback = this.getPushback(status.metadata);
+ switch (status.progress) {
+ case 'NOT_STARTED':
+ // RPC never leaves the client, always safe to retry
+ this.startNewAttempt();
+ break;
+ case 'REFUSED':
+ // RPC reaches the server library, but not the server application logic
+ if (this.transparentRetryUsed) {
+ this.handleProcessedStatus(status, callIndex, pushback);
+ } else {
+ this.transparentRetryUsed = true;
+ this.startNewAttempt();
+ }
+ break;
+ case 'DROP':
+ this.commitCall(callIndex);
+ this.reportStatus(status);
+ break;
+ case 'PROCESSED':
+ this.handleProcessedStatus(status, callIndex, pushback);
+ break;
+ }
+ }
+
+ private maybeStartHedgingAttempt() {
+ if (this.state !== 'HEDGING') {
+ return;
+ }
+ if (!this.callConfig.methodConfig.hedgingPolicy) {
+ return;
+ }
+ const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
+ if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
+ return;
+ }
+ this.attempts += 1;
+ this.startNewAttempt();
+ this.maybeStartHedgingTimer();
+ }
+
+ private maybeStartHedgingTimer() {
+ if (this.hedgingTimer) {
+ clearTimeout(this.hedgingTimer);
+ }
+ if (this.state !== 'HEDGING') {
+ return;
+ }
+ if (!this.callConfig.methodConfig.hedgingPolicy) {
+ return;
+ }
+ const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
+ if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
+ return;
+ }
+ const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s';
+ const hedgingDelaySec = Number(
+ hedgingDelayString.substring(0, hedgingDelayString.length - 1)
+ );
+ this.hedgingTimer = setTimeout(() => {
+ this.maybeStartHedgingAttempt();
+ }, hedgingDelaySec * 1000);
+ this.hedgingTimer.unref?.();
+ }
+
+ private startNewAttempt() {
+ const child = this.channel.createLoadBalancingCall(
+ this.callConfig,
+ this.methodName,
+ this.host,
+ this.credentials,
+ this.deadline
+ );
+ this.trace(
+ 'Created child call [' +
+ child.getCallNumber() +
+ '] for attempt ' +
+ this.attempts
+ );
+ const index = this.underlyingCalls.length;
+ this.underlyingCalls.push({
+ state: 'ACTIVE',
+ call: child,
+ nextMessageToSend: 0,
+ });
+ const previousAttempts = this.attempts - 1;
+ const initialMetadata = this.initialMetadata!.clone();
+ if (previousAttempts > 0) {
+ initialMetadata.set(
+ PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
+ `${previousAttempts}`
+ );
+ }
+ let receivedMetadata = false;
+ child.start(initialMetadata, {
+ onReceiveMetadata: metadata => {
+ this.trace(
+ 'Received metadata from child [' + child.getCallNumber() + ']'
+ );
+ this.commitCall(index);
+ receivedMetadata = true;
+ if (previousAttempts > 0) {
+ metadata.set(
+ PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
+ `${previousAttempts}`
+ );
+ }
+ if (this.underlyingCalls[index].state === 'ACTIVE') {
+ this.listener!.onReceiveMetadata(metadata);
+ }
+ },
+ onReceiveMessage: message => {
+ this.trace(
+ 'Received message from child [' + child.getCallNumber() + ']'
+ );
+ this.commitCall(index);
+ if (this.underlyingCalls[index].state === 'ACTIVE') {
+ this.listener!.onReceiveMessage(message);
+ }
+ },
+ onReceiveStatus: status => {
+ this.trace(
+ 'Received status from child [' + child.getCallNumber() + ']'
+ );
+ if (!receivedMetadata && previousAttempts > 0) {
+ status.metadata.set(
+ PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
+ `${previousAttempts}`
+ );
+ }
+ this.handleChildStatus(status, index);
+ },
+ });
+ this.sendNextChildMessage(index);
+ if (this.readStarted) {
+ child.startRead();
+ }
+ }
+
+ start(metadata: Metadata, listener: InterceptingListener): void {
+ this.trace('start called');
+ this.listener = listener;
+ this.initialMetadata = metadata;
+ this.attempts += 1;
+ this.startNewAttempt();
+ this.maybeStartHedgingTimer();
+ }
+
+ private handleChildWriteCompleted(childIndex: number) {
+ const childCall = this.underlyingCalls[childIndex];
+ const messageIndex = childCall.nextMessageToSend;
+ this.getBufferEntry(messageIndex).callback?.();
+ this.clearSentMessages();
+ childCall.nextMessageToSend += 1;
+ this.sendNextChildMessage(childIndex);
+ }
+
+ private sendNextChildMessage(childIndex: number) {
+ const childCall = this.underlyingCalls[childIndex];
+ if (childCall.state === 'COMPLETED') {
+ return;
+ }
+ if (this.getBufferEntry(childCall.nextMessageToSend)) {
+ const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
+ switch (bufferEntry.entryType) {
+ case 'MESSAGE':
+ childCall.call.sendMessageWithContext(
+ {
+ callback: error => {
+ // Ignore error
+ this.handleChildWriteCompleted(childIndex);
+ },
+ },
+ bufferEntry.message!.message
+ );
+ break;
+ case 'HALF_CLOSE':
+ childCall.nextMessageToSend += 1;
+ childCall.call.halfClose();
+ break;
+ case 'FREED':
+ // Should not be possible
+ break;
+ }
+ }
+ }
+
+ sendMessageWithContext(context: MessageContext, message: Buffer): void {
+ this.trace('write() called with message of length ' + message.length);
+ const writeObj: WriteObject = {
+ message,
+ flags: context.flags,
+ };
+ const messageIndex = this.getNextBufferIndex();
+ const bufferEntry: WriteBufferEntry = {
+ entryType: 'MESSAGE',
+ message: writeObj,
+ allocated: this.bufferTracker.allocate(message.length, this.callNumber),
+ };
+ this.writeBuffer.push(bufferEntry);
+ if (bufferEntry.allocated) {
+ context.callback?.();
+ for (const [callIndex, call] of this.underlyingCalls.entries()) {
+ if (
+ call.state === 'ACTIVE' &&
+ call.nextMessageToSend === messageIndex
+ ) {
+ call.call.sendMessageWithContext(
+ {
+ callback: error => {
+ // Ignore error
+ this.handleChildWriteCompleted(callIndex);
+ },
+ },
+ message
+ );
+ }
+ }
+ } else {
+ this.commitCallWithMostMessages();
+ // commitCallWithMostMessages can fail if we are between ping attempts
+ if (this.committedCallIndex === null) {
+ return;
+ }
+ const call = this.underlyingCalls[this.committedCallIndex];
+ bufferEntry.callback = context.callback;
+ if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
+ call.call.sendMessageWithContext(
+ {
+ callback: error => {
+ // Ignore error
+ this.handleChildWriteCompleted(this.committedCallIndex!);
+ },
+ },
+ message
+ );
+ }
+ }
+ }
+ startRead(): void {
+ this.trace('startRead called');
+ this.readStarted = true;
+ for (const underlyingCall of this.underlyingCalls) {
+ if (underlyingCall?.state === 'ACTIVE') {
+ underlyingCall.call.startRead();
+ }
+ }
+ }
+ halfClose(): void {
+ this.trace('halfClose called');
+ const halfCloseIndex = this.getNextBufferIndex();
+ this.writeBuffer.push({
+ entryType: 'HALF_CLOSE',
+ allocated: false,
+ });
+ for (const call of this.underlyingCalls) {
+ if (
+ call?.state === 'ACTIVE' &&
+ call.nextMessageToSend === halfCloseIndex
+ ) {
+ call.nextMessageToSend += 1;
+ call.call.halfClose();
+ }
+ }
+ }
+ setCredentials(newCredentials: CallCredentials): void {
+ throw new Error('Method not implemented.');
+ }
+ getMethod(): string {
+ return this.methodName;
+ }
+ getHost(): string {
+ return this.host;
+ }
+}