diff options
Diffstat (limited to 'frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts')
| -rw-r--r-- | frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts | 821 |
1 files changed, 821 insertions, 0 deletions
diff --git a/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts b/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts new file mode 100644 index 0000000..e6e1cbb --- /dev/null +++ b/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts @@ -0,0 +1,821 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { CallCredentials } from './call-credentials'; +import { LogVerbosity, Status } from './constants'; +import { Deadline } from './deadline'; +import { Metadata } from './metadata'; +import { CallConfig } from './resolver'; +import * as logging from './logging'; +import { + Call, + InterceptingListener, + MessageContext, + StatusObject, + WriteCallback, + WriteObject, +} from './call-interface'; +import { + LoadBalancingCall, + StatusObjectWithProgress, +} from './load-balancing-call'; +import { InternalChannel } from './internal-channel'; + +const TRACER_NAME = 'retrying_call'; + +export class RetryThrottler { + private tokens: number; + constructor( + private readonly maxTokens: number, + private readonly tokenRatio: number, + previousRetryThrottler?: RetryThrottler + ) { + if (previousRetryThrottler) { + /* When carrying over tokens from a previous config, rescale them to the + * new max value */ + this.tokens = + previousRetryThrottler.tokens * + (maxTokens / previousRetryThrottler.maxTokens); + } else { + this.tokens = maxTokens; + } + } + + addCallSucceeded() { + this.tokens = Math.max(this.tokens + this.tokenRatio, this.maxTokens); + } + + addCallFailed() { + this.tokens = Math.min(this.tokens - 1, 0); + } + + canRetryCall() { + return this.tokens > this.maxTokens / 2; + } +} + +export class MessageBufferTracker { + private totalAllocated = 0; + private allocatedPerCall: Map<number, number> = new Map<number, number>(); + + constructor(private totalLimit: number, private limitPerCall: number) {} + + allocate(size: number, callId: number): boolean { + const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; + if ( + this.limitPerCall - currentPerCall < size || + this.totalLimit - this.totalAllocated < size + ) { + return false; + } + this.allocatedPerCall.set(callId, currentPerCall + size); + this.totalAllocated += size; + return true; + } + + free(size: number, callId: number) { + if (this.totalAllocated < size) { + throw new Error( + `Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}` + ); + } + this.totalAllocated -= size; + const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; + if (currentPerCall < size) { + throw new Error( + `Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}` + ); + } + this.allocatedPerCall.set(callId, currentPerCall - size); + } + + freeAll(callId: number) { + const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; + if (this.totalAllocated < currentPerCall) { + throw new Error( + `Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}` + ); + } + this.totalAllocated -= currentPerCall; + this.allocatedPerCall.delete(callId); + } +} + +type UnderlyingCallState = 'ACTIVE' | 'COMPLETED'; + +interface UnderlyingCall { + state: UnderlyingCallState; + call: LoadBalancingCall; + nextMessageToSend: number; +} + +/** + * A retrying call can be in one of these states: + * RETRY: Retries are configured and new attempts may be sent + * HEDGING: Hedging is configured and new attempts may be sent + * TRANSPARENT_ONLY: Neither retries nor hedging are configured, and + * transparent retry attempts may still be sent + * COMMITTED: One attempt is committed, and no new attempts will be + * sent + */ +type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED'; + +/** + * The different types of objects that can be stored in the write buffer, with + * the following meanings: + * MESSAGE: This is a message to be sent. + * HALF_CLOSE: When this entry is reached, the calls should send a half-close. + * FREED: This slot previously contained a message that has been sent on all + * child calls and is no longer needed. + */ +type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED'; + +/** + * Entry in the buffer of messages to send to the remote end. + */ +interface WriteBufferEntry { + entryType: WriteBufferEntryType; + /** + * Message to send. + * Only populated if entryType is MESSAGE. + */ + message?: WriteObject; + /** + * Callback to call after sending the message. + * Only populated if entryType is MESSAGE and the call is in the COMMITTED + * state. + */ + callback?: WriteCallback; + /** + * Indicates whether the message is allocated in the buffer tracker. Ignored + * if entryType is not MESSAGE. Should be the return value of + * bufferTracker.allocate. + */ + allocated: boolean; +} + +const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts'; + +export class RetryingCall implements Call { + private state: RetryingCallState; + private listener: InterceptingListener | null = null; + private initialMetadata: Metadata | null = null; + private underlyingCalls: UnderlyingCall[] = []; + private writeBuffer: WriteBufferEntry[] = []; + /** + * The offset of message indices in the writeBuffer. For example, if + * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15 + * is in writeBuffer[5]. + */ + private writeBufferOffset = 0; + /** + * Tracks whether a read has been started, so that we know whether to start + * reads on new child calls. This only matters for the first read, because + * once a message comes in the child call becomes committed and there will + * be no new child calls. + */ + private readStarted = false; + private transparentRetryUsed = false; + /** + * Number of attempts so far + */ + private attempts = 0; + private hedgingTimer: NodeJS.Timeout | null = null; + private committedCallIndex: number | null = null; + private initialRetryBackoffSec = 0; + private nextRetryBackoffSec = 0; + constructor( + private readonly channel: InternalChannel, + private readonly callConfig: CallConfig, + private readonly methodName: string, + private readonly host: string, + private readonly credentials: CallCredentials, + private readonly deadline: Deadline, + private readonly callNumber: number, + private readonly bufferTracker: MessageBufferTracker, + private readonly retryThrottler?: RetryThrottler + ) { + if (callConfig.methodConfig.retryPolicy) { + this.state = 'RETRY'; + const retryPolicy = callConfig.methodConfig.retryPolicy; + this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number( + retryPolicy.initialBackoff.substring( + 0, + retryPolicy.initialBackoff.length - 1 + ) + ); + } else if (callConfig.methodConfig.hedgingPolicy) { + this.state = 'HEDGING'; + } else { + this.state = 'TRANSPARENT_ONLY'; + } + } + getCallNumber(): number { + return this.callNumber; + } + + private trace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + TRACER_NAME, + '[' + this.callNumber + '] ' + text + ); + } + + private reportStatus(statusObject: StatusObject) { + this.trace( + 'ended with status: code=' + + statusObject.code + + ' details="' + + statusObject.details + + '"' + ); + this.bufferTracker.freeAll(this.callNumber); + this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length; + this.writeBuffer = []; + process.nextTick(() => { + // Explicitly construct status object to remove progress field + this.listener?.onReceiveStatus({ + code: statusObject.code, + details: statusObject.details, + metadata: statusObject.metadata, + }); + }); + } + + cancelWithStatus(status: Status, details: string): void { + this.trace( + 'cancelWithStatus code: ' + status + ' details: "' + details + '"' + ); + this.reportStatus({ code: status, details, metadata: new Metadata() }); + for (const { call } of this.underlyingCalls) { + call.cancelWithStatus(status, details); + } + } + getPeer(): string { + if (this.committedCallIndex !== null) { + return this.underlyingCalls[this.committedCallIndex].call.getPeer(); + } else { + return 'unknown'; + } + } + + private getBufferEntry(messageIndex: number): WriteBufferEntry { + return ( + this.writeBuffer[messageIndex - this.writeBufferOffset] ?? { + entryType: 'FREED', + allocated: false, + } + ); + } + + private getNextBufferIndex() { + return this.writeBufferOffset + this.writeBuffer.length; + } + + private clearSentMessages() { + if (this.state !== 'COMMITTED') { + return; + } + const earliestNeededMessageIndex = + this.underlyingCalls[this.committedCallIndex!].nextMessageToSend; + for ( + let messageIndex = this.writeBufferOffset; + messageIndex < earliestNeededMessageIndex; + messageIndex++ + ) { + const bufferEntry = this.getBufferEntry(messageIndex); + if (bufferEntry.allocated) { + this.bufferTracker.free( + bufferEntry.message!.message.length, + this.callNumber + ); + } + } + this.writeBuffer = this.writeBuffer.slice( + earliestNeededMessageIndex - this.writeBufferOffset + ); + this.writeBufferOffset = earliestNeededMessageIndex; + } + + private commitCall(index: number) { + if (this.state === 'COMMITTED') { + return; + } + if (this.underlyingCalls[index].state === 'COMPLETED') { + return; + } + this.trace( + 'Committing call [' + + this.underlyingCalls[index].call.getCallNumber() + + '] at index ' + + index + ); + this.state = 'COMMITTED'; + this.committedCallIndex = index; + for (let i = 0; i < this.underlyingCalls.length; i++) { + if (i === index) { + continue; + } + if (this.underlyingCalls[i].state === 'COMPLETED') { + continue; + } + this.underlyingCalls[i].state = 'COMPLETED'; + this.underlyingCalls[i].call.cancelWithStatus( + Status.CANCELLED, + 'Discarded in favor of other hedged attempt' + ); + } + this.clearSentMessages(); + } + + private commitCallWithMostMessages() { + if (this.state === 'COMMITTED') { + return; + } + let mostMessages = -1; + let callWithMostMessages = -1; + for (const [index, childCall] of this.underlyingCalls.entries()) { + if ( + childCall.state === 'ACTIVE' && + childCall.nextMessageToSend > mostMessages + ) { + mostMessages = childCall.nextMessageToSend; + callWithMostMessages = index; + } + } + if (callWithMostMessages === -1) { + /* There are no active calls, disable retries to force the next call that + * is started to be committed. */ + this.state = 'TRANSPARENT_ONLY'; + } else { + this.commitCall(callWithMostMessages); + } + } + + private isStatusCodeInList(list: (Status | string)[], code: Status) { + return list.some( + value => + value === code || + value.toString().toLowerCase() === Status[code].toLowerCase() + ); + } + + private getNextRetryBackoffMs() { + const retryPolicy = this.callConfig?.methodConfig.retryPolicy; + if (!retryPolicy) { + return 0; + } + const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000; + const maxBackoffSec = Number( + retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1) + ); + this.nextRetryBackoffSec = Math.min( + this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, + maxBackoffSec + ); + return nextBackoffMs; + } + + private maybeRetryCall( + pushback: number | null, + callback: (retried: boolean) => void + ) { + if (this.state !== 'RETRY') { + callback(false); + return; + } + const retryPolicy = this.callConfig!.methodConfig.retryPolicy!; + if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) { + callback(false); + return; + } + let retryDelayMs: number; + if (pushback === null) { + retryDelayMs = this.getNextRetryBackoffMs(); + } else if (pushback < 0) { + this.state = 'TRANSPARENT_ONLY'; + callback(false); + return; + } else { + retryDelayMs = pushback; + this.nextRetryBackoffSec = this.initialRetryBackoffSec; + } + setTimeout(() => { + if (this.state !== 'RETRY') { + callback(false); + return; + } + if (this.retryThrottler?.canRetryCall() ?? true) { + callback(true); + this.attempts += 1; + this.startNewAttempt(); + } + }, retryDelayMs); + } + + private countActiveCalls(): number { + let count = 0; + for (const call of this.underlyingCalls) { + if (call?.state === 'ACTIVE') { + count += 1; + } + } + return count; + } + + private handleProcessedStatus( + status: StatusObject, + callIndex: number, + pushback: number | null + ) { + switch (this.state) { + case 'COMMITTED': + case 'TRANSPARENT_ONLY': + this.commitCall(callIndex); + this.reportStatus(status); + break; + case 'HEDGING': + if ( + this.isStatusCodeInList( + this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? + [], + status.code + ) + ) { + this.retryThrottler?.addCallFailed(); + let delayMs: number; + if (pushback === null) { + delayMs = 0; + } else if (pushback < 0) { + this.state = 'TRANSPARENT_ONLY'; + this.commitCall(callIndex); + this.reportStatus(status); + return; + } else { + delayMs = pushback; + } + setTimeout(() => { + this.maybeStartHedgingAttempt(); + // If after trying to start a call there are no active calls, this was the last one + if (this.countActiveCalls() === 0) { + this.commitCall(callIndex); + this.reportStatus(status); + } + }, delayMs); + } else { + this.commitCall(callIndex); + this.reportStatus(status); + } + break; + case 'RETRY': + if ( + this.isStatusCodeInList( + this.callConfig!.methodConfig.retryPolicy!.retryableStatusCodes, + status.code + ) + ) { + this.retryThrottler?.addCallFailed(); + this.maybeRetryCall(pushback, retried => { + if (!retried) { + this.commitCall(callIndex); + this.reportStatus(status); + } + }); + } else { + this.commitCall(callIndex); + this.reportStatus(status); + } + break; + } + } + + private getPushback(metadata: Metadata): number | null { + const mdValue = metadata.get('grpc-retry-pushback-ms'); + if (mdValue.length === 0) { + return null; + } + try { + return parseInt(mdValue[0] as string); + } catch (e) { + return -1; + } + } + + private handleChildStatus( + status: StatusObjectWithProgress, + callIndex: number + ) { + if (this.underlyingCalls[callIndex].state === 'COMPLETED') { + return; + } + this.trace( + 'state=' + + this.state + + ' handling status with progress ' + + status.progress + + ' from child [' + + this.underlyingCalls[callIndex].call.getCallNumber() + + '] in state ' + + this.underlyingCalls[callIndex].state + ); + this.underlyingCalls[callIndex].state = 'COMPLETED'; + if (status.code === Status.OK) { + this.retryThrottler?.addCallSucceeded(); + this.commitCall(callIndex); + this.reportStatus(status); + return; + } + if (this.state === 'COMMITTED') { + this.reportStatus(status); + return; + } + const pushback = this.getPushback(status.metadata); + switch (status.progress) { + case 'NOT_STARTED': + // RPC never leaves the client, always safe to retry + this.startNewAttempt(); + break; + case 'REFUSED': + // RPC reaches the server library, but not the server application logic + if (this.transparentRetryUsed) { + this.handleProcessedStatus(status, callIndex, pushback); + } else { + this.transparentRetryUsed = true; + this.startNewAttempt(); + } + break; + case 'DROP': + this.commitCall(callIndex); + this.reportStatus(status); + break; + case 'PROCESSED': + this.handleProcessedStatus(status, callIndex, pushback); + break; + } + } + + private maybeStartHedgingAttempt() { + if (this.state !== 'HEDGING') { + return; + } + if (!this.callConfig.methodConfig.hedgingPolicy) { + return; + } + const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy; + if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) { + return; + } + this.attempts += 1; + this.startNewAttempt(); + this.maybeStartHedgingTimer(); + } + + private maybeStartHedgingTimer() { + if (this.hedgingTimer) { + clearTimeout(this.hedgingTimer); + } + if (this.state !== 'HEDGING') { + return; + } + if (!this.callConfig.methodConfig.hedgingPolicy) { + return; + } + const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy; + if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) { + return; + } + const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s'; + const hedgingDelaySec = Number( + hedgingDelayString.substring(0, hedgingDelayString.length - 1) + ); + this.hedgingTimer = setTimeout(() => { + this.maybeStartHedgingAttempt(); + }, hedgingDelaySec * 1000); + this.hedgingTimer.unref?.(); + } + + private startNewAttempt() { + const child = this.channel.createLoadBalancingCall( + this.callConfig, + this.methodName, + this.host, + this.credentials, + this.deadline + ); + this.trace( + 'Created child call [' + + child.getCallNumber() + + '] for attempt ' + + this.attempts + ); + const index = this.underlyingCalls.length; + this.underlyingCalls.push({ + state: 'ACTIVE', + call: child, + nextMessageToSend: 0, + }); + const previousAttempts = this.attempts - 1; + const initialMetadata = this.initialMetadata!.clone(); + if (previousAttempts > 0) { + initialMetadata.set( + PREVIONS_RPC_ATTEMPTS_METADATA_KEY, + `${previousAttempts}` + ); + } + let receivedMetadata = false; + child.start(initialMetadata, { + onReceiveMetadata: metadata => { + this.trace( + 'Received metadata from child [' + child.getCallNumber() + ']' + ); + this.commitCall(index); + receivedMetadata = true; + if (previousAttempts > 0) { + metadata.set( + PREVIONS_RPC_ATTEMPTS_METADATA_KEY, + `${previousAttempts}` + ); + } + if (this.underlyingCalls[index].state === 'ACTIVE') { + this.listener!.onReceiveMetadata(metadata); + } + }, + onReceiveMessage: message => { + this.trace( + 'Received message from child [' + child.getCallNumber() + ']' + ); + this.commitCall(index); + if (this.underlyingCalls[index].state === 'ACTIVE') { + this.listener!.onReceiveMessage(message); + } + }, + onReceiveStatus: status => { + this.trace( + 'Received status from child [' + child.getCallNumber() + ']' + ); + if (!receivedMetadata && previousAttempts > 0) { + status.metadata.set( + PREVIONS_RPC_ATTEMPTS_METADATA_KEY, + `${previousAttempts}` + ); + } + this.handleChildStatus(status, index); + }, + }); + this.sendNextChildMessage(index); + if (this.readStarted) { + child.startRead(); + } + } + + start(metadata: Metadata, listener: InterceptingListener): void { + this.trace('start called'); + this.listener = listener; + this.initialMetadata = metadata; + this.attempts += 1; + this.startNewAttempt(); + this.maybeStartHedgingTimer(); + } + + private handleChildWriteCompleted(childIndex: number) { + const childCall = this.underlyingCalls[childIndex]; + const messageIndex = childCall.nextMessageToSend; + this.getBufferEntry(messageIndex).callback?.(); + this.clearSentMessages(); + childCall.nextMessageToSend += 1; + this.sendNextChildMessage(childIndex); + } + + private sendNextChildMessage(childIndex: number) { + const childCall = this.underlyingCalls[childIndex]; + if (childCall.state === 'COMPLETED') { + return; + } + if (this.getBufferEntry(childCall.nextMessageToSend)) { + const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend); + switch (bufferEntry.entryType) { + case 'MESSAGE': + childCall.call.sendMessageWithContext( + { + callback: error => { + // Ignore error + this.handleChildWriteCompleted(childIndex); + }, + }, + bufferEntry.message!.message + ); + break; + case 'HALF_CLOSE': + childCall.nextMessageToSend += 1; + childCall.call.halfClose(); + break; + case 'FREED': + // Should not be possible + break; + } + } + } + + sendMessageWithContext(context: MessageContext, message: Buffer): void { + this.trace('write() called with message of length ' + message.length); + const writeObj: WriteObject = { + message, + flags: context.flags, + }; + const messageIndex = this.getNextBufferIndex(); + const bufferEntry: WriteBufferEntry = { + entryType: 'MESSAGE', + message: writeObj, + allocated: this.bufferTracker.allocate(message.length, this.callNumber), + }; + this.writeBuffer.push(bufferEntry); + if (bufferEntry.allocated) { + context.callback?.(); + for (const [callIndex, call] of this.underlyingCalls.entries()) { + if ( + call.state === 'ACTIVE' && + call.nextMessageToSend === messageIndex + ) { + call.call.sendMessageWithContext( + { + callback: error => { + // Ignore error + this.handleChildWriteCompleted(callIndex); + }, + }, + message + ); + } + } + } else { + this.commitCallWithMostMessages(); + // commitCallWithMostMessages can fail if we are between ping attempts + if (this.committedCallIndex === null) { + return; + } + const call = this.underlyingCalls[this.committedCallIndex]; + bufferEntry.callback = context.callback; + if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { + call.call.sendMessageWithContext( + { + callback: error => { + // Ignore error + this.handleChildWriteCompleted(this.committedCallIndex!); + }, + }, + message + ); + } + } + } + startRead(): void { + this.trace('startRead called'); + this.readStarted = true; + for (const underlyingCall of this.underlyingCalls) { + if (underlyingCall?.state === 'ACTIVE') { + underlyingCall.call.startRead(); + } + } + } + halfClose(): void { + this.trace('halfClose called'); + const halfCloseIndex = this.getNextBufferIndex(); + this.writeBuffer.push({ + entryType: 'HALF_CLOSE', + allocated: false, + }); + for (const call of this.underlyingCalls) { + if ( + call?.state === 'ACTIVE' && + call.nextMessageToSend === halfCloseIndex + ) { + call.nextMessageToSend += 1; + call.call.halfClose(); + } + } + } + setCredentials(newCredentials: CallCredentials): void { + throw new Error('Method not implemented.'); + } + getMethod(): string { + return this.methodName; + } + getHost(): string { + return this.host; + } +} |
