From 434aa8343fdcbb4d5002f934979913c099489bee Mon Sep 17 00:00:00 2001 From: altaf-creator Date: Sun, 16 Nov 2025 19:08:29 +0800 Subject: sdk, del --- .../@grpc/grpc-js/src/retrying-call.ts | 821 --------------------- 1 file changed, 821 deletions(-) delete mode 100644 frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts (limited to 'frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts') diff --git a/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts b/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts deleted file mode 100644 index e6e1cbb..0000000 --- a/frontend-old/node_modules/@grpc/grpc-js/src/retrying-call.ts +++ /dev/null @@ -1,821 +0,0 @@ -/* - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { CallCredentials } from './call-credentials'; -import { LogVerbosity, Status } from './constants'; -import { Deadline } from './deadline'; -import { Metadata } from './metadata'; -import { CallConfig } from './resolver'; -import * as logging from './logging'; -import { - Call, - InterceptingListener, - MessageContext, - StatusObject, - WriteCallback, - WriteObject, -} from './call-interface'; -import { - LoadBalancingCall, - StatusObjectWithProgress, -} from './load-balancing-call'; -import { InternalChannel } from './internal-channel'; - -const TRACER_NAME = 'retrying_call'; - -export class RetryThrottler { - private tokens: number; - constructor( - private readonly maxTokens: number, - private readonly tokenRatio: number, - previousRetryThrottler?: RetryThrottler - ) { - if (previousRetryThrottler) { - /* When carrying over tokens from a previous config, rescale them to the - * new max value */ - this.tokens = - previousRetryThrottler.tokens * - (maxTokens / previousRetryThrottler.maxTokens); - } else { - this.tokens = maxTokens; - } - } - - addCallSucceeded() { - this.tokens = Math.max(this.tokens + this.tokenRatio, this.maxTokens); - } - - addCallFailed() { - this.tokens = Math.min(this.tokens - 1, 0); - } - - canRetryCall() { - return this.tokens > this.maxTokens / 2; - } -} - -export class MessageBufferTracker { - private totalAllocated = 0; - private allocatedPerCall: Map = new Map(); - - constructor(private totalLimit: number, private limitPerCall: number) {} - - allocate(size: number, callId: number): boolean { - const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; - if ( - this.limitPerCall - currentPerCall < size || - this.totalLimit - this.totalAllocated < size - ) { - return false; - } - this.allocatedPerCall.set(callId, currentPerCall + size); - this.totalAllocated += size; - return true; - } - - free(size: number, callId: number) { - if (this.totalAllocated < size) { - throw new Error( - `Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}` - ); - } - this.totalAllocated -= size; - const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; - if (currentPerCall < size) { - throw new Error( - `Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}` - ); - } - this.allocatedPerCall.set(callId, currentPerCall - size); - } - - freeAll(callId: number) { - const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; - if (this.totalAllocated < currentPerCall) { - throw new Error( - `Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}` - ); - } - this.totalAllocated -= currentPerCall; - this.allocatedPerCall.delete(callId); - } -} - -type UnderlyingCallState = 'ACTIVE' | 'COMPLETED'; - -interface UnderlyingCall { - state: UnderlyingCallState; - call: LoadBalancingCall; - nextMessageToSend: number; -} - -/** - * A retrying call can be in one of these states: - * RETRY: Retries are configured and new attempts may be sent - * HEDGING: Hedging is configured and new attempts may be sent - * TRANSPARENT_ONLY: Neither retries nor hedging are configured, and - * transparent retry attempts may still be sent - * COMMITTED: One attempt is committed, and no new attempts will be - * sent - */ -type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED'; - -/** - * The different types of objects that can be stored in the write buffer, with - * the following meanings: - * MESSAGE: This is a message to be sent. - * HALF_CLOSE: When this entry is reached, the calls should send a half-close. - * FREED: This slot previously contained a message that has been sent on all - * child calls and is no longer needed. - */ -type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED'; - -/** - * Entry in the buffer of messages to send to the remote end. - */ -interface WriteBufferEntry { - entryType: WriteBufferEntryType; - /** - * Message to send. - * Only populated if entryType is MESSAGE. - */ - message?: WriteObject; - /** - * Callback to call after sending the message. - * Only populated if entryType is MESSAGE and the call is in the COMMITTED - * state. - */ - callback?: WriteCallback; - /** - * Indicates whether the message is allocated in the buffer tracker. Ignored - * if entryType is not MESSAGE. Should be the return value of - * bufferTracker.allocate. - */ - allocated: boolean; -} - -const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts'; - -export class RetryingCall implements Call { - private state: RetryingCallState; - private listener: InterceptingListener | null = null; - private initialMetadata: Metadata | null = null; - private underlyingCalls: UnderlyingCall[] = []; - private writeBuffer: WriteBufferEntry[] = []; - /** - * The offset of message indices in the writeBuffer. For example, if - * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15 - * is in writeBuffer[5]. - */ - private writeBufferOffset = 0; - /** - * Tracks whether a read has been started, so that we know whether to start - * reads on new child calls. This only matters for the first read, because - * once a message comes in the child call becomes committed and there will - * be no new child calls. - */ - private readStarted = false; - private transparentRetryUsed = false; - /** - * Number of attempts so far - */ - private attempts = 0; - private hedgingTimer: NodeJS.Timeout | null = null; - private committedCallIndex: number | null = null; - private initialRetryBackoffSec = 0; - private nextRetryBackoffSec = 0; - constructor( - private readonly channel: InternalChannel, - private readonly callConfig: CallConfig, - private readonly methodName: string, - private readonly host: string, - private readonly credentials: CallCredentials, - private readonly deadline: Deadline, - private readonly callNumber: number, - private readonly bufferTracker: MessageBufferTracker, - private readonly retryThrottler?: RetryThrottler - ) { - if (callConfig.methodConfig.retryPolicy) { - this.state = 'RETRY'; - const retryPolicy = callConfig.methodConfig.retryPolicy; - this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number( - retryPolicy.initialBackoff.substring( - 0, - retryPolicy.initialBackoff.length - 1 - ) - ); - } else if (callConfig.methodConfig.hedgingPolicy) { - this.state = 'HEDGING'; - } else { - this.state = 'TRANSPARENT_ONLY'; - } - } - getCallNumber(): number { - return this.callNumber; - } - - private trace(text: string): void { - logging.trace( - LogVerbosity.DEBUG, - TRACER_NAME, - '[' + this.callNumber + '] ' + text - ); - } - - private reportStatus(statusObject: StatusObject) { - this.trace( - 'ended with status: code=' + - statusObject.code + - ' details="' + - statusObject.details + - '"' - ); - this.bufferTracker.freeAll(this.callNumber); - this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length; - this.writeBuffer = []; - process.nextTick(() => { - // Explicitly construct status object to remove progress field - this.listener?.onReceiveStatus({ - code: statusObject.code, - details: statusObject.details, - metadata: statusObject.metadata, - }); - }); - } - - cancelWithStatus(status: Status, details: string): void { - this.trace( - 'cancelWithStatus code: ' + status + ' details: "' + details + '"' - ); - this.reportStatus({ code: status, details, metadata: new Metadata() }); - for (const { call } of this.underlyingCalls) { - call.cancelWithStatus(status, details); - } - } - getPeer(): string { - if (this.committedCallIndex !== null) { - return this.underlyingCalls[this.committedCallIndex].call.getPeer(); - } else { - return 'unknown'; - } - } - - private getBufferEntry(messageIndex: number): WriteBufferEntry { - return ( - this.writeBuffer[messageIndex - this.writeBufferOffset] ?? { - entryType: 'FREED', - allocated: false, - } - ); - } - - private getNextBufferIndex() { - return this.writeBufferOffset + this.writeBuffer.length; - } - - private clearSentMessages() { - if (this.state !== 'COMMITTED') { - return; - } - const earliestNeededMessageIndex = - this.underlyingCalls[this.committedCallIndex!].nextMessageToSend; - for ( - let messageIndex = this.writeBufferOffset; - messageIndex < earliestNeededMessageIndex; - messageIndex++ - ) { - const bufferEntry = this.getBufferEntry(messageIndex); - if (bufferEntry.allocated) { - this.bufferTracker.free( - bufferEntry.message!.message.length, - this.callNumber - ); - } - } - this.writeBuffer = this.writeBuffer.slice( - earliestNeededMessageIndex - this.writeBufferOffset - ); - this.writeBufferOffset = earliestNeededMessageIndex; - } - - private commitCall(index: number) { - if (this.state === 'COMMITTED') { - return; - } - if (this.underlyingCalls[index].state === 'COMPLETED') { - return; - } - this.trace( - 'Committing call [' + - this.underlyingCalls[index].call.getCallNumber() + - '] at index ' + - index - ); - this.state = 'COMMITTED'; - this.committedCallIndex = index; - for (let i = 0; i < this.underlyingCalls.length; i++) { - if (i === index) { - continue; - } - if (this.underlyingCalls[i].state === 'COMPLETED') { - continue; - } - this.underlyingCalls[i].state = 'COMPLETED'; - this.underlyingCalls[i].call.cancelWithStatus( - Status.CANCELLED, - 'Discarded in favor of other hedged attempt' - ); - } - this.clearSentMessages(); - } - - private commitCallWithMostMessages() { - if (this.state === 'COMMITTED') { - return; - } - let mostMessages = -1; - let callWithMostMessages = -1; - for (const [index, childCall] of this.underlyingCalls.entries()) { - if ( - childCall.state === 'ACTIVE' && - childCall.nextMessageToSend > mostMessages - ) { - mostMessages = childCall.nextMessageToSend; - callWithMostMessages = index; - } - } - if (callWithMostMessages === -1) { - /* There are no active calls, disable retries to force the next call that - * is started to be committed. */ - this.state = 'TRANSPARENT_ONLY'; - } else { - this.commitCall(callWithMostMessages); - } - } - - private isStatusCodeInList(list: (Status | string)[], code: Status) { - return list.some( - value => - value === code || - value.toString().toLowerCase() === Status[code].toLowerCase() - ); - } - - private getNextRetryBackoffMs() { - const retryPolicy = this.callConfig?.methodConfig.retryPolicy; - if (!retryPolicy) { - return 0; - } - const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000; - const maxBackoffSec = Number( - retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1) - ); - this.nextRetryBackoffSec = Math.min( - this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, - maxBackoffSec - ); - return nextBackoffMs; - } - - private maybeRetryCall( - pushback: number | null, - callback: (retried: boolean) => void - ) { - if (this.state !== 'RETRY') { - callback(false); - return; - } - const retryPolicy = this.callConfig!.methodConfig.retryPolicy!; - if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) { - callback(false); - return; - } - let retryDelayMs: number; - if (pushback === null) { - retryDelayMs = this.getNextRetryBackoffMs(); - } else if (pushback < 0) { - this.state = 'TRANSPARENT_ONLY'; - callback(false); - return; - } else { - retryDelayMs = pushback; - this.nextRetryBackoffSec = this.initialRetryBackoffSec; - } - setTimeout(() => { - if (this.state !== 'RETRY') { - callback(false); - return; - } - if (this.retryThrottler?.canRetryCall() ?? true) { - callback(true); - this.attempts += 1; - this.startNewAttempt(); - } - }, retryDelayMs); - } - - private countActiveCalls(): number { - let count = 0; - for (const call of this.underlyingCalls) { - if (call?.state === 'ACTIVE') { - count += 1; - } - } - return count; - } - - private handleProcessedStatus( - status: StatusObject, - callIndex: number, - pushback: number | null - ) { - switch (this.state) { - case 'COMMITTED': - case 'TRANSPARENT_ONLY': - this.commitCall(callIndex); - this.reportStatus(status); - break; - case 'HEDGING': - if ( - this.isStatusCodeInList( - this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? - [], - status.code - ) - ) { - this.retryThrottler?.addCallFailed(); - let delayMs: number; - if (pushback === null) { - delayMs = 0; - } else if (pushback < 0) { - this.state = 'TRANSPARENT_ONLY'; - this.commitCall(callIndex); - this.reportStatus(status); - return; - } else { - delayMs = pushback; - } - setTimeout(() => { - this.maybeStartHedgingAttempt(); - // If after trying to start a call there are no active calls, this was the last one - if (this.countActiveCalls() === 0) { - this.commitCall(callIndex); - this.reportStatus(status); - } - }, delayMs); - } else { - this.commitCall(callIndex); - this.reportStatus(status); - } - break; - case 'RETRY': - if ( - this.isStatusCodeInList( - this.callConfig!.methodConfig.retryPolicy!.retryableStatusCodes, - status.code - ) - ) { - this.retryThrottler?.addCallFailed(); - this.maybeRetryCall(pushback, retried => { - if (!retried) { - this.commitCall(callIndex); - this.reportStatus(status); - } - }); - } else { - this.commitCall(callIndex); - this.reportStatus(status); - } - break; - } - } - - private getPushback(metadata: Metadata): number | null { - const mdValue = metadata.get('grpc-retry-pushback-ms'); - if (mdValue.length === 0) { - return null; - } - try { - return parseInt(mdValue[0] as string); - } catch (e) { - return -1; - } - } - - private handleChildStatus( - status: StatusObjectWithProgress, - callIndex: number - ) { - if (this.underlyingCalls[callIndex].state === 'COMPLETED') { - return; - } - this.trace( - 'state=' + - this.state + - ' handling status with progress ' + - status.progress + - ' from child [' + - this.underlyingCalls[callIndex].call.getCallNumber() + - '] in state ' + - this.underlyingCalls[callIndex].state - ); - this.underlyingCalls[callIndex].state = 'COMPLETED'; - if (status.code === Status.OK) { - this.retryThrottler?.addCallSucceeded(); - this.commitCall(callIndex); - this.reportStatus(status); - return; - } - if (this.state === 'COMMITTED') { - this.reportStatus(status); - return; - } - const pushback = this.getPushback(status.metadata); - switch (status.progress) { - case 'NOT_STARTED': - // RPC never leaves the client, always safe to retry - this.startNewAttempt(); - break; - case 'REFUSED': - // RPC reaches the server library, but not the server application logic - if (this.transparentRetryUsed) { - this.handleProcessedStatus(status, callIndex, pushback); - } else { - this.transparentRetryUsed = true; - this.startNewAttempt(); - } - break; - case 'DROP': - this.commitCall(callIndex); - this.reportStatus(status); - break; - case 'PROCESSED': - this.handleProcessedStatus(status, callIndex, pushback); - break; - } - } - - private maybeStartHedgingAttempt() { - if (this.state !== 'HEDGING') { - return; - } - if (!this.callConfig.methodConfig.hedgingPolicy) { - return; - } - const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy; - if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) { - return; - } - this.attempts += 1; - this.startNewAttempt(); - this.maybeStartHedgingTimer(); - } - - private maybeStartHedgingTimer() { - if (this.hedgingTimer) { - clearTimeout(this.hedgingTimer); - } - if (this.state !== 'HEDGING') { - return; - } - if (!this.callConfig.methodConfig.hedgingPolicy) { - return; - } - const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy; - if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) { - return; - } - const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s'; - const hedgingDelaySec = Number( - hedgingDelayString.substring(0, hedgingDelayString.length - 1) - ); - this.hedgingTimer = setTimeout(() => { - this.maybeStartHedgingAttempt(); - }, hedgingDelaySec * 1000); - this.hedgingTimer.unref?.(); - } - - private startNewAttempt() { - const child = this.channel.createLoadBalancingCall( - this.callConfig, - this.methodName, - this.host, - this.credentials, - this.deadline - ); - this.trace( - 'Created child call [' + - child.getCallNumber() + - '] for attempt ' + - this.attempts - ); - const index = this.underlyingCalls.length; - this.underlyingCalls.push({ - state: 'ACTIVE', - call: child, - nextMessageToSend: 0, - }); - const previousAttempts = this.attempts - 1; - const initialMetadata = this.initialMetadata!.clone(); - if (previousAttempts > 0) { - initialMetadata.set( - PREVIONS_RPC_ATTEMPTS_METADATA_KEY, - `${previousAttempts}` - ); - } - let receivedMetadata = false; - child.start(initialMetadata, { - onReceiveMetadata: metadata => { - this.trace( - 'Received metadata from child [' + child.getCallNumber() + ']' - ); - this.commitCall(index); - receivedMetadata = true; - if (previousAttempts > 0) { - metadata.set( - PREVIONS_RPC_ATTEMPTS_METADATA_KEY, - `${previousAttempts}` - ); - } - if (this.underlyingCalls[index].state === 'ACTIVE') { - this.listener!.onReceiveMetadata(metadata); - } - }, - onReceiveMessage: message => { - this.trace( - 'Received message from child [' + child.getCallNumber() + ']' - ); - this.commitCall(index); - if (this.underlyingCalls[index].state === 'ACTIVE') { - this.listener!.onReceiveMessage(message); - } - }, - onReceiveStatus: status => { - this.trace( - 'Received status from child [' + child.getCallNumber() + ']' - ); - if (!receivedMetadata && previousAttempts > 0) { - status.metadata.set( - PREVIONS_RPC_ATTEMPTS_METADATA_KEY, - `${previousAttempts}` - ); - } - this.handleChildStatus(status, index); - }, - }); - this.sendNextChildMessage(index); - if (this.readStarted) { - child.startRead(); - } - } - - start(metadata: Metadata, listener: InterceptingListener): void { - this.trace('start called'); - this.listener = listener; - this.initialMetadata = metadata; - this.attempts += 1; - this.startNewAttempt(); - this.maybeStartHedgingTimer(); - } - - private handleChildWriteCompleted(childIndex: number) { - const childCall = this.underlyingCalls[childIndex]; - const messageIndex = childCall.nextMessageToSend; - this.getBufferEntry(messageIndex).callback?.(); - this.clearSentMessages(); - childCall.nextMessageToSend += 1; - this.sendNextChildMessage(childIndex); - } - - private sendNextChildMessage(childIndex: number) { - const childCall = this.underlyingCalls[childIndex]; - if (childCall.state === 'COMPLETED') { - return; - } - if (this.getBufferEntry(childCall.nextMessageToSend)) { - const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend); - switch (bufferEntry.entryType) { - case 'MESSAGE': - childCall.call.sendMessageWithContext( - { - callback: error => { - // Ignore error - this.handleChildWriteCompleted(childIndex); - }, - }, - bufferEntry.message!.message - ); - break; - case 'HALF_CLOSE': - childCall.nextMessageToSend += 1; - childCall.call.halfClose(); - break; - case 'FREED': - // Should not be possible - break; - } - } - } - - sendMessageWithContext(context: MessageContext, message: Buffer): void { - this.trace('write() called with message of length ' + message.length); - const writeObj: WriteObject = { - message, - flags: context.flags, - }; - const messageIndex = this.getNextBufferIndex(); - const bufferEntry: WriteBufferEntry = { - entryType: 'MESSAGE', - message: writeObj, - allocated: this.bufferTracker.allocate(message.length, this.callNumber), - }; - this.writeBuffer.push(bufferEntry); - if (bufferEntry.allocated) { - context.callback?.(); - for (const [callIndex, call] of this.underlyingCalls.entries()) { - if ( - call.state === 'ACTIVE' && - call.nextMessageToSend === messageIndex - ) { - call.call.sendMessageWithContext( - { - callback: error => { - // Ignore error - this.handleChildWriteCompleted(callIndex); - }, - }, - message - ); - } - } - } else { - this.commitCallWithMostMessages(); - // commitCallWithMostMessages can fail if we are between ping attempts - if (this.committedCallIndex === null) { - return; - } - const call = this.underlyingCalls[this.committedCallIndex]; - bufferEntry.callback = context.callback; - if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { - call.call.sendMessageWithContext( - { - callback: error => { - // Ignore error - this.handleChildWriteCompleted(this.committedCallIndex!); - }, - }, - message - ); - } - } - } - startRead(): void { - this.trace('startRead called'); - this.readStarted = true; - for (const underlyingCall of this.underlyingCalls) { - if (underlyingCall?.state === 'ACTIVE') { - underlyingCall.call.startRead(); - } - } - } - halfClose(): void { - this.trace('halfClose called'); - const halfCloseIndex = this.getNextBufferIndex(); - this.writeBuffer.push({ - entryType: 'HALF_CLOSE', - allocated: false, - }); - for (const call of this.underlyingCalls) { - if ( - call?.state === 'ACTIVE' && - call.nextMessageToSend === halfCloseIndex - ) { - call.nextMessageToSend += 1; - call.call.halfClose(); - } - } - } - setCredentials(newCredentials: CallCredentials): void { - throw new Error('Method not implemented.'); - } - getMethod(): string { - return this.methodName; - } - getHost(): string { - return this.host; - } -} -- cgit v1.2.3