From 8eff962cab608341a6f2fedc640a0e32d96f26e2 Mon Sep 17 00:00:00 2001 From: altaf-creator Date: Sun, 9 Nov 2025 11:15:19 +0800 Subject: pain --- .../@grpc/grpc-js/src/internal-channel.ts | 830 +++++++++++++++++++++ 1 file changed, 830 insertions(+) create mode 100644 frontend-old/node_modules/@grpc/grpc-js/src/internal-channel.ts (limited to 'frontend-old/node_modules/@grpc/grpc-js/src/internal-channel.ts') diff --git a/frontend-old/node_modules/@grpc/grpc-js/src/internal-channel.ts b/frontend-old/node_modules/@grpc/grpc-js/src/internal-channel.ts new file mode 100644 index 0000000..5c8c91c --- /dev/null +++ b/frontend-old/node_modules/@grpc/grpc-js/src/internal-channel.ts @@ -0,0 +1,830 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { ChannelCredentials } from './channel-credentials'; +import { ChannelOptions } from './channel-options'; +import { ResolvingLoadBalancer } from './resolving-load-balancer'; +import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; +import { ChannelControlHelper } from './load-balancer'; +import { UnavailablePicker, Picker, QueuePicker } from './picker'; +import { Metadata } from './metadata'; +import { Status, LogVerbosity, Propagate } from './constants'; +import { FilterStackFactory } from './filter-stack'; +import { CompressionFilterFactory } from './compression-filter'; +import { + CallConfig, + ConfigSelector, + getDefaultAuthority, + mapUriDefaultScheme, +} from './resolver'; +import { trace } from './logging'; +import { SubchannelAddress } from './subchannel-address'; +import { mapProxyName } from './http_proxy'; +import { GrpcUri, parseUri, uriToString } from './uri-parser'; +import { ServerSurfaceCall } from './server-call'; + +import { ConnectivityState } from './connectivity-state'; +import { + ChannelInfo, + ChannelRef, + ChannelzCallTracker, + ChannelzChildrenTracker, + ChannelzTrace, + registerChannelzChannel, + SubchannelRef, + unregisterChannelzRef, +} from './channelz'; +import { LoadBalancingCall } from './load-balancing-call'; +import { CallCredentials } from './call-credentials'; +import { Call, CallStreamOptions, StatusObject } from './call-interface'; +import { Deadline, deadlineToString } from './deadline'; +import { ResolvingCall } from './resolving-call'; +import { getNextCallNumber } from './call-number'; +import { restrictControlPlaneStatusCode } from './control-plane-status'; +import { + MessageBufferTracker, + RetryingCall, + RetryThrottler, +} from './retrying-call'; +import { + BaseSubchannelWrapper, + ConnectivityStateListener, + SubchannelInterface, +} from './subchannel-interface'; + +/** + * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args + */ +const MAX_TIMEOUT_TIME = 2147483647; + +const MIN_IDLE_TIMEOUT_MS = 1000; + +// 30 minutes +const DEFAULT_IDLE_TIMEOUT_MS = 30 * 60 * 1000; + +interface ConnectivityStateWatcher { + currentState: ConnectivityState; + timer: NodeJS.Timeout | null; + callback: (error?: Error) => void; +} + +interface NoneConfigResult { + type: 'NONE'; +} + +interface SuccessConfigResult { + type: 'SUCCESS'; + config: CallConfig; +} + +interface ErrorConfigResult { + type: 'ERROR'; + error: StatusObject; +} + +type GetConfigResult = + | NoneConfigResult + | SuccessConfigResult + | ErrorConfigResult; + +const RETRY_THROTTLER_MAP: Map = new Map(); + +const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1 << 24; // 16 MB +const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1 << 20; // 1 MB + +class ChannelSubchannelWrapper + extends BaseSubchannelWrapper + implements SubchannelInterface +{ + private refCount = 0; + private subchannelStateListener: ConnectivityStateListener; + constructor( + childSubchannel: SubchannelInterface, + private channel: InternalChannel + ) { + super(childSubchannel); + this.subchannelStateListener = ( + subchannel, + previousState, + newState, + keepaliveTime + ) => { + channel.throttleKeepalive(keepaliveTime); + }; + childSubchannel.addConnectivityStateListener(this.subchannelStateListener); + } + + ref(): void { + this.child.ref(); + this.refCount += 1; + } + + unref(): void { + this.child.unref(); + this.refCount -= 1; + if (this.refCount <= 0) { + this.child.removeConnectivityStateListener(this.subchannelStateListener); + this.channel.removeWrappedSubchannel(this); + } + } +} + +export class InternalChannel { + private readonly resolvingLoadBalancer: ResolvingLoadBalancer; + private readonly subchannelPool: SubchannelPool; + private connectivityState: ConnectivityState = ConnectivityState.IDLE; + private currentPicker: Picker = new UnavailablePicker(); + /** + * Calls queued up to get a call config. Should only be populated before the + * first time the resolver returns a result, which includes the ConfigSelector. + */ + private configSelectionQueue: ResolvingCall[] = []; + private pickQueue: LoadBalancingCall[] = []; + private connectivityStateWatchers: ConnectivityStateWatcher[] = []; + private readonly defaultAuthority: string; + private readonly filterStackFactory: FilterStackFactory; + private readonly target: GrpcUri; + /** + * This timer does not do anything on its own. Its purpose is to hold the + * event loop open while there are any pending calls for the channel that + * have not yet been assigned to specific subchannels. In other words, + * the invariant is that callRefTimer is reffed if and only if pickQueue + * is non-empty. + */ + private readonly callRefTimer: NodeJS.Timeout; + private configSelector: ConfigSelector | null = null; + /** + * This is the error from the name resolver if it failed most recently. It + * is only used to end calls that start while there is no config selector + * and the name resolver is in backoff, so it should be nulled if + * configSelector becomes set or the channel state becomes anything other + * than TRANSIENT_FAILURE. + */ + private currentResolutionError: StatusObject | null = null; + private readonly retryBufferTracker: MessageBufferTracker; + private keepaliveTime: number; + private readonly wrappedSubchannels: Set = + new Set(); + + private callCount = 0; + private idleTimer: NodeJS.Timeout | null = null; + private readonly idleTimeoutMs: number; + private lastActivityTimestamp: Date; + + // Channelz info + private readonly channelzEnabled: boolean = true; + private readonly originalTarget: string; + private readonly channelzRef: ChannelRef; + private readonly channelzTrace: ChannelzTrace; + private readonly callTracker = new ChannelzCallTracker(); + private readonly childrenTracker = new ChannelzChildrenTracker(); + + constructor( + target: string, + private readonly credentials: ChannelCredentials, + private readonly options: ChannelOptions + ) { + if (typeof target !== 'string') { + throw new TypeError('Channel target must be a string'); + } + if (!(credentials instanceof ChannelCredentials)) { + throw new TypeError( + 'Channel credentials must be a ChannelCredentials object' + ); + } + if (options) { + if (typeof options !== 'object') { + throw new TypeError('Channel options must be an object'); + } + } + this.originalTarget = target; + const originalTargetUri = parseUri(target); + if (originalTargetUri === null) { + throw new Error(`Could not parse target name "${target}"`); + } + /* This ensures that the target has a scheme that is registered with the + * resolver */ + const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri); + if (defaultSchemeMapResult === null) { + throw new Error( + `Could not find a default scheme for target name "${target}"` + ); + } + + this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME); + this.callRefTimer.unref?.(); + + if (this.options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } + + this.channelzTrace = new ChannelzTrace(); + this.channelzRef = registerChannelzChannel( + target, + () => this.getChannelzInfo(), + this.channelzEnabled + ); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Channel created'); + } + + if (this.options['grpc.default_authority']) { + this.defaultAuthority = this.options['grpc.default_authority'] as string; + } else { + this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult); + } + const proxyMapResult = mapProxyName(defaultSchemeMapResult, options); + this.target = proxyMapResult.target; + this.options = Object.assign({}, this.options, proxyMapResult.extraOptions); + + /* The global boolean parameter to getSubchannelPool has the inverse meaning to what + * the grpc.use_local_subchannel_pool channel option means. */ + this.subchannelPool = getSubchannelPool( + (options['grpc.use_local_subchannel_pool'] ?? 0) === 0 + ); + this.retryBufferTracker = new MessageBufferTracker( + options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES, + options['grpc.per_rpc_retry_buffer_size'] ?? + DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES + ); + this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1; + this.idleTimeoutMs = Math.max( + options['grpc.client_idle_timeout_ms'] ?? DEFAULT_IDLE_TIMEOUT_MS, + MIN_IDLE_TIMEOUT_MS + ); + const channelControlHelper: ChannelControlHelper = { + createSubchannel: ( + subchannelAddress: SubchannelAddress, + subchannelArgs: ChannelOptions + ) => { + const subchannel = this.subchannelPool.getOrCreateSubchannel( + this.target, + subchannelAddress, + Object.assign({}, this.options, subchannelArgs), + this.credentials + ); + subchannel.throttleKeepalive(this.keepaliveTime); + if (this.channelzEnabled) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Created subchannel or used existing subchannel', + subchannel.getChannelzRef() + ); + } + const wrappedSubchannel = new ChannelSubchannelWrapper( + subchannel, + this + ); + this.wrappedSubchannels.add(wrappedSubchannel); + return wrappedSubchannel; + }, + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + this.currentPicker = picker; + const queueCopy = this.pickQueue.slice(); + this.pickQueue = []; + if (queueCopy.length > 0) { + this.callRefTimerUnref(); + } + for (const call of queueCopy) { + call.doPick(); + } + this.updateState(connectivityState); + }, + requestReresolution: () => { + // This should never be called. + throw new Error( + 'Resolving load balancer should never call requestReresolution' + ); + }, + addChannelzChild: (child: ChannelRef | SubchannelRef) => { + if (this.channelzEnabled) { + this.childrenTracker.refChild(child); + } + }, + removeChannelzChild: (child: ChannelRef | SubchannelRef) => { + if (this.channelzEnabled) { + this.childrenTracker.unrefChild(child); + } + }, + }; + this.resolvingLoadBalancer = new ResolvingLoadBalancer( + this.target, + channelControlHelper, + options, + (serviceConfig, configSelector) => { + if (serviceConfig.retryThrottling) { + RETRY_THROTTLER_MAP.set( + this.getTarget(), + new RetryThrottler( + serviceConfig.retryThrottling.maxTokens, + serviceConfig.retryThrottling.tokenRatio, + RETRY_THROTTLER_MAP.get(this.getTarget()) + ) + ); + } else { + RETRY_THROTTLER_MAP.delete(this.getTarget()); + } + if (this.channelzEnabled) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Address resolution succeeded' + ); + } + this.configSelector = configSelector; + this.currentResolutionError = null; + /* We process the queue asynchronously to ensure that the corresponding + * load balancer update has completed. */ + process.nextTick(() => { + const localQueue = this.configSelectionQueue; + this.configSelectionQueue = []; + if (localQueue.length > 0) { + this.callRefTimerUnref(); + } + for (const call of localQueue) { + call.getConfig(); + } + }); + }, + status => { + if (this.channelzEnabled) { + this.channelzTrace.addTrace( + 'CT_WARNING', + 'Address resolution failed with code ' + + status.code + + ' and details "' + + status.details + + '"' + ); + } + if (this.configSelectionQueue.length > 0) { + this.trace( + 'Name resolution failed with calls queued for config selection' + ); + } + if (this.configSelector === null) { + this.currentResolutionError = { + ...restrictControlPlaneStatusCode(status.code, status.details), + metadata: status.metadata, + }; + } + const localQueue = this.configSelectionQueue; + this.configSelectionQueue = []; + if (localQueue.length > 0) { + this.callRefTimerUnref(); + } + for (const call of localQueue) { + call.reportResolverError(status); + } + } + ); + this.filterStackFactory = new FilterStackFactory([ + new CompressionFilterFactory(this, this.options), + ]); + this.trace( + 'Channel constructed with options ' + + JSON.stringify(options, undefined, 2) + ); + const error = new Error(); + trace( + LogVerbosity.DEBUG, + 'channel_stacktrace', + '(' + + this.channelzRef.id + + ') ' + + 'Channel constructed \n' + + error.stack?.substring(error.stack.indexOf('\n') + 1) + ); + this.lastActivityTimestamp = new Date(); + } + + private getChannelzInfo(): ChannelInfo { + return { + target: this.originalTarget, + state: this.connectivityState, + trace: this.channelzTrace, + callTracker: this.callTracker, + children: this.childrenTracker.getChildLists(), + }; + } + + private trace(text: string, verbosityOverride?: LogVerbosity) { + trace( + verbosityOverride ?? LogVerbosity.DEBUG, + 'channel', + '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text + ); + } + + private callRefTimerRef() { + // If the hasRef function does not exist, always run the code + if (!this.callRefTimer.hasRef?.()) { + this.trace( + 'callRefTimer.ref | configSelectionQueue.length=' + + this.configSelectionQueue.length + + ' pickQueue.length=' + + this.pickQueue.length + ); + this.callRefTimer.ref?.(); + } + } + + private callRefTimerUnref() { + // If the hasRef function does not exist, always run the code + if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) { + this.trace( + 'callRefTimer.unref | configSelectionQueue.length=' + + this.configSelectionQueue.length + + ' pickQueue.length=' + + this.pickQueue.length + ); + this.callRefTimer.unref?.(); + } + } + + private removeConnectivityStateWatcher( + watcherObject: ConnectivityStateWatcher + ) { + const watcherIndex = this.connectivityStateWatchers.findIndex( + value => value === watcherObject + ); + if (watcherIndex >= 0) { + this.connectivityStateWatchers.splice(watcherIndex, 1); + } + } + + private updateState(newState: ConnectivityState): void { + trace( + LogVerbosity.DEBUG, + 'connectivity_state', + '(' + + this.channelzRef.id + + ') ' + + uriToString(this.target) + + ' ' + + ConnectivityState[this.connectivityState] + + ' -> ' + + ConnectivityState[newState] + ); + if (this.channelzEnabled) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connectivity state change to ' + ConnectivityState[newState] + ); + } + this.connectivityState = newState; + const watchersCopy = this.connectivityStateWatchers.slice(); + for (const watcherObject of watchersCopy) { + if (newState !== watcherObject.currentState) { + if (watcherObject.timer) { + clearTimeout(watcherObject.timer); + } + this.removeConnectivityStateWatcher(watcherObject); + watcherObject.callback(); + } + } + if (newState !== ConnectivityState.TRANSIENT_FAILURE) { + this.currentResolutionError = null; + } + } + + throttleKeepalive(newKeepaliveTime: number) { + if (newKeepaliveTime > this.keepaliveTime) { + this.keepaliveTime = newKeepaliveTime; + for (const wrappedSubchannel of this.wrappedSubchannels) { + wrappedSubchannel.throttleKeepalive(newKeepaliveTime); + } + } + } + + removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) { + this.wrappedSubchannels.delete(wrappedSubchannel); + } + + doPick(metadata: Metadata, extraPickInfo: { [key: string]: string }) { + return this.currentPicker.pick({ + metadata: metadata, + extraPickInfo: extraPickInfo, + }); + } + + queueCallForPick(call: LoadBalancingCall) { + this.pickQueue.push(call); + this.callRefTimerRef(); + } + + getConfig(method: string, metadata: Metadata): GetConfigResult { + this.resolvingLoadBalancer.exitIdle(); + if (this.configSelector) { + return { + type: 'SUCCESS', + config: this.configSelector(method, metadata), + }; + } else { + if (this.currentResolutionError) { + return { + type: 'ERROR', + error: this.currentResolutionError, + }; + } else { + return { + type: 'NONE', + }; + } + } + } + + queueCallForConfig(call: ResolvingCall) { + this.configSelectionQueue.push(call); + this.callRefTimerRef(); + } + + private enterIdle() { + this.resolvingLoadBalancer.destroy(); + this.updateState(ConnectivityState.IDLE); + this.currentPicker = new QueuePicker(this.resolvingLoadBalancer); + if (this.idleTimer) { + clearTimeout(this.idleTimer); + this.idleTimer = null; + } + } + + private startIdleTimeout(timeoutMs: number) { + this.idleTimer = setTimeout(() => { + if (this.callCount > 0) { + /* If there is currently a call, the channel will not go idle for a + * period of at least idleTimeoutMs, so check again after that time. + */ + this.startIdleTimeout(this.idleTimeoutMs); + return; + } + const now = new Date(); + const timeSinceLastActivity = now.valueOf() - this.lastActivityTimestamp.valueOf(); + if (timeSinceLastActivity >= this.idleTimeoutMs) { + this.trace( + 'Idle timer triggered after ' + + this.idleTimeoutMs + + 'ms of inactivity' + ); + this.enterIdle(); + } else { + /* Whenever the timer fires with the latest activity being too recent, + * set the timer again for the time when the time since the last + * activity is equal to the timeout. This should result in the timer + * firing no more than once every idleTimeoutMs/2 on average. */ + this.startIdleTimeout(this.idleTimeoutMs - timeSinceLastActivity); + } + }, timeoutMs); + this.idleTimer.unref?.(); + } + + private maybeStartIdleTimer() { + if (this.connectivityState !== ConnectivityState.SHUTDOWN && !this.idleTimer) { + this.startIdleTimeout(this.idleTimeoutMs); + } + } + + private onCallStart() { + if (this.channelzEnabled) { + this.callTracker.addCallStarted(); + } + this.callCount += 1; + } + + private onCallEnd(status: StatusObject) { + if (this.channelzEnabled) { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + } + this.callCount -= 1; + this.lastActivityTimestamp = new Date(); + this.maybeStartIdleTimer(); + } + + createLoadBalancingCall( + callConfig: CallConfig, + method: string, + host: string, + credentials: CallCredentials, + deadline: Deadline + ): LoadBalancingCall { + const callNumber = getNextCallNumber(); + this.trace( + 'createLoadBalancingCall [' + callNumber + '] method="' + method + '"' + ); + return new LoadBalancingCall( + this, + callConfig, + method, + host, + credentials, + deadline, + callNumber + ); + } + + createRetryingCall( + callConfig: CallConfig, + method: string, + host: string, + credentials: CallCredentials, + deadline: Deadline + ): RetryingCall { + const callNumber = getNextCallNumber(); + this.trace( + 'createRetryingCall [' + callNumber + '] method="' + method + '"' + ); + return new RetryingCall( + this, + callConfig, + method, + host, + credentials, + deadline, + callNumber, + this.retryBufferTracker, + RETRY_THROTTLER_MAP.get(this.getTarget()) + ); + } + + createInnerCall( + callConfig: CallConfig, + method: string, + host: string, + credentials: CallCredentials, + deadline: Deadline + ): Call { + // Create a RetryingCall if retries are enabled + if (this.options['grpc.enable_retries'] === 0) { + return this.createLoadBalancingCall( + callConfig, + method, + host, + credentials, + deadline + ); + } else { + return this.createRetryingCall( + callConfig, + method, + host, + credentials, + deadline + ); + } + } + + createResolvingCall( + method: string, + deadline: Deadline, + host: string | null | undefined, + parentCall: ServerSurfaceCall | null, + propagateFlags: number | null | undefined + ): ResolvingCall { + const callNumber = getNextCallNumber(); + this.trace( + 'createResolvingCall [' + + callNumber + + '] method="' + + method + + '", deadline=' + + deadlineToString(deadline) + ); + const finalOptions: CallStreamOptions = { + deadline: deadline, + flags: propagateFlags ?? Propagate.DEFAULTS, + host: host ?? this.defaultAuthority, + parentCall: parentCall, + }; + + const call = new ResolvingCall( + this, + method, + finalOptions, + this.filterStackFactory.clone(), + this.credentials._getCallCredentials(), + callNumber + ); + + this.onCallStart(); + call.addStatusWatcher(status => { + this.onCallEnd(status); + }); + return call; + } + + close() { + this.resolvingLoadBalancer.destroy(); + this.updateState(ConnectivityState.SHUTDOWN); + clearInterval(this.callRefTimer); + if (this.idleTimer) { + clearTimeout(this.idleTimer); + } + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } + + this.subchannelPool.unrefUnusedSubchannels(); + } + + getTarget() { + return uriToString(this.target); + } + + getConnectivityState(tryToConnect: boolean) { + const connectivityState = this.connectivityState; + if (tryToConnect) { + this.resolvingLoadBalancer.exitIdle(); + this.lastActivityTimestamp = new Date(); + this.maybeStartIdleTimer(); + } + return connectivityState; + } + + watchConnectivityState( + currentState: ConnectivityState, + deadline: Date | number, + callback: (error?: Error) => void + ): void { + if (this.connectivityState === ConnectivityState.SHUTDOWN) { + throw new Error('Channel has been shut down'); + } + let timer = null; + if (deadline !== Infinity) { + const deadlineDate: Date = + deadline instanceof Date ? deadline : new Date(deadline); + const now = new Date(); + if (deadline === -Infinity || deadlineDate <= now) { + process.nextTick( + callback, + new Error('Deadline passed without connectivity state change') + ); + return; + } + timer = setTimeout(() => { + this.removeConnectivityStateWatcher(watcherObject); + callback( + new Error('Deadline passed without connectivity state change') + ); + }, deadlineDate.getTime() - now.getTime()); + } + const watcherObject = { + currentState, + callback, + timer, + }; + this.connectivityStateWatchers.push(watcherObject); + } + + /** + * Get the channelz reference object for this channel. The returned value is + * garbage if channelz is disabled for this channel. + * @returns + */ + getChannelzRef() { + return this.channelzRef; + } + + createCall( + method: string, + deadline: Deadline, + host: string | null | undefined, + parentCall: ServerSurfaceCall | null, + propagateFlags: number | null | undefined + ): Call { + if (typeof method !== 'string') { + throw new TypeError('Channel#createCall: method must be a string'); + } + if (!(typeof deadline === 'number' || deadline instanceof Date)) { + throw new TypeError( + 'Channel#createCall: deadline must be a number or Date' + ); + } + if (this.connectivityState === ConnectivityState.SHUTDOWN) { + throw new Error('Channel has been shut down'); + } + return this.createResolvingCall( + method, + deadline, + host, + parentCall, + propagateFlags + ); + } +} -- cgit v1.2.3