diff options
Diffstat (limited to 'frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts')
| -rw-r--r-- | frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts | 351 |
1 files changed, 351 insertions, 0 deletions
diff --git a/frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts b/frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts new file mode 100644 index 0000000..2721e96 --- /dev/null +++ b/frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts @@ -0,0 +1,351 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { CallCredentials } from './call-credentials'; +import { + Call, + InterceptingListener, + MessageContext, + StatusObject, +} from './call-interface'; +import { SubchannelCall } from './subchannel-call'; +import { ConnectivityState } from './connectivity-state'; +import { LogVerbosity, Status } from './constants'; +import { Deadline, getDeadlineTimeoutString } from './deadline'; +import { InternalChannel } from './internal-channel'; +import { Metadata } from './metadata'; +import { PickResultType } from './picker'; +import { CallConfig } from './resolver'; +import { splitHostPort } from './uri-parser'; +import * as logging from './logging'; +import { restrictControlPlaneStatusCode } from './control-plane-status'; +import * as http2 from 'http2'; + +const TRACER_NAME = 'load_balancing_call'; + +export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED'; + +export interface StatusObjectWithProgress extends StatusObject { + progress: RpcProgress; +} + +export interface LoadBalancingCallInterceptingListener + extends InterceptingListener { + onReceiveStatus(status: StatusObjectWithProgress): void; +} + +export class LoadBalancingCall implements Call { + private child: SubchannelCall | null = null; + private readPending = false; + private pendingMessage: { context: MessageContext; message: Buffer } | null = + null; + private pendingHalfClose = false; + private ended = false; + private serviceUrl: string; + private metadata: Metadata | null = null; + private listener: InterceptingListener | null = null; + private onCallEnded: ((statusCode: Status) => void) | null = null; + constructor( + private readonly channel: InternalChannel, + private readonly callConfig: CallConfig, + private readonly methodName: string, + private readonly host: string, + private readonly credentials: CallCredentials, + private readonly deadline: Deadline, + private readonly callNumber: number + ) { + const splitPath: string[] = this.methodName.split('/'); + let serviceName = ''; + /* The standard path format is "/{serviceName}/{methodName}", so if we split + * by '/', the first item should be empty and the second should be the + * service name */ + if (splitPath.length >= 2) { + serviceName = splitPath[1]; + } + const hostname = splitHostPort(this.host)?.host ?? 'localhost'; + /* Currently, call credentials are only allowed on HTTPS connections, so we + * can assume that the scheme is "https" */ + this.serviceUrl = `https://${hostname}/${serviceName}`; + } + + private trace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + TRACER_NAME, + '[' + this.callNumber + '] ' + text + ); + } + + private outputStatus(status: StatusObject, progress: RpcProgress) { + if (!this.ended) { + this.ended = true; + this.trace( + 'ended with status: code=' + + status.code + + ' details="' + + status.details + + '"' + ); + const finalStatus = { ...status, progress }; + this.listener?.onReceiveStatus(finalStatus); + this.onCallEnded?.(finalStatus.code); + } + } + + doPick() { + if (this.ended) { + return; + } + if (!this.metadata) { + throw new Error('doPick called before start'); + } + this.trace('Pick called'); + const pickResult = this.channel.doPick( + this.metadata, + this.callConfig.pickInformation + ); + const subchannelString = pickResult.subchannel + ? '(' + + pickResult.subchannel.getChannelzRef().id + + ') ' + + pickResult.subchannel.getAddress() + : '' + pickResult.subchannel; + this.trace( + 'Pick result: ' + + PickResultType[pickResult.pickResultType] + + ' subchannel: ' + + subchannelString + + ' status: ' + + pickResult.status?.code + + ' ' + + pickResult.status?.details + ); + switch (pickResult.pickResultType) { + case PickResultType.COMPLETE: + this.credentials + .generateMetadata({ service_url: this.serviceUrl }) + .then( + credsMetadata => { + /* If this call was cancelled (e.g. by the deadline) before + * metadata generation finished, we shouldn't do anything with + * it. */ + if (this.ended) { + this.trace('Credentials metadata generation finished after call ended'); + return; + } + const finalMetadata = this.metadata!.clone(); + finalMetadata.merge(credsMetadata); + if (finalMetadata.get('authorization').length > 1) { + this.outputStatus( + { + code: Status.INTERNAL, + details: + '"authorization" metadata cannot have multiple values', + metadata: new Metadata(), + }, + 'PROCESSED' + ); + } + if ( + pickResult.subchannel!.getConnectivityState() !== + ConnectivityState.READY + ) { + this.trace( + 'Picked subchannel ' + + subchannelString + + ' has state ' + + ConnectivityState[ + pickResult.subchannel!.getConnectivityState() + ] + + ' after getting credentials metadata. Retrying pick' + ); + this.doPick(); + return; + } + + if (this.deadline !== Infinity) { + finalMetadata.set( + 'grpc-timeout', + getDeadlineTimeoutString(this.deadline) + ); + } + try { + this.child = pickResult + .subchannel!.getRealSubchannel() + .createCall(finalMetadata, this.host, this.methodName, { + onReceiveMetadata: metadata => { + this.trace('Received metadata'); + this.listener!.onReceiveMetadata(metadata); + }, + onReceiveMessage: message => { + this.trace('Received message'); + this.listener!.onReceiveMessage(message); + }, + onReceiveStatus: status => { + this.trace('Received status'); + if ( + status.rstCode === + http2.constants.NGHTTP2_REFUSED_STREAM + ) { + this.outputStatus(status, 'REFUSED'); + } else { + this.outputStatus(status, 'PROCESSED'); + } + }, + }); + } catch (error) { + this.trace( + 'Failed to start call on picked subchannel ' + + subchannelString + + ' with error ' + + (error as Error).message + ); + this.outputStatus( + { + code: Status.INTERNAL, + details: + 'Failed to start HTTP/2 stream with error ' + + (error as Error).message, + metadata: new Metadata(), + }, + 'NOT_STARTED' + ); + return; + } + this.callConfig.onCommitted?.(); + pickResult.onCallStarted?.(); + this.onCallEnded = pickResult.onCallEnded; + this.trace( + 'Created child call [' + this.child.getCallNumber() + ']' + ); + if (this.readPending) { + this.child.startRead(); + } + if (this.pendingMessage) { + this.child.sendMessageWithContext( + this.pendingMessage.context, + this.pendingMessage.message + ); + } + if (this.pendingHalfClose) { + this.child.halfClose(); + } + }, + (error: Error & { code: number }) => { + // We assume the error code isn't 0 (Status.OK) + const { code, details } = restrictControlPlaneStatusCode( + typeof error.code === 'number' ? error.code : Status.UNKNOWN, + `Getting metadata from plugin failed with error: ${error.message}` + ); + this.outputStatus( + { + code: code, + details: details, + metadata: new Metadata(), + }, + 'PROCESSED' + ); + } + ); + break; + case PickResultType.DROP: + const { code, details } = restrictControlPlaneStatusCode( + pickResult.status!.code, + pickResult.status!.details + ); + setImmediate(() => { + this.outputStatus( + { code, details, metadata: pickResult.status!.metadata }, + 'DROP' + ); + }); + break; + case PickResultType.TRANSIENT_FAILURE: + if (this.metadata.getOptions().waitForReady) { + this.channel.queueCallForPick(this); + } else { + const { code, details } = restrictControlPlaneStatusCode( + pickResult.status!.code, + pickResult.status!.details + ); + setImmediate(() => { + this.outputStatus( + { code, details, metadata: pickResult.status!.metadata }, + 'PROCESSED' + ); + }); + } + break; + case PickResultType.QUEUE: + this.channel.queueCallForPick(this); + } + } + + cancelWithStatus(status: Status, details: string): void { + this.trace( + 'cancelWithStatus code: ' + status + ' details: "' + details + '"' + ); + this.child?.cancelWithStatus(status, details); + this.outputStatus( + { code: status, details: details, metadata: new Metadata() }, + 'PROCESSED' + ); + } + getPeer(): string { + return this.child?.getPeer() ?? this.channel.getTarget(); + } + start( + metadata: Metadata, + listener: LoadBalancingCallInterceptingListener + ): void { + this.trace('start called'); + this.listener = listener; + this.metadata = metadata; + this.doPick(); + } + sendMessageWithContext(context: MessageContext, message: Buffer): void { + this.trace('write() called with message of length ' + message.length); + if (this.child) { + this.child.sendMessageWithContext(context, message); + } else { + this.pendingMessage = { context, message }; + } + } + startRead(): void { + this.trace('startRead called'); + if (this.child) { + this.child.startRead(); + } else { + this.readPending = true; + } + } + halfClose(): void { + this.trace('halfClose called'); + if (this.child) { + this.child.halfClose(); + } else { + this.pendingHalfClose = true; + } + } + setCredentials(credentials: CallCredentials): void { + throw new Error('Method not implemented.'); + } + + getCallNumber(): number { + return this.callNumber; + } +} |
