summaryrefslogtreecommitdiff
path: root/frontend-old/node_modules/@grpc/grpc-js/src/subchannel-call.ts
diff options
context:
space:
mode:
authoraltaf-creator <dev@altafcreator.com>2025-11-16 19:08:29 +0800
committeraltaf-creator <dev@altafcreator.com>2025-11-16 19:08:29 +0800
commit434aa8343fdcbb4d5002f934979913c099489bee (patch)
tree55bab4ec5a6151be57797d34f61faf5ea744471b /frontend-old/node_modules/@grpc/grpc-js/src/subchannel-call.ts
parent893c388d4e99442a36005e5971a87730623f946e (diff)
sdk, del
Diffstat (limited to 'frontend-old/node_modules/@grpc/grpc-js/src/subchannel-call.ts')
-rw-r--r--frontend-old/node_modules/@grpc/grpc-js/src/subchannel-call.ts547
1 files changed, 0 insertions, 547 deletions
diff --git a/frontend-old/node_modules/@grpc/grpc-js/src/subchannel-call.ts b/frontend-old/node_modules/@grpc/grpc-js/src/subchannel-call.ts
deleted file mode 100644
index b9f3191..0000000
--- a/frontend-old/node_modules/@grpc/grpc-js/src/subchannel-call.ts
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import * as http2 from 'http2';
-import * as os from 'os';
-
-import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, Status } from './constants';
-import { Metadata } from './metadata';
-import { StreamDecoder } from './stream-decoder';
-import * as logging from './logging';
-import { LogVerbosity } from './constants';
-import {
- InterceptingListener,
- MessageContext,
- StatusObject,
- WriteCallback,
-} from './call-interface';
-import { CallEventTracker, Transport } from './transport';
-
-const TRACER_NAME = 'subchannel_call';
-
-/**
- * https://nodejs.org/api/errors.html#errors_class_systemerror
- */
-interface SystemError extends Error {
- address?: string;
- code: string;
- dest?: string;
- errno: number;
- info?: object;
- message: string;
- path?: string;
- port?: number;
- syscall: string;
-}
-
-/**
- * Should do approximately the same thing as util.getSystemErrorName but the
- * TypeScript types don't have that function for some reason so I just made my
- * own.
- * @param errno
- */
-function getSystemErrorName(errno: number): string {
- for (const [name, num] of Object.entries(os.constants.errno)) {
- if (num === errno) {
- return name;
- }
- }
- return 'Unknown system error ' + errno;
-}
-
-export interface SubchannelCall {
- cancelWithStatus(status: Status, details: string): void;
- getPeer(): string;
- sendMessageWithContext(context: MessageContext, message: Buffer): void;
- startRead(): void;
- halfClose(): void;
- getCallNumber(): number;
-}
-
-export interface StatusObjectWithRstCode extends StatusObject {
- rstCode?: number;
-}
-
-export interface SubchannelCallInterceptingListener
- extends InterceptingListener {
- onReceiveStatus(status: StatusObjectWithRstCode): void;
-}
-
-export class Http2SubchannelCall implements SubchannelCall {
- private decoder: StreamDecoder;
-
- private isReadFilterPending = false;
- private isPushPending = false;
- private canPush = false;
- /**
- * Indicates that an 'end' event has come from the http2 stream, so there
- * will be no more data events.
- */
- private readsClosed = false;
-
- private statusOutput = false;
-
- private unpushedReadMessages: Buffer[] = [];
-
- // Status code mapped from :status. To be used if grpc-status is not received
- private mappedStatusCode: Status = Status.UNKNOWN;
-
- // This is populated (non-null) if and only if the call has ended
- private finalStatus: StatusObject | null = null;
-
- private internalError: SystemError | null = null;
-
- constructor(
- private readonly http2Stream: http2.ClientHttp2Stream,
- private readonly callEventTracker: CallEventTracker,
- private readonly listener: SubchannelCallInterceptingListener,
- private readonly transport: Transport,
- private readonly callId: number
- ) {
- const maxReceiveMessageLength = transport.getOptions()['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
- this.decoder = new StreamDecoder(maxReceiveMessageLength);
- http2Stream.on('response', (headers, flags) => {
- let headersString = '';
- for (const header of Object.keys(headers)) {
- headersString += '\t\t' + header + ': ' + headers[header] + '\n';
- }
- this.trace('Received server headers:\n' + headersString);
- switch (headers[':status']) {
- // TODO(murgatroid99): handle 100 and 101
- case 400:
- this.mappedStatusCode = Status.INTERNAL;
- break;
- case 401:
- this.mappedStatusCode = Status.UNAUTHENTICATED;
- break;
- case 403:
- this.mappedStatusCode = Status.PERMISSION_DENIED;
- break;
- case 404:
- this.mappedStatusCode = Status.UNIMPLEMENTED;
- break;
- case 429:
- case 502:
- case 503:
- case 504:
- this.mappedStatusCode = Status.UNAVAILABLE;
- break;
- default:
- this.mappedStatusCode = Status.UNKNOWN;
- }
-
- if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
- this.handleTrailers(headers);
- } else {
- let metadata: Metadata;
- try {
- metadata = Metadata.fromHttp2Headers(headers);
- } catch (error) {
- this.endCall({
- code: Status.UNKNOWN,
- details: (error as Error).message,
- metadata: new Metadata(),
- });
- return;
- }
- this.listener.onReceiveMetadata(metadata);
- }
- });
- http2Stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
- this.handleTrailers(headers);
- });
- http2Stream.on('data', (data: Buffer) => {
- /* If the status has already been output, allow the http2 stream to
- * drain without processing the data. */
- if (this.statusOutput) {
- return;
- }
- this.trace('receive HTTP/2 data frame of length ' + data.length);
- let messages: Buffer[];
- try {
- messages = this.decoder.write(data);
- } catch (e) {
- this.cancelWithStatus(Status.RESOURCE_EXHAUSTED, (e as Error).message);
- return;
- }
-
- for (const message of messages) {
- this.trace('parsed message of length ' + message.length);
- this.callEventTracker!.addMessageReceived();
- this.tryPush(message);
- }
- });
- http2Stream.on('end', () => {
- this.readsClosed = true;
- this.maybeOutputStatus();
- });
- http2Stream.on('close', () => {
- /* Use process.next tick to ensure that this code happens after any
- * "error" event that may be emitted at about the same time, so that
- * we can bubble up the error message from that event. */
- process.nextTick(() => {
- this.trace('HTTP/2 stream closed with code ' + http2Stream.rstCode);
- /* If we have a final status with an OK status code, that means that
- * we have received all of the messages and we have processed the
- * trailers and the call completed successfully, so it doesn't matter
- * how the stream ends after that */
- if (this.finalStatus?.code === Status.OK) {
- return;
- }
- let code: Status;
- let details = '';
- switch (http2Stream.rstCode) {
- case http2.constants.NGHTTP2_NO_ERROR:
- /* If we get a NO_ERROR code and we already have a status, the
- * stream completed properly and we just haven't fully processed
- * it yet */
- if (this.finalStatus !== null) {
- return;
- }
- code = Status.INTERNAL;
- details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
- break;
- case http2.constants.NGHTTP2_REFUSED_STREAM:
- code = Status.UNAVAILABLE;
- details = 'Stream refused by server';
- break;
- case http2.constants.NGHTTP2_CANCEL:
- code = Status.CANCELLED;
- details = 'Call cancelled';
- break;
- case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
- code = Status.RESOURCE_EXHAUSTED;
- details = 'Bandwidth exhausted or memory limit exceeded';
- break;
- case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
- code = Status.PERMISSION_DENIED;
- details = 'Protocol not secure enough';
- break;
- case http2.constants.NGHTTP2_INTERNAL_ERROR:
- code = Status.INTERNAL;
- if (this.internalError === null) {
- /* This error code was previously handled in the default case, and
- * there are several instances of it online, so I wanted to
- * preserve the original error message so that people find existing
- * information in searches, but also include the more recognizable
- * "Internal server error" message. */
- details = `Received RST_STREAM with code ${http2Stream.rstCode} (Internal server error)`;
- } else {
- if (
- this.internalError.code === 'ECONNRESET' ||
- this.internalError.code === 'ETIMEDOUT'
- ) {
- code = Status.UNAVAILABLE;
- details = this.internalError.message;
- } else {
- /* The "Received RST_STREAM with code ..." error is preserved
- * here for continuity with errors reported online, but the
- * error message at the end will probably be more relevant in
- * most cases. */
- details = `Received RST_STREAM with code ${http2Stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
- }
- }
- break;
- default:
- code = Status.INTERNAL;
- details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
- }
- // This is a no-op if trailers were received at all.
- // This is OK, because status codes emitted here correspond to more
- // catastrophic issues that prevent us from receiving trailers in the
- // first place.
- this.endCall({
- code,
- details,
- metadata: new Metadata(),
- rstCode: http2Stream.rstCode,
- });
- });
- });
- http2Stream.on('error', (err: SystemError) => {
- /* We need an error handler here to stop "Uncaught Error" exceptions
- * from bubbling up. However, errors here should all correspond to
- * "close" events, where we will handle the error more granularly */
- /* Specifically looking for stream errors that were *not* constructed
- * from a RST_STREAM response here:
- * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
- */
- if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
- this.trace(
- 'Node error event: message=' +
- err.message +
- ' code=' +
- err.code +
- ' errno=' +
- getSystemErrorName(err.errno) +
- ' syscall=' +
- err.syscall
- );
- this.internalError = err;
- }
- this.callEventTracker.onStreamEnd(false);
- });
- }
-
- public onDisconnect() {
- this.endCall({
- code: Status.UNAVAILABLE,
- details: 'Connection dropped',
- metadata: new Metadata(),
- });
- }
-
- private outputStatus() {
- /* Precondition: this.finalStatus !== null */
- if (!this.statusOutput) {
- this.statusOutput = true;
- this.trace(
- 'ended with status: code=' +
- this.finalStatus!.code +
- ' details="' +
- this.finalStatus!.details +
- '"'
- );
- this.callEventTracker.onCallEnd(this.finalStatus!);
- /* We delay the actual action of bubbling up the status to insulate the
- * cleanup code in this class from any errors that may be thrown in the
- * upper layers as a result of bubbling up the status. In particular,
- * if the status is not OK, the "error" event may be emitted
- * synchronously at the top level, which will result in a thrown error if
- * the user does not handle that event. */
- process.nextTick(() => {
- this.listener.onReceiveStatus(this.finalStatus!);
- });
- /* Leave the http2 stream in flowing state to drain incoming messages, to
- * ensure that the stream closure completes. The call stream already does
- * not push more messages after the status is output, so the messages go
- * nowhere either way. */
- this.http2Stream.resume();
- }
- }
-
- private trace(text: string): void {
- logging.trace(
- LogVerbosity.DEBUG,
- TRACER_NAME,
- '[' + this.callId + '] ' + text
- );
- }
-
- /**
- * On first call, emits a 'status' event with the given StatusObject.
- * Subsequent calls are no-ops.
- * @param status The status of the call.
- */
- private endCall(status: StatusObjectWithRstCode): void {
- /* If the status is OK and a new status comes in (e.g. from a
- * deserialization failure), that new status takes priority */
- if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
- this.finalStatus = status;
- this.maybeOutputStatus();
- }
- this.destroyHttp2Stream();
- }
-
- private maybeOutputStatus() {
- if (this.finalStatus !== null) {
- /* The combination check of readsClosed and that the two message buffer
- * arrays are empty checks that there all incoming data has been fully
- * processed */
- if (
- this.finalStatus.code !== Status.OK ||
- (this.readsClosed &&
- this.unpushedReadMessages.length === 0 &&
- !this.isReadFilterPending &&
- !this.isPushPending)
- ) {
- this.outputStatus();
- }
- }
- }
-
- private push(message: Buffer): void {
- this.trace(
- 'pushing to reader message of length ' +
- (message instanceof Buffer ? message.length : null)
- );
- this.canPush = false;
- this.isPushPending = true;
- process.nextTick(() => {
- this.isPushPending = false;
- /* If we have already output the status any later messages should be
- * ignored, and can cause out-of-order operation errors higher up in the
- * stack. Checking as late as possible here to avoid any race conditions.
- */
- if (this.statusOutput) {
- return;
- }
- this.listener.onReceiveMessage(message);
- this.maybeOutputStatus();
- });
- }
-
- private tryPush(messageBytes: Buffer): void {
- if (this.canPush) {
- this.http2Stream!.pause();
- this.push(messageBytes);
- } else {
- this.trace(
- 'unpushedReadMessages.push message of length ' + messageBytes.length
- );
- this.unpushedReadMessages.push(messageBytes);
- }
- }
-
- private handleTrailers(headers: http2.IncomingHttpHeaders) {
- this.callEventTracker.onStreamEnd(true);
- let headersString = '';
- for (const header of Object.keys(headers)) {
- headersString += '\t\t' + header + ': ' + headers[header] + '\n';
- }
- this.trace('Received server trailers:\n' + headersString);
- let metadata: Metadata;
- try {
- metadata = Metadata.fromHttp2Headers(headers);
- } catch (e) {
- metadata = new Metadata();
- }
- const metadataMap = metadata.getMap();
- let code: Status = this.mappedStatusCode;
- if (
- code === Status.UNKNOWN &&
- typeof metadataMap['grpc-status'] === 'string'
- ) {
- const receivedStatus = Number(metadataMap['grpc-status']);
- if (receivedStatus in Status) {
- code = receivedStatus;
- this.trace('received status code ' + receivedStatus + ' from server');
- }
- metadata.remove('grpc-status');
- }
- let details = '';
- if (typeof metadataMap['grpc-message'] === 'string') {
- try {
- details = decodeURI(metadataMap['grpc-message']);
- } catch (e) {
- details = metadataMap['grpc-message'];
- }
- metadata.remove('grpc-message');
- this.trace(
- 'received status details string "' + details + '" from server'
- );
- }
- const status: StatusObject = { code, details, metadata };
- // This is a no-op if the call was already ended when handling headers.
- this.endCall(status);
- }
-
- private destroyHttp2Stream() {
- // The http2 stream could already have been destroyed if cancelWithStatus
- // is called in response to an internal http2 error.
- if (!this.http2Stream.destroyed) {
- /* If the call has ended with an OK status, communicate that when closing
- * the stream, partly to avoid a situation in which we detect an error
- * RST_STREAM as a result after we have the status */
- let code: number;
- if (this.finalStatus?.code === Status.OK) {
- code = http2.constants.NGHTTP2_NO_ERROR;
- } else {
- code = http2.constants.NGHTTP2_CANCEL;
- }
- this.trace('close http2 stream with code ' + code);
- this.http2Stream.close(code);
- }
- }
-
- cancelWithStatus(status: Status, details: string): void {
- this.trace(
- 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
- );
- this.endCall({ code: status, details, metadata: new Metadata() });
- }
-
- getStatus(): StatusObject | null {
- return this.finalStatus;
- }
-
- getPeer(): string {
- return this.transport.getPeerName();
- }
-
- getCallNumber(): number {
- return this.callId;
- }
-
- startRead() {
- /* If the stream has ended with an error, we should not emit any more
- * messages and we should communicate that the stream has ended */
- if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) {
- this.readsClosed = true;
- this.maybeOutputStatus();
- return;
- }
- this.canPush = true;
- if (this.unpushedReadMessages.length > 0) {
- const nextMessage: Buffer = this.unpushedReadMessages.shift()!;
- this.push(nextMessage);
- return;
- }
- /* Only resume reading from the http2Stream if we don't have any pending
- * messages to emit */
- this.http2Stream.resume();
- }
-
- sendMessageWithContext(context: MessageContext, message: Buffer) {
- this.trace('write() called with message of length ' + message.length);
- const cb: WriteCallback = (error?: Error | null) => {
- /* nextTick here ensures that no stream action can be taken in the call
- * stack of the write callback, in order to hopefully work around
- * https://github.com/nodejs/node/issues/49147 */
- process.nextTick(() => {
- let code: Status = Status.UNAVAILABLE;
- if (
- (error as NodeJS.ErrnoException)?.code ===
- 'ERR_STREAM_WRITE_AFTER_END'
- ) {
- code = Status.INTERNAL;
- }
- if (error) {
- this.cancelWithStatus(code, `Write error: ${error.message}`);
- }
- context.callback?.();
- });
- };
- this.trace('sending data chunk of length ' + message.length);
- this.callEventTracker.addMessageSent();
- try {
- this.http2Stream!.write(message, cb);
- } catch (error) {
- this.endCall({
- code: Status.UNAVAILABLE,
- details: `Write failed with error ${(error as Error).message}`,
- metadata: new Metadata(),
- });
- }
- }
-
- halfClose() {
- this.trace('end() called');
- this.trace('calling end() on HTTP/2 stream');
- this.http2Stream.end();
- }
-}