summaryrefslogtreecommitdiff
path: root/frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts
diff options
context:
space:
mode:
Diffstat (limited to 'frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts')
-rw-r--r--frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts351
1 files changed, 351 insertions, 0 deletions
diff --git a/frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts b/frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts
new file mode 100644
index 0000000..2721e96
--- /dev/null
+++ b/frontend-old/node_modules/@grpc/grpc-js/src/load-balancing-call.ts
@@ -0,0 +1,351 @@
+/*
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { CallCredentials } from './call-credentials';
+import {
+ Call,
+ InterceptingListener,
+ MessageContext,
+ StatusObject,
+} from './call-interface';
+import { SubchannelCall } from './subchannel-call';
+import { ConnectivityState } from './connectivity-state';
+import { LogVerbosity, Status } from './constants';
+import { Deadline, getDeadlineTimeoutString } from './deadline';
+import { InternalChannel } from './internal-channel';
+import { Metadata } from './metadata';
+import { PickResultType } from './picker';
+import { CallConfig } from './resolver';
+import { splitHostPort } from './uri-parser';
+import * as logging from './logging';
+import { restrictControlPlaneStatusCode } from './control-plane-status';
+import * as http2 from 'http2';
+
+const TRACER_NAME = 'load_balancing_call';
+
+export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED';
+
+export interface StatusObjectWithProgress extends StatusObject {
+ progress: RpcProgress;
+}
+
+export interface LoadBalancingCallInterceptingListener
+ extends InterceptingListener {
+ onReceiveStatus(status: StatusObjectWithProgress): void;
+}
+
+export class LoadBalancingCall implements Call {
+ private child: SubchannelCall | null = null;
+ private readPending = false;
+ private pendingMessage: { context: MessageContext; message: Buffer } | null =
+ null;
+ private pendingHalfClose = false;
+ private ended = false;
+ private serviceUrl: string;
+ private metadata: Metadata | null = null;
+ private listener: InterceptingListener | null = null;
+ private onCallEnded: ((statusCode: Status) => void) | null = null;
+ constructor(
+ private readonly channel: InternalChannel,
+ private readonly callConfig: CallConfig,
+ private readonly methodName: string,
+ private readonly host: string,
+ private readonly credentials: CallCredentials,
+ private readonly deadline: Deadline,
+ private readonly callNumber: number
+ ) {
+ const splitPath: string[] = this.methodName.split('/');
+ let serviceName = '';
+ /* The standard path format is "/{serviceName}/{methodName}", so if we split
+ * by '/', the first item should be empty and the second should be the
+ * service name */
+ if (splitPath.length >= 2) {
+ serviceName = splitPath[1];
+ }
+ const hostname = splitHostPort(this.host)?.host ?? 'localhost';
+ /* Currently, call credentials are only allowed on HTTPS connections, so we
+ * can assume that the scheme is "https" */
+ this.serviceUrl = `https://${hostname}/${serviceName}`;
+ }
+
+ private trace(text: string): void {
+ logging.trace(
+ LogVerbosity.DEBUG,
+ TRACER_NAME,
+ '[' + this.callNumber + '] ' + text
+ );
+ }
+
+ private outputStatus(status: StatusObject, progress: RpcProgress) {
+ if (!this.ended) {
+ this.ended = true;
+ this.trace(
+ 'ended with status: code=' +
+ status.code +
+ ' details="' +
+ status.details +
+ '"'
+ );
+ const finalStatus = { ...status, progress };
+ this.listener?.onReceiveStatus(finalStatus);
+ this.onCallEnded?.(finalStatus.code);
+ }
+ }
+
+ doPick() {
+ if (this.ended) {
+ return;
+ }
+ if (!this.metadata) {
+ throw new Error('doPick called before start');
+ }
+ this.trace('Pick called');
+ const pickResult = this.channel.doPick(
+ this.metadata,
+ this.callConfig.pickInformation
+ );
+ const subchannelString = pickResult.subchannel
+ ? '(' +
+ pickResult.subchannel.getChannelzRef().id +
+ ') ' +
+ pickResult.subchannel.getAddress()
+ : '' + pickResult.subchannel;
+ this.trace(
+ 'Pick result: ' +
+ PickResultType[pickResult.pickResultType] +
+ ' subchannel: ' +
+ subchannelString +
+ ' status: ' +
+ pickResult.status?.code +
+ ' ' +
+ pickResult.status?.details
+ );
+ switch (pickResult.pickResultType) {
+ case PickResultType.COMPLETE:
+ this.credentials
+ .generateMetadata({ service_url: this.serviceUrl })
+ .then(
+ credsMetadata => {
+ /* If this call was cancelled (e.g. by the deadline) before
+ * metadata generation finished, we shouldn't do anything with
+ * it. */
+ if (this.ended) {
+ this.trace('Credentials metadata generation finished after call ended');
+ return;
+ }
+ const finalMetadata = this.metadata!.clone();
+ finalMetadata.merge(credsMetadata);
+ if (finalMetadata.get('authorization').length > 1) {
+ this.outputStatus(
+ {
+ code: Status.INTERNAL,
+ details:
+ '"authorization" metadata cannot have multiple values',
+ metadata: new Metadata(),
+ },
+ 'PROCESSED'
+ );
+ }
+ if (
+ pickResult.subchannel!.getConnectivityState() !==
+ ConnectivityState.READY
+ ) {
+ this.trace(
+ 'Picked subchannel ' +
+ subchannelString +
+ ' has state ' +
+ ConnectivityState[
+ pickResult.subchannel!.getConnectivityState()
+ ] +
+ ' after getting credentials metadata. Retrying pick'
+ );
+ this.doPick();
+ return;
+ }
+
+ if (this.deadline !== Infinity) {
+ finalMetadata.set(
+ 'grpc-timeout',
+ getDeadlineTimeoutString(this.deadline)
+ );
+ }
+ try {
+ this.child = pickResult
+ .subchannel!.getRealSubchannel()
+ .createCall(finalMetadata, this.host, this.methodName, {
+ onReceiveMetadata: metadata => {
+ this.trace('Received metadata');
+ this.listener!.onReceiveMetadata(metadata);
+ },
+ onReceiveMessage: message => {
+ this.trace('Received message');
+ this.listener!.onReceiveMessage(message);
+ },
+ onReceiveStatus: status => {
+ this.trace('Received status');
+ if (
+ status.rstCode ===
+ http2.constants.NGHTTP2_REFUSED_STREAM
+ ) {
+ this.outputStatus(status, 'REFUSED');
+ } else {
+ this.outputStatus(status, 'PROCESSED');
+ }
+ },
+ });
+ } catch (error) {
+ this.trace(
+ 'Failed to start call on picked subchannel ' +
+ subchannelString +
+ ' with error ' +
+ (error as Error).message
+ );
+ this.outputStatus(
+ {
+ code: Status.INTERNAL,
+ details:
+ 'Failed to start HTTP/2 stream with error ' +
+ (error as Error).message,
+ metadata: new Metadata(),
+ },
+ 'NOT_STARTED'
+ );
+ return;
+ }
+ this.callConfig.onCommitted?.();
+ pickResult.onCallStarted?.();
+ this.onCallEnded = pickResult.onCallEnded;
+ this.trace(
+ 'Created child call [' + this.child.getCallNumber() + ']'
+ );
+ if (this.readPending) {
+ this.child.startRead();
+ }
+ if (this.pendingMessage) {
+ this.child.sendMessageWithContext(
+ this.pendingMessage.context,
+ this.pendingMessage.message
+ );
+ }
+ if (this.pendingHalfClose) {
+ this.child.halfClose();
+ }
+ },
+ (error: Error & { code: number }) => {
+ // We assume the error code isn't 0 (Status.OK)
+ const { code, details } = restrictControlPlaneStatusCode(
+ typeof error.code === 'number' ? error.code : Status.UNKNOWN,
+ `Getting metadata from plugin failed with error: ${error.message}`
+ );
+ this.outputStatus(
+ {
+ code: code,
+ details: details,
+ metadata: new Metadata(),
+ },
+ 'PROCESSED'
+ );
+ }
+ );
+ break;
+ case PickResultType.DROP:
+ const { code, details } = restrictControlPlaneStatusCode(
+ pickResult.status!.code,
+ pickResult.status!.details
+ );
+ setImmediate(() => {
+ this.outputStatus(
+ { code, details, metadata: pickResult.status!.metadata },
+ 'DROP'
+ );
+ });
+ break;
+ case PickResultType.TRANSIENT_FAILURE:
+ if (this.metadata.getOptions().waitForReady) {
+ this.channel.queueCallForPick(this);
+ } else {
+ const { code, details } = restrictControlPlaneStatusCode(
+ pickResult.status!.code,
+ pickResult.status!.details
+ );
+ setImmediate(() => {
+ this.outputStatus(
+ { code, details, metadata: pickResult.status!.metadata },
+ 'PROCESSED'
+ );
+ });
+ }
+ break;
+ case PickResultType.QUEUE:
+ this.channel.queueCallForPick(this);
+ }
+ }
+
+ cancelWithStatus(status: Status, details: string): void {
+ this.trace(
+ 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
+ );
+ this.child?.cancelWithStatus(status, details);
+ this.outputStatus(
+ { code: status, details: details, metadata: new Metadata() },
+ 'PROCESSED'
+ );
+ }
+ getPeer(): string {
+ return this.child?.getPeer() ?? this.channel.getTarget();
+ }
+ start(
+ metadata: Metadata,
+ listener: LoadBalancingCallInterceptingListener
+ ): void {
+ this.trace('start called');
+ this.listener = listener;
+ this.metadata = metadata;
+ this.doPick();
+ }
+ sendMessageWithContext(context: MessageContext, message: Buffer): void {
+ this.trace('write() called with message of length ' + message.length);
+ if (this.child) {
+ this.child.sendMessageWithContext(context, message);
+ } else {
+ this.pendingMessage = { context, message };
+ }
+ }
+ startRead(): void {
+ this.trace('startRead called');
+ if (this.child) {
+ this.child.startRead();
+ } else {
+ this.readPending = true;
+ }
+ }
+ halfClose(): void {
+ this.trace('halfClose called');
+ if (this.child) {
+ this.child.halfClose();
+ } else {
+ this.pendingHalfClose = true;
+ }
+ }
+ setCredentials(credentials: CallCredentials): void {
+ throw new Error('Method not implemented.');
+ }
+
+ getCallNumber(): number {
+ return this.callNumber;
+ }
+}